Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 3fb7fc9

Browse filesBrowse files
jasnellRafaelGSS
authored andcommitted
quic: further implementation details
PR-URL: #48244 Reviewed-By: Yagiz Nizipli <yagiz.nizipli@sentry.io> Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
1 parent 297cb6f commit 3fb7fc9
Copy full SHA for 3fb7fc9

File tree

Expand file treeCollapse file tree

7 files changed

+1229
-38
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+1229
-38
lines changed
Open diff view settings
Collapse file

‎src/dataqueue/queue.cc‎

Copy file name to clipboardExpand all lines: src/dataqueue/queue.cc
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,27 @@ class DataQueueImpl final : public DataQueue,
162162
"entries", entries_, "std::vector<std::unique_ptr<Entry>>");
163163
}
164164

165+
void addBackpressureListener(BackpressureListener* listener) override {
166+
if (idempotent_) return;
167+
DCHECK_NOT_NULL(listener);
168+
backpressure_listeners_.insert(listener);
169+
}
170+
171+
void removeBackpressureListener(BackpressureListener* listener) override {
172+
if (idempotent_) return;
173+
DCHECK_NOT_NULL(listener);
174+
backpressure_listeners_.erase(listener);
175+
}
176+
177+
void NotifyBackpressure(size_t amount) {
178+
if (idempotent_) return;
179+
for (auto& listener : backpressure_listeners_) listener->EntryRead(amount);
180+
}
181+
182+
bool HasBackpressureListeners() const noexcept {
183+
return !backpressure_listeners_.empty();
184+
}
185+
165186
std::shared_ptr<Reader> get_reader() override;
166187
SET_MEMORY_INFO_NAME(DataQueue)
167188
SET_SELF_SIZE(DataQueueImpl)
@@ -173,6 +194,8 @@ class DataQueueImpl final : public DataQueue,
173194
std::optional<uint64_t> capped_size_ = std::nullopt;
174195
bool locked_to_reader_ = false;
175196

197+
std::unordered_set<BackpressureListener*> backpressure_listeners_;
198+
176199
friend class DataQueue;
177200
friend class IdempotentDataQueueReader;
178201
friend class NonIdempotentDataQueueReader;
@@ -433,6 +456,17 @@ class NonIdempotentDataQueueReader final
433456
return;
434457
}
435458

459+
// If there is a backpressure listener, lets report on how much data
460+
// was actually read.
461+
if (data_queue_->HasBackpressureListeners()) {
462+
// How much did we actually read?
463+
size_t read = 0;
464+
for (uint64_t n = 0; n < count; n++) {
465+
read += vecs[n].len;
466+
}
467+
data_queue_->NotifyBackpressure(read);
468+
}
469+
436470
// Now that we have updated this readers state, we can forward
437471
// everything on to the outer next.
438472
std::move(next)(status, vecs, count, std::move(done));
Collapse file

‎src/dataqueue/queue.h‎

Copy file name to clipboardExpand all lines: src/dataqueue/queue.h
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,14 @@ class DataQueue : public MemoryRetainer {
141141
using Done = bob::Done;
142142
};
143143

144+
// A BackpressureListener can be used to receive notifications
145+
// when a non-idempotent DataQueue releases entries as they
146+
// are consumed.
147+
class BackpressureListener {
148+
public:
149+
virtual void EntryRead(size_t amount) = 0;
150+
};
151+
144152
// A DataQueue::Entry represents a logical chunk of data in the queue.
145153
// The entry may or may not represent memory-resident data. It may
146154
// or may not be consumable more than once.
@@ -285,6 +293,10 @@ class DataQueue : public MemoryRetainer {
285293
// been set, maybeCapRemaining() will return std::nullopt.
286294
virtual std::optional<uint64_t> maybeCapRemaining() const = 0;
287295

296+
// BackpressureListeners only work on non-idempotent DataQueues.
297+
virtual void addBackpressureListener(BackpressureListener* listener) = 0;
298+
virtual void removeBackpressureListener(BackpressureListener* listener) = 0;
299+
288300
static void Initialize(Environment* env, v8::Local<v8::Object> target);
289301
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
290302
};
Collapse file

‎src/quic/application.cc‎

Copy file name to clipboardExpand all lines: src/quic/application.cc
+5-6Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include "node_bob.h"
12
#include "uv.h"
23
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
34

@@ -79,7 +80,7 @@ void Session::Application::AcknowledgeStreamData(Stream* stream,
7980

8081
void Session::Application::BlockStream(int64_t id) {
8182
auto stream = session().FindStream(id);
82-
if (stream) stream->Blocked();
83+
if (stream) stream->EmitBlocked();
8384
}
8485

8586
bool Session::Application::CanAddHeader(size_t current_count,
@@ -233,7 +234,7 @@ void Session::Application::SendPendingData() {
233234
// and no more outbound data can be sent.
234235
CHECK_LE(ndatalen, 0);
235236
auto stream = session_->FindStream(stream_data.id);
236-
if (stream) stream->End();
237+
if (stream) stream->EndWritable();
237238
continue;
238239
}
239240
case NGTCP2_ERR_WRITE_MORE: {
@@ -360,10 +361,8 @@ class DefaultApplication final : public Session::Application {
360361
stream_data->data,
361362
arraysize(stream_data->data),
362363
kMaxVectorCount);
363-
switch (ret) {
364-
case bob::Status::STATUS_EOS:
365-
stream_data->fin = 1;
366-
break;
364+
if (ret == bob::Status::STATUS_EOS) {
365+
stream_data->fin = 1;
367366
}
368367
} else {
369368
stream_data->fin = 1;
Collapse file

‎src/quic/bindingdata.cc‎

Copy file name to clipboardExpand all lines: src/quic/bindingdata.cc
+6-2Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,12 @@ CallbackScopeBase::CallbackScopeBase(Environment* env)
203203
: env(env), context_scope(env->context()), try_catch(env->isolate()) {}
204204

205205
CallbackScopeBase::~CallbackScopeBase() {
206-
if (try_catch.HasCaught() && !try_catch.HasTerminated()) {
207-
errors::TriggerUncaughtException(env->isolate(), try_catch);
206+
if (try_catch.HasCaught()) {
207+
if (!try_catch.HasTerminated() && env->can_call_into_js()) {
208+
errors::TriggerUncaughtException(env->isolate(), try_catch);
209+
} else {
210+
try_catch.ReThrow();
211+
}
208212
}
209213
}
210214

Collapse file

‎src/quic/bindingdata.h‎

Copy file name to clipboardExpand all lines: src/quic/bindingdata.h
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ constexpr size_t kMaxVectorCount = 16;
103103
V(session_version_negotiation, SessionVersionNegotiation) \
104104
V(session_path_validation, SessionPathValidation) \
105105
V(stream_close, StreamClose) \
106-
V(stream_error, StreamError) \
107106
V(stream_created, StreamCreated) \
108107
V(stream_reset, StreamReset) \
109108
V(stream_headers, StreamHeaders) \
@@ -304,6 +303,8 @@ struct CallbackScopeBase {
304303
~CallbackScopeBase();
305304
};
306305

306+
// Maintains a strong reference to BaseObject type ptr to keep it alive during
307+
// a MakeCallback during which it might be destroyed.
307308
template <typename T>
308309
struct CallbackScope final : public CallbackScopeBase {
309310
BaseObjectPtr<T> ref;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.