Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 57186e5

Browse filesBrowse files
committed
quic: fix a handful of bugs and missing functionality
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Opus 4.6 PR-URL: #62387 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
1 parent e9b5214 commit 57186e5
Copy full SHA for 57186e5

31 files changed

+597-282Lines changed: 597 additions & 282 deletions
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎lib/internal/blob.js‎

Copy file name to clipboardExpand all lines: lib/internal/blob.js
+63-50Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -439,63 +439,76 @@ function createBlobReaderStream(reader) {
439439
// There really should only be one read at a time so using an
440440
// array here is purely defensive.
441441
this.pendingPulls = [];
442+
// Register a wakeup callback that the C++ side can invoke
443+
// when new data is available after a STATUS_BLOCK.
444+
reader.setWakeup(() => {
445+
if (this.pendingPulls.length > 0) {
446+
this.readNext(c);
447+
}
448+
});
442449
},
443450
pull(c) {
444451
const { promise, resolve, reject } = PromiseWithResolvers();
445452
this.pendingPulls.push({ resolve, reject });
446-
const readNext = () => {
447-
reader.pull((status, buffer) => {
448-
// If pendingPulls is empty here, the stream had to have
449-
// been canceled, and we don't really care about the result.
450-
// We can simply exit.
451-
if (this.pendingPulls.length === 0) {
452-
return;
453-
}
454-
if (status === 0) {
455-
// EOS
456-
c.close();
457-
// This is to signal the end for byob readers
458-
// see https://streams.spec.whatwg.org/#example-rbs-pull
459-
c.byobRequest?.respond(0);
460-
const pending = this.pendingPulls.shift();
461-
pending.resolve();
462-
return;
463-
} else if (status < 0) {
464-
// The read could fail for many different reasons when reading
465-
// from a non-memory resident blob part (e.g. file-backed blob).
466-
// The error details the system error code.
467-
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
468-
const pending = this.pendingPulls.shift();
469-
c.error(error);
470-
pending.reject(error);
453+
this.readNext(c);
454+
return promise;
455+
},
456+
readNext(c) {
457+
reader.pull((status, buffer) => {
458+
// If pendingPulls is empty here, the stream had to have
459+
// been canceled, and we don't really care about the result.
460+
// We can simply exit.
461+
if (this.pendingPulls.length === 0) {
462+
return;
463+
}
464+
if (status === 0) {
465+
// EOS
466+
c.close();
467+
// This is to signal the end for byob readers
468+
// see https://streams.spec.whatwg.org/#example-rbs-pull
469+
c.byobRequest?.respond(0);
470+
const pending = this.pendingPulls.shift();
471+
pending.resolve();
472+
return;
473+
} else if (status < 0) {
474+
// The read could fail for many different reasons when reading
475+
// from a non-memory resident blob part (e.g. file-backed blob).
476+
// The error details the system error code.
477+
const error =
478+
lazyDOMException('The blob could not be read',
479+
'NotReadableError');
480+
const pending = this.pendingPulls.shift();
481+
c.error(error);
482+
pending.reject(error);
483+
return;
484+
} else if (status === 2) {
485+
// STATUS_BLOCK: No data available yet. The wakeup callback
486+
// registered in start() will re-invoke readNext when data
487+
// arrives.
488+
return;
489+
}
490+
// ReadableByteStreamController.enqueue errors if we submit a
491+
// 0-length buffer. We need to check for that here.
492+
if (buffer !== undefined && buffer.byteLength !== 0) {
493+
c.enqueue(new Uint8Array(buffer));
494+
}
495+
// We keep reading until we either reach EOS, some error, or
496+
// we hit the flow rate of the stream (c.desiredSize).
497+
// We use setImmediate here because we have to allow the event
498+
// loop to turn in order to process any pending i/o. Using
499+
// queueMicrotask won't allow the event loop to turn.
500+
setImmediate(() => {
501+
if (c.desiredSize < 0) {
502+
// A manual backpressure check.
503+
if (this.pendingPulls.length !== 0) {
504+
const pending = this.pendingPulls.shift();
505+
pending.resolve();
506+
}
471507
return;
472508
}
473-
// ReadableByteStreamController.enqueue errors if we submit a 0-length
474-
// buffer. We need to check for that here.
475-
if (buffer !== undefined && buffer.byteLength !== 0) {
476-
c.enqueue(new Uint8Array(buffer));
477-
}
478-
// We keep reading until we either reach EOS, some error, or we
479-
// hit the flow rate of the stream (c.desiredSize).
480-
// We use set immediate here because we have to allow the event
481-
// loop to turn in order to process any pending i/o. Using
482-
// queueMicrotask won't allow the event loop to turn.
483-
setImmediate(() => {
484-
if (c.desiredSize < 0) {
485-
// A manual backpressure check.
486-
if (this.pendingPulls.length !== 0) {
487-
// A case of waiting pull finished (= not yet canceled)
488-
const pending = this.pendingPulls.shift();
489-
pending.resolve();
490-
}
491-
return;
492-
}
493-
readNext();
494-
});
509+
this.readNext(c);
495510
});
496-
};
497-
readNext();
498-
return promise;
511+
});
499512
},
500513
cancel(reason) {
501514
// Reject any currently pending pulls here.
Collapse file

‎lib/internal/quic/quic.js‎

Copy file name to clipboardExpand all lines: lib/internal/quic/quic.js
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,18 @@ setCallbacks({
477477
this[kOwner][kSessionTicket](ticket);
478478
},
479479

480+
/**
481+
* Called when the client receives a NEW_TOKEN frame from the server.
482+
* The token can be used for future connections to the same server
483+
* address to skip address validation.
484+
* @param {Buffer} token The opaque token data
485+
* @param {SocketAddress} address The remote server address
486+
*/
487+
onSessionNewToken(token, address) {
488+
debug('session new token callback', this[kOwner]);
489+
// TODO(@jasnell): Emit to JS for storage and future reconnection use
490+
},
491+
480492
/**
481493
* Called when the session receives a session version negotiation request
482494
* @param {*} version
Collapse file

‎lib/internal/quic/state.js‎

Copy file name to clipboardExpand all lines: lib/internal/quic/state.js
-9Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ const {
7777
IDX_STATE_STREAM_FIN_RECEIVED,
7878
IDX_STATE_STREAM_READ_ENDED,
7979
IDX_STATE_STREAM_WRITE_ENDED,
80-
IDX_STATE_STREAM_PAUSED,
8180
IDX_STATE_STREAM_RESET,
8281
IDX_STATE_STREAM_HAS_OUTBOUND,
8382
IDX_STATE_STREAM_HAS_READER,
@@ -113,7 +112,6 @@ assert(IDX_STATE_STREAM_FIN_SENT !== undefined);
113112
assert(IDX_STATE_STREAM_FIN_RECEIVED !== undefined);
114113
assert(IDX_STATE_STREAM_READ_ENDED !== undefined);
115114
assert(IDX_STATE_STREAM_WRITE_ENDED !== undefined);
116-
assert(IDX_STATE_STREAM_PAUSED !== undefined);
117115
assert(IDX_STATE_STREAM_RESET !== undefined);
118116
assert(IDX_STATE_STREAM_HAS_OUTBOUND !== undefined);
119117
assert(IDX_STATE_STREAM_HAS_READER !== undefined);
@@ -475,11 +473,6 @@ class QuicStreamState {
475473
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_WRITE_ENDED);
476474
}
477475

478-
/** @type {boolean} */
479-
get paused() {
480-
if (this.#handle.byteLength === 0) return undefined;
481-
return !!DataViewPrototypeGetUint8(this.#handle, IDX_STATE_STREAM_PAUSED);
482-
}
483476

484477
/** @type {boolean} */
485478
get reset() {
@@ -561,7 +554,6 @@ class QuicStreamState {
561554
finReceived: this.finReceived,
562555
readEnded: this.readEnded,
563556
writeEnded: this.writeEnded,
564-
paused: this.paused,
565557
reset: this.reset,
566558
hasOutbound: this.hasOutbound,
567559
hasReader: this.hasReader,
@@ -590,7 +582,6 @@ class QuicStreamState {
590582
finReceived: this.finReceived,
591583
readEnded: this.readEnded,
592584
writeEnded: this.writeEnded,
593-
paused: this.paused,
594585
reset: this.reset,
595586
hasOutbound: this.hasOutbound,
596587
hasReader: this.hasReader,
Collapse file

‎src/node_blob.cc‎

Copy file name to clipboardExpand all lines: src/node_blob.cc
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
320320
Blob::Reader::kInternalFieldCount);
321321
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
322322
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
323+
SetProtoMethod(env->isolate(), tmpl, "setWakeup", SetWakeup);
323324
env->set_blob_reader_constructor_template(tmpl);
324325
}
325326
return tmpl;
@@ -410,6 +411,21 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
410411
std::move(next), node::bob::OPTIONS_END, nullptr, 0));
411412
}
412413

414+
void Blob::Reader::SetWakeup(
415+
const FunctionCallbackInfo<Value>& args) {
416+
Blob::Reader* reader;
417+
ASSIGN_OR_RETURN_UNWRAP(&reader, args.This());
418+
CHECK(args[0]->IsFunction());
419+
reader->wakeup_.Reset(args.GetIsolate(), args[0].As<Function>());
420+
}
421+
422+
void Blob::Reader::NotifyPull() {
423+
if (wakeup_.IsEmpty() || !env()->can_call_into_js()) return;
424+
HandleScope handle_scope(env()->isolate());
425+
Local<Function> fn = wakeup_.Get(env()->isolate());
426+
MakeCallback(fn, 0, nullptr);
427+
}
428+
413429
BaseObjectPtr<BaseObject>
414430
Blob::BlobTransferData::Deserialize(
415431
Environment* env,
@@ -590,6 +606,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
590606
registry->Register(Blob::GetDataObject);
591607
registry->Register(Blob::RevokeObjectURL);
592608
registry->Register(Blob::Reader::Pull);
609+
registry->Register(Blob::Reader::SetWakeup);
593610
registry->Register(Concat);
594611
registry->Register(BlobFromFilePath);
595612
}
Collapse file

‎src/node_blob.h‎

Copy file name to clipboardExpand all lines: src/node_blob.h
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ class Blob : public BaseObject {
8282
static BaseObjectPtr<Reader> Create(Environment* env,
8383
BaseObjectPtr<Blob> blob);
8484
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
85+
static void SetWakeup(const v8::FunctionCallbackInfo<v8::Value>& args);
86+
void NotifyPull();
8587

8688
explicit Reader(Environment* env,
8789
v8::Local<v8::Object> obj,
@@ -95,6 +97,7 @@ class Blob : public BaseObject {
9597
std::shared_ptr<DataQueue::Reader> inner_;
9698
BaseObjectPtr<Blob> strong_ptr_;
9799
bool eos_ = false;
100+
v8::Global<v8::Function> wakeup_;
98101
};
99102

100103
BaseObject::TransferMode GetTransferMode() const override;
Collapse file

‎src/quic/application.cc‎

Copy file name to clipboardExpand all lines: src/quic/application.cc
+6-19Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ ssize_t Session::Application::WriteVStream(PathStorage* path,
452452
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
453453
return ngtcp2_conn_writev_stream(*session_,
454454
&path->path,
455+
// TODO(@jasnell): ECN blocked on libuv
455456
nullptr,
456457
dest,
457458
max_packet_size,
@@ -511,10 +512,13 @@ class DefaultApplication final : public Session::Application {
511512
stream_data->count = 0;
512513
stream_data->fin = 0;
513514
stream_data->stream.reset();
514-
stream_data->remaining = 0;
515515
Debug(&session(), "Default application getting stream data");
516516
DCHECK_NOT_NULL(stream_data);
517517
// If the queue is empty, there aren't any streams with data yet
518+
519+
// If the connection-level flow control window is exhausted,
520+
// there is no point in pulling stream data.
521+
if (!session().max_data_left()) return 0;
518522
if (stream_queue_.IsEmpty()) return 0;
519523

520524
const auto get_length = [](auto vec, size_t count) {
@@ -554,9 +558,7 @@ class DefaultApplication final : public Session::Application {
554558

555559
if (count > 0) {
556560
stream->Schedule(&stream_queue_);
557-
stream_data->remaining = get_length(data, count);
558561
} else {
559-
stream_data->remaining = 0;
560562
}
561563

562564
// Not calling done here because we defer committing
@@ -581,15 +583,6 @@ class DefaultApplication final : public Session::Application {
581583

582584
void ResumeStream(int64_t id) override { ScheduleStream(id); }
583585

584-
bool ShouldSetFin(const StreamData& stream_data) override {
585-
auto const is_empty = [](const ngtcp2_vec* vec, size_t cnt) {
586-
size_t i = 0;
587-
for (size_t n = 0; n < cnt; n++) i += vec[n].len;
588-
return i > 0;
589-
};
590-
591-
return stream_data.stream && is_empty(stream_data, stream_data.count);
592-
}
593586

594587
void BlockStream(int64_t id) override {
595588
if (auto stream = session().FindStream(id)) [[likely]] {
@@ -598,10 +591,9 @@ class DefaultApplication final : public Session::Application {
598591
}
599592

600593
bool StreamCommit(StreamData* stream_data, size_t datalen) override {
601-
if (datalen == 0) return true;
602594
DCHECK_NOT_NULL(stream_data);
603595
CHECK(stream_data->stream);
604-
stream_data->stream->Commit(datalen);
596+
stream_data->stream->Commit(datalen, stream_data->fin);
605597
return true;
606598
}
607599

@@ -616,11 +608,6 @@ class DefaultApplication final : public Session::Application {
616608
}
617609
}
618610

619-
void UnscheduleStream(int64_t id) {
620-
if (auto stream = session().FindStream(id)) [[likely]] {
621-
stream->Unschedule();
622-
}
623-
}
624611

625612
Stream::Queue stream_queue_;
626613
};
Collapse file

‎src/quic/application.h‎

Copy file name to clipboardExpand all lines: src/quic/application.h
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ class Session::Application : public MemoryRetainer {
120120

121121
virtual int GetStreamData(StreamData* data) = 0;
122122
virtual bool StreamCommit(StreamData* data, size_t datalen) = 0;
123-
virtual bool ShouldSetFin(const StreamData& data) = 0;
124123

125124
inline Environment* env() const { return session().env(); }
126125
inline Session& session() {
@@ -148,14 +147,18 @@ class Session::Application : public MemoryRetainer {
148147
struct Session::Application::StreamData final {
149148
// The actual number of vectors in the struct, up to kMaxVectorCount.
150149
size_t count = 0;
151-
size_t remaining = 0;
152150
// The stream identifier. If this is a negative value then no stream is
153151
// identified.
154152
int64_t id = -1;
155153
int fin = 0;
156154
ngtcp2_vec data[kMaxVectorCount]{};
157155
BaseObjectPtr<Stream> stream;
158156

157+
static_assert(sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) &&
158+
alignof(ngtcp2_vec) == alignof(nghttp3_vec) &&
159+
offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) &&
160+
offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len),
161+
"ngtcp2_vec and nghttp3_vec must have identical layout");
159162
inline operator nghttp3_vec*() {
160163
return reinterpret_cast<nghttp3_vec*>(data);
161164
}
Collapse file

‎src/quic/bindingdata.h‎

Copy file name to clipboardExpand all lines: src/quic/bindingdata.h
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class Packet;
4343
V(session_datagram_status, SessionDatagramStatus) \
4444
V(session_handshake, SessionHandshake) \
4545
V(session_new, SessionNew) \
46+
V(session_new_token, SessionNewToken) \
4647
V(session_path_validation, SessionPathValidation) \
4748
V(session_ticket, SessionTicket) \
4849
V(session_version_negotiation, SessionVersionNegotiation) \
@@ -70,6 +71,7 @@ class Packet;
7071
V(cubic, "cubic") \
7172
V(disable_stateless_reset, "disableStatelessReset") \
7273
V(enable_connect_protocol, "enableConnectProtocol") \
74+
V(enable_early_data, "enableEarlyData") \
7375
V(enable_datagrams, "enableDatagrams") \
7476
V(enable_tls_trace, "tlsTrace") \
7577
V(endpoint, "Endpoint") \
@@ -121,6 +123,7 @@ class Packet;
121123
V(stream, "Stream") \
122124
V(success, "success") \
123125
V(tls_options, "tls") \
126+
V(token, "token") \
124127
V(token_expiration, "tokenExpiration") \
125128
V(token_secret, "tokenSecret") \
126129
V(transport_params, "transportParams") \
Collapse file

‎src/quic/cid.cc‎

Copy file name to clipboardExpand all lines: src/quic/cid.cc
+6-2Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,14 @@ const CID CID::kInvalid{};
8585
// CID::Hash
8686

8787
size_t CID::Hash::operator()(const CID& cid) const {
88+
// Uses the Boost hash_combine strategy: XOR each byte with the golden
89+
// ratio constant 0x9e3779b9 (derived from the fractional part of the
90+
// golden ratio, (sqrt(5)-1)/2 * 2^32) plus bit-shifted accumulator
91+
// state. This provides good avalanche properties for short byte
92+
// sequences like connection IDs (1-20 bytes).
8893
size_t hash = 0;
8994
for (size_t n = 0; n < cid.length(); n++) {
90-
hash ^= std::hash<uint8_t>{}(cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) +
91-
(hash >> 2));
95+
hash ^= cid.ptr_->data[n] + 0x9e3779b9 + (hash << 6) + (hash >> 2);
9296
}
9397
return hash;
9498
}
Collapse file

‎src/quic/data.cc‎

Copy file name to clipboardExpand all lines: src/quic/data.cc
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ using v8::Undefined;
2828
using v8::Value;
2929

3030
namespace quic {
31-
int DebugIndentScope::indent_ = 0;
31+
thread_local int DebugIndentScope::indent_ = 0;
3232

3333
Path::Path(const SocketAddress& local, const SocketAddress& remote) {
3434
ngtcp2_addr_init(&this->local, local.data(), local.length());

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.