Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feat: WebSocket client v2 (ws_v2)#151

Draft
Voyz wants to merge 23 commits intomasterVoyz/ibind:masterfrom
feat/ws_v2Voyz/ibind:feat/ws_v2Copy head branch name to clipboard
Draft

feat: WebSocket client v2 (ws_v2)#151
Voyz wants to merge 23 commits intomasterVoyz/ibind:masterfrom
feat/ws_v2Voyz/ibind:feat/ws_v2Copy head branch name to clipboard

Conversation

@Voyz
Copy link
Copy Markdown
Owner

@Voyz Voyz commented May 3, 2026

Full refactor of the WebSocket handling in IBind.

Work is still in progress, no changes are final.

Available as 0.2.1rc1.

Overview:

The IbkrWsClient V1 implementation - existing until now - had several unfavourable angles:

  • The connectivity and authentication issues were handled with aggressive thread restarts which was usually unnecessary, leaving dangling threads and WebSocket connections.
  • All functionalities were carried out on a single thread, coupling WebSocket processing with event propagation and lifecycle management.
  • Subscription API expected user to understand the somewhat-unintuitive requirements of IBKR WebSocket payloads.
  • Subscription API was non-idempotent and blocking

Although usable, it required close familiarity with the documentation, presented slightly awkward usage, and its threading model lead to complex logic, difficult testability and race conditions.

The IbkrWsClient V2 at its core introduces better threading and lifecycle management. It is implemented using three threads:

  • ws_runtime_thread - main thread of the client, handling the lifecycle management
  • ws_transport_thread - specialised thread, handling the WebSocket communication
  • async_sink_thread - specialised thread, handling event propagation

As a result, most connectivity and authentication issues are handled gracefully without need of restarting threads or recreating the WebSocketApp instance.

This refactor was also taken as an opportunity to re-evaluate the API, resulting in a number of improvements to the subscription API, event consumption and connection health. Some responsibilities previously expected of the user are now handled by the client out of the box.


BREAKING CHANGE:

  • Subscription API no longer accepts channel and data in favour of new Pydantic Subscription classes which encapsulate necessary parameters for each topic, reducing ambiguity and necessity to depend on IBKR docs (as discussed in IbkrWsKey should have conid for market data subscriptions #18)
  • Event consumption implemented using EventSink protocol instead of depending on the QueueAccessor API. Currently implemented are CallbackSink (propagating events into user-specified callbacks), QueueSink (supporting the QueueAccessor API), CompositeSink (allowing to mix multiple sinks together) and LogSink (logging all events, useful for debugging). Custom sinks that implement EventSink protocal are accepted.
  • Events are now propagated using Pydantic WsEvent class events, replacing propagating dict objects
  • Subscription API is now idempotent, expressing an intent that is later carried out by the client asynchronously. Behind the scenes, subscriptions are internally represented as Binding objects, used for managing the lifecycle of each subscription. As a result, IbkrWsClient.subscribe() and IbkrWsClient.unsubscribe() are no longer blocking and instead they return a SubscriptionHandle that can be waited on using .wait()
  • New threading model implemented from scratch, changing the WebSocket health and lifecycle handling
  • Removed SubscriptionProcessor class - instead Subscription objects define the subscribe and unsubscribe payload generation
  • Removed restart_on_close and restart_on_critical parameters - both are now fixed as true: the WebSocketApp will always reconnect on close, and the transport thread will always recreate a new WebSocketApp on critical.
  • Removed connected, ready and running properties - now replaced by is_running() and get_state()
  • Removed IbkrWsKey - identifying subscriptions and events now uses concrete WsEvent class types instead of the enum
  • Deprecated the QueueAccessor - currently still available through QueueSink.new_queue_accessor() for backwards compatibility. Instead QueueSink serves as a 1-1 replacement for QueueAccessor, passing appropriate queue-identifying key to its get() and empty() methods.

Feature:

  • Subscriptions accept expiry_seconds allowing to periodically resubscribe (fixes Feature request: auto-refresh IBKR WS smd subscriptions on 15-min server-side expiry #145)
  • Lifecycle state changes (eg. STARTED, CONNECTING, OPEN, AUTHENTICATED, DEGRADED, etc.) are now propagated as events under LIFECYCLE key, facilitating responding to lifecycle changes in user applications
  • Additional information on status of subscriptions (eg. NEW, ACTIVE, FAILED, UNSUBSCRIBED, etc.) is exposed through the API
  • MarketHistory unsubscribing with server IDs is now natively handled by the client, removing the necessity for boilerplate payload generation with a custom SubscriptionProcessor (as demonstrated in https://github.com/Voyz/ibind/blob/master/examples/ws_03_market_history.py)
  • Authentication with IBKR is validated using the session id / cookie, fixing authentication issues where a dangling connection was kept open for a session that lost authentication.

Refactor:

  • Logging of all WebSocket-related logs is now sent through a single ibind.ibkr_ws_client logger.

Chore:

Behind The Scenes

  • Removed QueueController
  • Rewrote and largely simplified SubscriptionController
  • Removed mixin usage in IbkrWsClient (still present in IbkrClient)

Implementation Details

Event Lifecycle

Most events received by the WebSocket go through the following flow:

Step Thread Data Type Description
1 ws_transport_thread raw data Received by the WebSocketApp
2 ws_transport_thread raw data Propagated to appropriate transport handler
3 ws_transport_thread raw data Parsed into TransportEvent and enqueued in runtime transport queue
4 ws_runtime_thread TransportEvent Reads from transport queue and calls appropriate handlers (retries up to 5 times on failure)
5 ws_runtime_thread TransportEvent Parsed by IbkrRouter into WsEvent instances (multiple instances can be generated from a single message)
6 ws_runtime_thread WsEvent Emitted to outgoing sink (AsyncSink by default, or directly to user sink if disabled)
7 async_sink_thread WsEvent Propagated to the user-provided sink

Two event classes TransportEvent and WsEvent - rather than just a single common Event - are used to create clear separation of concerns. TransportEvent instances are used only for WebSocket -> Client internal communication, while WsEvent instances are used for both inner-client and Client -> User Application communication.

Event Propagation

Events are now propagated using a dedicated thread. Once received by the client, they're parsed and enqueued into an outgoing queue. The new thread then consumes and propagates the events through sinks.

This ensures that slow user application code does not affect the functionality of the client.

This method can be disabled using a parameter, causing event propagation to be carried out by the runtime thread, which may be useful for debugging.

Example Usage

In V1:

from ibind import IbkrWsKey, IbkrWsClient

ws_client = IbkrWsClient(cacert=cacert, account_id=account_id)

ws_client.start()

requests = [
    {'channel': 'md+265598', 'data': {'fields': ['55', '71', '84', '86', '88', '85', '87', '7295', '7296', '70']}},
    {'channel': 'or'},
    {'channel': 'tr'},
    {'channel': f'sd+{account_id}'},
    {'channel': f'ld+{account_id}'},
    {'channel': 'pl'},
]
queue_accessors = [
    ws_client.new_queue_accessor(IbkrWsKey.TRADES),
    ws_client.new_queue_accessor(IbkrWsKey.MARKET_DATA),
    ws_client.new_queue_accessor(IbkrWsKey.ORDERS),
    ws_client.new_queue_accessor(IbkrWsKey.ACCOUNT_SUMMARY),
    ws_client.new_queue_accessor(IbkrWsKey.ACCOUNT_LEDGER),
    ws_client.new_queue_accessor(IbkrWsKey.PNL),
]

for request in requests:
    while not ws_client.subscribe(**request):
        time.sleep(1)

while ws_client.running:
    try:
        for qa in queue_accessors:
            while not qa.empty():
                print(str(qa), qa.get())

        time.sleep(1)
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
        break

for request in requests:
    ws_client.unsubscribe(**request)

ws_client.shutdown()

In V2:

from ibind import IbkrWsClientV2, QueueSink
from ibind.subscriptions import MarketDataSubscription, OrdersSubscription, AccountLedgerSubscription, AccountSummarySubscription, PnlSubscription, TradesSubscription, MarketHistorySubscription, SubscriptionHandle

queue_sink = QueueSink()
ws_client = IbkrWsClientV2(cacert=cacert, account_id=account_id, sink=queue_sink)

ws_client.start()

subs = [
    TradesSubscription(),
    MarketDataSubscription(conid='265598', fields=('55', '71', '84', '86', '88', '85', '87', '7295', '7296', '70'), expiry_seconds=14 * 60),
    OrdersSubscription(),
    AccountSummarySubscription(account_id=account_id),
    AccountLedgerSubscription(account_id=account_id),
    PnlSubscription(),
]

sub_handles: List[SubscriptionHandle] = []
for sub in subs:
    handle = ws_client.subscribe(sub)
    sub_handles.append(handle)

ws_client.wait_all(sub_handles, timeout=10) # returns list of failed handles or empty list if all succeed

try:
    while ws_client.is_running():
        for sub in subs:
            while not queue_sink.empty(sub.event_type):
                ev = queue_sink.get(sub.event_type)
                print(ev)
        time.sleep(1)
except KeyboardInterrupt:
    print('Interrupt')

unsub_handles: List[SubscriptionHandle] = []
for sub in subs:
    handle = ws_client.unsubscribe(sub)
    unsub_handles.append(handle)

ws_client.wait_all(unsub_handles, timeout=10)

ws_client.shutdown()

Sink types:

from ibind import events, IbkrWsClientV2, LogSink, QueueSink, CallbackSink, CompositeSink

# Queue Sink - queue-based event consumer
queue_sink = QueueSink()

# Callback Sink - callback-based event consumer
callback_sink = CallbackSink()

def on_market_data(event: events.MarketData):
    print(event)

def on_market_history(event: events.MarketHistory):
    print(event)

def on_lifecycle(event: events.LifecycleEvent):
    print(event)

callback_sink.on(events.MarketData, on_market_data)
callback_sink.on(events.MarketHistory, on_market_data)
callback_sink.on(events.WsOpen, on_lifecycle)
callback_sink.on(events.WsClose, on_lifecycle)
callback_sink.on(events.WsError, on_lifecycle)
callback_sink.on(events.WsAuthenticated, on_lifecycle)
callback_sink.on(events.WsReady, on_lifecycle)
callback_sink.on(events.WsDegraded, on_lifecycle)

# Log Sink - useful for debugging
log_sink = LogSink()

# Composite Sink - allows us to use all above sinks at once
composite_sink = CompositeSink(callback_sink, queue_sink, log_sink)

ws_client = IbkrWsClientV2(cacert=cacert, account_id=account_id, sink=composite_sink)

Roadmap:

  • WebSocket client refactor (in progress...)
  • Tests refactor
  • New documentation
  • Migration guide

I'm very much open to feedback - please feel free to share it here if it is related to the refactor, or as a new issue if you'd like to suggest other changes/enhancements.

You can install and test this version out as 0.2.1rc1.

Voyz added 23 commits April 29, 2026 18:16
…hreading/lifecycle model, making (un)subscribing actions idempotent and introducing Pydantic models at input and output
…and changed subscription_controller._bindings key from Subscription to new the binding_key.

- added SubscriptionResolver which allow SubscriptionController to automatically detect binding_keys that need confirmation on (un)subscriptions
- finished implementing ibkr_subscriptions
# Conflicts:
#	ibind/ibkr_ws_v2/ibkr_events.py
…amed 'channel' to 'topic', implemented QueueSink as replacement of QueueController/Accessor

chore(ws_v2): cleaned up ws_runtime and ws_transport
fix(ws_v2): fixed TransportEvent attempts
refactor(ws_v2): renamed ClientInternalEvents to LifecycleEvents
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature request: auto-refresh IBKR WS smd subscriptions on 15-min server-side expiry

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.