Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 7f3a85e

Browse filesBrowse files
jasnelladuh95
authored andcommitted
quic: fix a handful of bugs and missing functionality
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6 PR-URL: #62387 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
1 parent 45c1ebd commit 7f3a85e
Copy full SHA for 7f3a85e

31 files changed

+597-282Lines changed: 597 additions & 282 deletions
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎lib/internal/blob.js‎

Copy file name to clipboardExpand all lines: lib/internal/blob.js
+63-50Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -443,63 +443,76 @@ function createBlobReaderStream(reader) {
443443
// There really should only be one read at a time so using an
444444
// array here is purely defensive.
445445
this.pendingPulls = [];
446+
// Register a wakeup callback that the C++ side can invoke
447+
// when new data is available after a STATUS_BLOCK.
448+
reader.setWakeup(() => {
449+
if (this.pendingPulls.length > 0) {
450+
this.readNext(c);
451+
}
452+
});
446453
},
447454
pull(c) {
448455
const { promise, resolve, reject } = PromiseWithResolvers();
449456
this.pendingPulls.push({ resolve, reject });
450-
const readNext = () => {
451-
reader.pull((status, buffer) => {
452-
// If pendingPulls is empty here, the stream had to have
453-
// been canceled, and we don't really care about the result.
454-
// We can simply exit.
455-
if (this.pendingPulls.length === 0) {
456-
return;
457-
}
458-
if (status === 0) {
459-
// EOS
460-
c.close();
461-
// This is to signal the end for byob readers
462-
// see https://streams.spec.whatwg.org/#example-rbs-pull
463-
c.byobRequest?.respond(0);
464-
const pending = this.pendingPulls.shift();
465-
pending.resolve();
466-
return;
467-
} else if (status < 0) {
468-
// The read could fail for many different reasons when reading
469-
// from a non-memory resident blob part (e.g. file-backed blob).
470-
// The error details the system error code.
471-
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
472-
const pending = this.pendingPulls.shift();
473-
c.error(error);
474-
pending.reject(error);
457+
this.readNext(c);
458+
return promise;
459+
},
460+
readNext(c) {
461+
reader.pull((status, buffer) => {
462+
// If pendingPulls is empty here, the stream had to have
463+
// been canceled, and we don't really care about the result.
464+
// We can simply exit.
465+
if (this.pendingPulls.length === 0) {
466+
return;
467+
}
468+
if (status === 0) {
469+
// EOS
470+
c.close();
471+
// This is to signal the end for byob readers
472+
// see https://streams.spec.whatwg.org/#example-rbs-pull
473+
c.byobRequest?.respond(0);
474+
const pending = this.pendingPulls.shift();
475+
pending.resolve();
476+
return;
477+
} else if (status < 0) {
478+
// The read could fail for many different reasons when reading
479+
// from a non-memory resident blob part (e.g. file-backed blob).
480+
// The error details the system error code.
481+
const error =
482+
lazyDOMException('The blob could not be read',
483+
'NotReadableError');
484+
const pending = this.pendingPulls.shift();
485+
c.error(error);
486+
pending.reject(error);
487+
return;
488+
} else if (status === 2) {
489+
// STATUS_BLOCK: No data available yet. The wakeup callback
490+
// registered in start() will re-invoke readNext when data
491+
// arrives.
492+
return;
493+
}
494+
// ReadableByteStreamController.enqueue errors if we submit a
495+
// 0-length buffer. We need to check for that here.
496+
if (buffer !== undefined && buffer.byteLength !== 0) {
497+
c.enqueue(new Uint8Array(buffer));
498+
}
499+
// We keep reading until we either reach EOS, some error, or
500+
// we hit the flow rate of the stream (c.desiredSize).
501+
// We use setImmediate here because we have to allow the event
502+
// loop to turn in order to process any pending i/o. Using
503+
// queueMicrotask won't allow the event loop to turn.
504+
setImmediate(() => {
505+
if (c.desiredSize < 0) {
506+
// A manual backpressure check.
507+
if (this.pendingPulls.length !== 0) {
508+
const pending = this.pendingPulls.shift();
509+
pending.resolve();
510+
}
475511
return;
476512
}
477-
// ReadableByteStreamController.enqueue errors if we submit a 0-length
478-
// buffer. We need to check for that here.
479-
if (buffer !== undefined && buffer.byteLength !== 0) {
480-
c.enqueue(new Uint8Array(buffer));
481-
}
482-
// We keep reading until we either reach EOS, some error, or we
483-
// hit the flow rate of the stream (c.desiredSize).
484-
// We use set immediate here because we have to allow the event
485-
// loop to turn in order to process any pending i/o. Using
486-
// queueMicrotask won't allow the event loop to turn.
487-
setImmediate(() => {
488-
if (c.desiredSize < 0) {
489-
// A manual backpressure check.
490-
if (this.pendingPulls.length !== 0) {
491-
// A case of waiting pull finished (= not yet canceled)
492-
const pending = this.pendingPulls.shift();
493-
pending.resolve();
494-
}
495-
return;
496-
}
497-
readNext();
498-
});
513+
this.readNext(c);
499514
});
500-
};
501-
readNext();
502-
return promise;
515+
});
503516
},
504517
cancel(reason) {
505518
// Reject any currently pending pulls here.
Collapse file

‎lib/internal/quic/quic.js‎

Copy file name to clipboardExpand all lines: lib/internal/quic/quic.js
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,18 @@ setCallbacks({
475475
this[kOwner][kSessionTicket](ticket);
476476
},
477477

478+
/**
479+
* Called when the client receives a NEW_TOKEN frame from the server.
480+
* The token can be used for future connections to the same server
481+
* address to skip address validation.
482+
* @param {Buffer} token The opaque token data
483+
* @param {SocketAddress} address The remote server address
484+
*/
485+
onSessionNewToken(token, address) {
486+
debug('session new token callback', this[kOwner]);
487+
// TODO(@jasnell): Emit to JS for storage and future reconnection use
488+
},
489+
478490
/**
479491
* Called when the session receives a session version negotiation request
480492
* @param {*} version
Collapse file

‎lib/internal/quic/state.js‎

Copy file name to clipboardExpand all lines: lib/internal/quic/state.js
-9Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ const {
7777
IDX_STATE_STREAM_FIN_RECEIVED,
7878
IDX_STATE_STREAM_READ_ENDED,
7979
IDX_STATE_STREAM_WRITE_ENDED,
80-
IDX_STATE_STREAM_PAUSED,
8180
IDX_STATE_STREAM_RESET,
8281
IDX_STATE_STREAM_HAS_OUTBOUND,
8382
IDX_STATE_STREAM_HAS_READER,
@@ -113,7 +112,6 @@ assert(IDX_STATE_STREAM_FIN_SENT !== undefined);
113112
assert(IDX_STATE_STREAM_FIN_RECEIVED !== undefined);
114113
assert(IDX_STATE_STREAM_READ_ENDED !== undefined);
115114
assert(IDX_STATE_STREAM_WRITE_ENDED !== undefined);
116-
assert(IDX_STATE_STREAM_PAUSED !== undefined);
117115
assert(IDX_STATE_STREAM_RESET !== undefined);
118116
assert(IDX_STATE_STREAM_HAS_OUTBOUND !== undefined);
119117
assert(IDX_STATE_STREAM_HAS_READER !== undefined);
@@ -475,11 +473,6 @@ class QuicStreamState {
475473
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_WRITE_ENDED);
476474
}
477475

478-
/** @type {boolean} */
479-
get paused() {
480-
if (this.#handle.byteLength === 0) return undefined;
481-
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_PAUSED);
482-
}
483476

484477
/** @type {boolean} */
485478
get reset() {
@@ -561,7 +554,6 @@ class QuicStreamState {
561554
finReceived: this.finReceived,
562555
readEnded: this.readEnded,
563556
writeEnded: this.writeEnded,
564-
paused: this.paused,
565557
reset: this.reset,
566558
hasOutbound: this.hasOutbound,
567559
hasReader: this.hasReader,
@@ -590,7 +582,6 @@ class QuicStreamState {
590582
finReceived: this.finReceived,
591583
readEnded: this.readEnded,
592584
writeEnded: this.writeEnded,
593-
paused: this.paused,
594585
reset: this.reset,
595586
hasOutbound: this.hasOutbound,
596587
hasReader: this.hasReader,
Collapse file

‎src/node_blob.cc‎

Copy file name to clipboardExpand all lines: src/node_blob.cc
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
320320
Blob::Reader::kInternalFieldCount);
321321
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
322322
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
323+
SetProtoMethod(env->isolate(), tmpl, "setWakeup", SetWakeup);
323324
env->set_blob_reader_constructor_template(tmpl);
324325
}
325326
return tmpl;
@@ -410,6 +411,21 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
410411
std::move(next), node::bob::OPTIONS_END, nullptr, 0));
411412
}
412413

414+
void Blob::Reader::SetWakeup(
415+
const FunctionCallbackInfo<Value>& args) {
416+
Blob::Reader* reader;
417+
ASSIGN_OR_RETURN_UNWRAP(&reader, args.This());
418+
CHECK(args[0]->IsFunction());
419+
reader->wakeup_.Reset(args.GetIsolate(), args[0].As<Function>());
420+
}
421+
422+
void Blob::Reader::NotifyPull() {
423+
if (wakeup_.IsEmpty() || !env()->can_call_into_js()) return;
424+
HandleScope handle_scope(env()->isolate());
425+
Local<Function> fn = wakeup_.Get(env()->isolate());
426+
MakeCallback(fn, 0, nullptr);
427+
}
428+
413429
BaseObjectPtr<BaseObject>
414430
Blob::BlobTransferData::Deserialize(
415431
Environment* env,
@@ -590,6 +606,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
590606
registry->Register(Blob::GetDataObject);
591607
registry->Register(Blob::RevokeObjectURL);
592608
registry->Register(Blob::Reader::Pull);
609+
registry->Register(Blob::Reader::SetWakeup);
593610
registry->Register(Concat);
594611
registry->Register(BlobFromFilePath);
595612
}
Collapse file

‎src/node_blob.h‎

Copy file name to clipboardExpand all lines: src/node_blob.h
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ class Blob : public BaseObject {
8282
static BaseObjectPtr<Reader> Create(Environment* env,
8383
BaseObjectPtr<Blob> blob);
8484
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
85+
static void SetWakeup(const v8::FunctionCallbackInfo<v8::Value>& args);
86+
void NotifyPull();
8587

8688
explicit Reader(Environment* env,
8789
v8::Local<v8::Object> obj,
@@ -95,6 +97,7 @@ class Blob : public BaseObject {
9597
std::shared_ptr<DataQueue::Reader> inner_;
9698
BaseObjectPtr<Blob> strong_ptr_;
9799
bool eos_ = false;
100+
v8::Global<v8::Function> wakeup_;
98101
};
99102

100103
BaseObject::TransferMode GetTransferMode() const override;
Collapse file

‎src/quic/application.cc‎

Copy file name to clipboardExpand all lines: src/quic/application.cc
+6-19Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ ssize_t Session::Application::WriteVStream(PathStorage* path,
452452
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
453453
return ngtcp2_conn_writev_stream(*session_,
454454
&path->path,
455+
// TODO(@jasnell): ECN blocked on libuv
455456
nullptr,
456457
dest,
457458
max_packet_size,
@@ -511,10 +512,13 @@ class DefaultApplication final : public Session::Application {
511512
stream_data->count = 0;
512513
stream_data->fin = 0;
513514
stream_data->stream.reset();
514-
stream_data->remaining = 0;
515515
Debug(&session(), "Default application getting stream data");
516516
DCHECK_NOT_NULL(stream_data);
517517
// If the queue is empty, there aren't any streams with data yet
518+
519+
// If the connection-level flow control window is exhausted,
520+
// there is no point in pulling stream data.
521+
if (!session().max_data_left()) return 0;
518522
if (stream_queue_.IsEmpty()) return 0;
519523

520524
const auto get_length = [](auto vec, size_t count) {
@@ -554,9 +558,7 @@ class DefaultApplication final : public Session::Application {
554558

555559
if (count > 0) {
556560
stream->Schedule(&stream_queue_);
557-
stream_data->remaining = get_length(data, count);
558561
} else {
559-
stream_data->remaining = 0;
560562
}
561563

562564
// Not calling done here because we defer committing
@@ -581,15 +583,6 @@ class DefaultApplication final : public Session::Application {
581583

582584
void ResumeStream(int64_t id) override { ScheduleStream(id); }
583585

584-
bool ShouldSetFin(const StreamData& stream_data) override {
585-
auto const is_empty = [](const ngtcp2_vec* vec, size_t cnt) {
586-
size_t i = 0;
587-
for (size_t n = 0; n < cnt; n++) i += vec[n].len;
588-
return i > 0;
589-
};
590-
591-
return stream_data.stream && is_empty(stream_data, stream_data.count);
592-
}
593586

594587
void BlockStream(int64_t id) override {
595588
if (auto stream = session().FindStream(id)) [[likely]] {
@@ -598,10 +591,9 @@ class DefaultApplication final : public Session::Application {
598591
}
599592

600593
bool StreamCommit(StreamData* stream_data, size_t datalen) override {
601-
if (datalen == 0) return true;
602594
DCHECK_NOT_NULL(stream_data);
603595
CHECK(stream_data->stream);
604-
stream_data->stream->Commit(datalen);
596+
stream_data->stream->Commit(datalen, stream_data->fin);
605597
return true;
606598
}
607599

@@ -616,11 +608,6 @@ class DefaultApplication final : public Session::Application {
616608
}
617609
}
618610

619-
void UnscheduleStream(int64_t id) {
620-
if (auto stream = session().FindStream(id)) [[likely]] {
621-
stream->Unschedule();
622-
}
623-
}
624611

625612
Stream::Queue stream_queue_;
626613
};
Collapse file

‎src/quic/application.h‎

Copy file name to clipboardExpand all lines: src/quic/application.h
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ class Session::Application : public MemoryRetainer {
120120

121121
virtual int GetStreamData(StreamData* data) = 0;
122122
virtual bool StreamCommit(StreamData* data, size_t datalen) = 0;
123-
virtual bool ShouldSetFin(const StreamData& data) = 0;
124123

125124
inline Environment* env() const { return session().env(); }
126125
inline Session& session() {
@@ -148,14 +147,18 @@ class Session::Application : public MemoryRetainer {
148147
struct Session::Application::StreamData final {
149148
// The actual number of vectors in the struct, up to kMaxVectorCount.
150149
size_t count = 0;
151-
size_t remaining = 0;
152150
// The stream identifier. If this is a negative value then no stream is
153151
// identified.
154152
int64_t id = -1;
155153
int fin = 0;
156154
ngtcp2_vec data[kMaxVectorCount]{};
157155
BaseObjectPtr<Stream> stream;
158156

157+
static_assert(sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) &&
158+
alignof(ngtcp2_vec) == alignof(nghttp3_vec) &&
159+
offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) &&
160+
offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len),
161+
"ngtcp2_vec and nghttp3_vec must have identical layout");
159162
inline operator nghttp3_vec*() {
160163
return reinterpret_cast<nghttp3_vec*>(data);
161164
}
Collapse file

‎src/quic/bindingdata.h‎

Copy file name to clipboardExpand all lines: src/quic/bindingdata.h
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class Packet;
4343
V(session_datagram_status, SessionDatagramStatus) \
4444
V(session_handshake, SessionHandshake) \
4545
V(session_new, SessionNew) \
46+
V(session_new_token, SessionNewToken) \
4647
V(session_path_validation, SessionPathValidation) \
4748
V(session_ticket, SessionTicket) \
4849
V(session_version_negotiation, SessionVersionNegotiation) \
@@ -70,6 +71,7 @@ class Packet;
7071
V(cubic, "cubic") \
7172
V(disable_stateless_reset, "disableStatelessReset") \
7273
V(enable_connect_protocol, "enableConnectProtocol") \
74+
V(enable_early_data, "enableEarlyData") \
7375
V(enable_datagrams, "enableDatagrams") \
7476
V(enable_tls_trace, "tlsTrace") \
7577
V(endpoint, "Endpoint") \
@@ -121,6 +123,7 @@ class Packet;
121123
V(stream, "Stream") \
122124
V(success, "success") \
123125
V(tls_options, "tls") \
126+
V(token, "token") \
124127
V(token_expiration, "tokenExpiration") \
125128
V(token_secret, "tokenSecret") \
126129
V(transport_params, "transportParams") \
Collapse file

‎src/quic/cid.cc‎

Copy file name to clipboardExpand all lines: src/quic/cid.cc
+6-2Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,14 @@ const CID CID::kInvalid{};
8585
// CID::Hash
8686

8787
size_t CID::Hash::operator()(const CID& cid) const {
88+
// Uses the Boost hash_combine strategy: XOR each byte with the golden
89+
// ratio constant 0x9e3779b9 (derived from the fractional part of the
90+
// golden ratio, (sqrt(5)-1)/2 * 2^32) plus bit-shifted accumulator
91+
// state. This provides good avalanche properties for short byte
92+
// sequences like connection IDs (1-20 bytes).
8893
size_t hash = 0;
8994
for (size_t n = 0; n < cid.length(); n++) {
90-
hash ^= std::hash<uint8_t>{}(cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) +
91-
(hash >> 2));
95+
hash ^= cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) + (hash >> 2);
9296
}
9397
return hash;
9498
}
Collapse file

‎src/quic/data.cc‎

Copy file name to clipboardExpand all lines: src/quic/data.cc
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ using v8::Undefined;
2828
using v8::Value;
2929

3030
namespace quic {
31-
int DebugIndentScope::indent_ = 0;
31+
thread_local int DebugIndentScope::indent_ = 0;
3232

3333
Path::Path(const SocketAddress& local, const SocketAddress& remote) {
3434
ngtcp2_addr_init(&this->local, local.data(), local.length());

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.