feat(flashblocks): add ring buffer replay for WebSocket reconnection#1252
BrianBland wants to merge 6 commits into `base/base:main` from `base/base:brianbland/builder-fb-ring-buffer`
Conversation
🟡 Heimdall Review Status
```rust
for (pos, val) in snapshot {
    if let Some(p) = pos {
        sent_positions.insert(p);
    }
    self.send_replay_message(val).await?;
}
```
Phase 1 inserts positions into sent_positions before sending, but the dedup set is only consulted in phase 2. If send_replay_message fails partway through phase 1 (e.g. timeout), the error propagates and the client is disconnected — so partial sends in phase 1 are fine.
However, sentinel entries (pos == None) are sent to the client but cannot be deduplicated in phase 2 since they have no position key. If the same sentinel payload is also in the broadcast channel, the client will receive it twice. This may be acceptable if sentinels are rare/non-existent in practice today, but it's worth a comment documenting the behavior.
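A minimal std-only sketch of the two-phase dedup, showing why positioned entries are deduplicated in phase 2 while a sentinel payload would be delivered twice. `Position`, `replay_phases`, and the synchronous `out.push` stand-ins are hypothetical simplifications of the real async publisher code:

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the real (block_number, flashblock_index) key.
type Position = (u64, u64);

/// Phase 1 sends every ring-buffer entry, recording positioned entries in
/// `sent_positions`. Sentinel entries (pos == None) are sent but cannot be
/// recorded, so an identical payload arriving in phase 2 is delivered twice.
fn replay_phases(
    snapshot: Vec<(Option<Position>, String)>,
    live: Vec<(Position, String)>,
) -> Vec<String> {
    let mut sent_positions: HashSet<Position> = HashSet::new();
    let mut out = Vec::new();
    for (pos, val) in snapshot {
        if let Some(p) = pos {
            sent_positions.insert(p);
        }
        out.push(val); // stands in for send_replay_message(val).await?
    }
    // Phase 2: drain the broadcast backlog, skipping already-sent positions.
    for (pos, val) in live {
        if sent_positions.insert(pos) {
            out.push(val);
        }
    }
    out
}
```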
Force-pushed 443f2c6 to 22db646
Force-pushed 22db646 to 6e3876c
```rust
if let Some(pos) = parse_flashblock_position(text.as_str()) {
    self.last_position = Some(pos);
}

(self.handler)(text.to_string());
```
last_position is updated before calling (self.handler)(...). If the handler panics (unwinding), the position is committed but the message was never processed downstream. On reconnect, the subscriber would skip this message.
Consider updating last_position after the handler returns, or document that the handler is assumed to be infallible for position-tracking purposes.
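A sketch of the suggested reordering, with hypothetical names (`Tracker`, `on_message`, `parse_position` all stand in for the real subscriber types): the position is parsed first but only committed after the handler returns, so an unwinding handler leaves `last_position` untouched.

```rust
// Hypothetical sketch: defer the position commit until the handler returns.
struct Tracker {
    last_position: Option<(u64, u64)>,
}

impl Tracker {
    fn on_message(&mut self, text: &str, handler: impl Fn(&str)) {
        let pos = parse_position(text); // None for non-flashblock messages
        handler(text); // if this panics/unwinds, last_position stays as-is
        if let Some(p) = pos {
            self.last_position = Some(p); // commit only after successful handling
        }
    }
}

// Toy stand-in for the real parse_flashblock_position.
fn parse_position(text: &str) -> Option<(u64, u64)> {
    let mut it = text.split(':');
    Some((it.next()?.parse().ok()?, it.next()?.parse().ok()?))
}
```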
```diff
-self.ws_pub.publish(&fb_payload).map_err(PayloadBuilderError::other)?;
+let flashblock_byte_size = self
+    .ws_pub
+    .publish(&fb_payload, ctx.block_number(), 0)
```
The fallback block always passes flashblock_index = 0 here. If a fallback block is published after flashblock index > 0 was already published for the same block number, reconnecting clients that resume from (block_number, 0) would skip this fallback since entries_after uses strictly-greater-than comparison and (N, 0) > (N, 0) is false.
Is the intent that the fallback block replaces/supersedes all prior flashblocks for the same block? If so, this may need a different position or the ring buffer lookup semantics would need to include the cutoff position itself. If fallback blocks are only published when no prior flashblocks exist for that block number, this is fine — but worth a comment clarifying the invariant.
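A toy illustration of the strictly-greater-than lookup described above (`entries_after` here is a hypothetical free function modeling the ring buffer's semantics, not the crate's actual signature): a client resuming from cutoff `(N, 0)` never receives an entry stored at exactly `(N, 0)`.

```rust
// Hypothetical stand-in for the (block_number, flashblock_index) key.
type Position = (u64, u64);

// Models the ring buffer lookup: tuple ordering compares block_number
// first, then flashblock_index, and the cutoff itself is excluded.
fn entries_after<'a>(buf: &[(Position, &'a str)], cutoff: Position) -> Vec<&'a str> {
    buf.iter().filter(|(p, _)| *p > cutoff).map(|(_, v)| *v).collect()
}
```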
```rust
// Ignore SendError — there may be zero receivers when no clients
// are connected. The entry is already stored in the ring buffer
// for replay on future connections.
let _ = self.pipe.send((position, utf8_bytes));
```
The old publish() returned an error when there were no receivers (io::ErrorKind::ConnectionAborted). The new code silently discards the SendError here. This is a deliberate behavioral change (the ring buffer preserves the message for future replay), but it means callers can no longer distinguish "published to live subscribers" from "stored in ring buffer only."
If any caller relied on the error to detect zero-subscriber scenarios, this is a silent regression. Worth documenting in the method's doc comment that a successful return does not guarantee any live subscriber received the message.
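A minimal std-only model of the new contract (the real code uses a tokio broadcast channel; `publish`, the `Vec<String>` buffer, and `mpsc` here are simplified stand-ins): a successful return means "serialized and stored", not "delivered to a live subscriber".

```rust
use std::sync::mpsc;

// With zero receivers, send fails, but the entry already lives in the ring
// buffer for future replay, so the error is deliberately discarded.
fn publish(buffer: &mut Vec<String>, tx: &mpsc::Sender<String>, msg: &str) -> usize {
    buffer.push(msg.to_string()); // stored for replay regardless of subscribers
    let _ = tx.send(msg.to_string()); // SendError (no receivers) is ignored
    msg.len() // a successful return does not imply any live subscriber got it
}
```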
Force-pushed 22da825 to e9c9bc9
```rust
    payload: &impl Serialize,
    block_number: u64,
    flashblock_index: u64,
) -> Result<usize, serde_json::Error> {
```
This is a breaking change to the public API: publish() previously returned io::Result<usize> and now returns Result<usize, serde_json::Error>. The two current call sites in payload.rs handle this fine (one uses .map_err(PayloadBuilderError::other), the other uses .wrap_err(...)), but any out-of-tree consumers matching on io::Error will break at compile time.
Also, the old behavior returned io::Error(ConnectionAborted) when no subscribers were connected; now SendError is silently discarded (line 115). This is intentional for the ring-buffer-based replay model, but worth documenting that Ok(size) no longer implies any live subscriber received the message.
```rust
if let Some(p) = pos {
    sent_positions.insert(p);
}
self.send_replay_message(val).await?;
```
Phase 1 inserts positions into sent_positions before calling send_replay_message, but the insert happens regardless of send success since ? would propagate the error and disconnect. This ordering is fine.
However, sentinel entries (pos == None) are sent to the client but cannot be tracked in sent_positions. Today the publisher always passes Some(position) to ring_buffer.push(), so sentinels never exist in practice. But the RingBuffer API advertises sentinel support as a core feature (README, doc comments, tests). If sentinels are never used, consider simplifying the ring buffer to require positions unconditionally (push(position: I, value: V)) — this removes the Option wrapping, eliminates the sentinel code path in replay, and makes the API harder to misuse.
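A sketch of the simplification suggested above: a bounded ring buffer whose `push` requires a position unconditionally, so the `Option` wrapping and the sentinel replay path disappear. This is a hypothetical simplified shape, not the `base-ring-buffer` crate's actual API:

```rust
use std::collections::VecDeque;

// Hypothetical simplified ring buffer: every entry carries a position.
struct RingBuffer<I, V> {
    capacity: usize,
    entries: VecDeque<(I, V)>,
}

impl<I: Copy + Ord, V: Clone> RingBuffer<I, V> {
    fn new(capacity: usize) -> Self {
        Self { capacity, entries: VecDeque::with_capacity(capacity) }
    }

    /// No `Option<I>`: callers must always supply a position.
    fn push(&mut self, position: I, value: V) {
        if self.entries.len() == self.capacity {
            self.entries.pop_front(); // evict the oldest entry
        }
        self.entries.push_back((position, value));
    }

    /// Entries strictly after `cutoff`, in insertion order.
    fn entries_after(&self, cutoff: I) -> Vec<(I, V)> {
        self.entries.iter().filter(|(p, _)| *p > cutoff).cloned().collect()
    }
}
```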
```rust
    block_number: u64,
}

let extract: PositionExtract = serde_json::from_str(data).ok()?;
```
parse_flashblock_position fully deserializes the top-level index and nested metadata.block_number on every incoming text message. serde_json::from_str must validate the entire JSON even though we only need two fields — for a 500 KB flashblock payload this is non-trivial work.
The purpose-built struct with #[serde(default)] is already a good start, but consider borrowing nested fields as serde_json::value::RawValue so they are skipped over rather than materialized, or simply string-searching for the two numeric fields and falling back to a full parse only when that fails. This is on the hot path for every received flashblock.
Also: the doc comment references FlashblocksPayloadV1 but this is in a different crate (base-alloy-flashblocks). If the wire format changes (e.g., index is renamed or metadata becomes a typed struct with different field names), this parser will silently stop tracking positions, degrading reconnection to full re-sync without any warning. Consider adding an integration test or at minimum a debug_assert! / warning log when position extraction fails for messages that "look like" flashblock payloads.
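A std-only sketch of the "string-search before full parse" fast path mentioned above. `find_u64_field` and `parse_flashblock_position_fast` are hypothetical names; real code should fall back to the full serde parse when this returns None, and a naive substring search can mis-match keys that are suffixes of other keys (e.g. `"index"` inside `"flashblock_index"`):

```rust
// Hypothetical fast path: scan for a numeric field without full deserialization.
fn find_u64_field(json: &str, key: &str) -> Option<u64> {
    let needle = format!("\"{key}\":");
    let start = json.find(&needle)? + needle.len();
    let rest = json[start..].trim_start();
    let end = rest.find(|c: char| !c.is_ascii_digit())?;
    rest[..end].parse().ok()
}

// Extracts (block_number, flashblock_index) without validating the whole payload.
fn parse_flashblock_position_fast(json: &str) -> Option<(u64, u64)> {
    Some((find_u64_field(json, "block_number")?, find_u64_field(json, "index")?))
}
```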
```rust
let cancel = cancel.clone();
let receiver_clone = receiver.resubscribe();
let receiver = receiver.resubscribe();
```
receiver.resubscribe() is called here, before tokio::spawn. The new receiver starts accumulating messages from this instant. The spawned task then performs the handshake (up to 10s), during which messages continue to arrive on this receiver.
This is correct for the two-phase replay protocol — those messages will be deduplicated in phase 2. But under high publish rates with a slow handshake, the broadcast receiver could lag and lose messages before BroadcastLoop::run() even starts. The receiver's buffer is bounded by channel_capacity (default 100). If >100 flashblocks are published during a 10s handshake, the receiver lags and phase 2 will return ReplayError::Lagged, disconnecting the client before it receives any data.
Consider creating the receiver inside the spawned task, after the handshake completes. The ring buffer snapshot in phase 1 already covers messages published before the receiver is created, so no messages would be lost.
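A std-only model of the hazard (`ModelReceiver` is a hypothetical stand-in for tokio's bounded broadcast receiver, which surfaces overflow as `RecvError::Lagged`): subscribing before the handshake means the buffer fills during it, and once more than `channel_capacity` messages arrive, the oldest are dropped.

```rust
use std::collections::VecDeque;

// Toy model of a bounded broadcast receiver with a lag counter.
struct ModelReceiver {
    capacity: usize,
    buf: VecDeque<u64>,
    lagged: u64,
}

impl ModelReceiver {
    fn subscribe(capacity: usize) -> Self {
        Self { capacity, buf: VecDeque::new(), lagged: 0 }
    }

    fn on_publish(&mut self, msg: u64) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // oldest message is lost
            self.lagged += 1;     // the real channel reports RecvError::Lagged
        }
        self.buf.push_back(msg);
    }
}
```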
```diff
-payload = self.blocks.recv() => match payload {
-    Ok(payload) => {
+payload = self.receiver.recv() => match payload {
+    Ok((_position, data)) => {
```
The _position is discarded in the live broadcast path. If the subscriber lags and receives RecvError::Lagged (line 103), it logs a warning and continues — but the client has a gap in its stream with no way to detect it application-level.
Consider sending the position as part of the WebSocket message (or in a wrapper) so downstream clients can detect gaps independently, rather than relying solely on the server-side lag counter. Alternatively, disconnect lagged clients so they can reconnect with replay, which would be more consistent with the replay-on-reconnect model.
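A sketch of the wrapper idea, assuming a hypothetical envelope format (`wrap_with_position` and the `position`/`payload` field names are illustrative, not the PR's wire format): embedding the position in each frame lets a client detect a gap as a jump in `flashblock_index`.

```rust
// Hypothetical envelope: prepend the position so clients can detect gaps.
fn wrap_with_position(block_number: u64, flashblock_index: u64, data: &str) -> String {
    format!(
        r#"{{"position":{{"block_number":{block_number},"flashblock_index":{flashblock_index}}},"payload":{data}}}"#
    )
}
```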
Force-pushed e9c9bc9 to 80d91a1
Add a generic `base-ring-buffer` crate and integrate it into the flashblock publisher so reconnecting clients can resume from a given position without missing messages.

Publisher changes:
- Store each published flashblock in a bounded ring buffer keyed by (block_number, flashblock_index)
- Accept resume position via query params on the WebSocket upgrade request (?block_number=N&flashblock_index=M)
- Two-phase replay on connect: snapshot ring buffer then drain accumulated broadcast messages, deduplicating by position
- Add handshake timeout (10s) around the WebSocket upgrade
- Add criterion benchmarks for publish throughput

Subscriber changes (websocket-proxy):
- Track last received (block_number, flashblock_index) per connection
- Append resume query params on reconnect

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed 80d91a1 to 657f6d9
Review Summary

The ring buffer replay mechanism is well-designed overall. The two-phase replay protocol (ring buffer snapshot + broadcast receiver drain) correctly handles the gap between historical and live data, and the dedup logic via `sent_positions` is correct.

Note on prior inline comments: many of the 29 existing inline comments from previous review runs are stale — they reference issues that have since been fixed in the current code.
Remaining items from prior comments that are still valid observations (not blocking):
No new blocking issues found. The code is well-structured, with good test coverage for the replay, parsing, and URI construction paths.