Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f62967c

Browse filesBrowse files
addaleaxcodebytere
authored andcommitted
src: enable StreamPipe for generic StreamBases
Originally landed in the nodejs/quic repo and used there for file sending. Original review: ``` PR-URL: nodejs/quic#150 Reviewed-By: James M Snell <jasnell@gmail.com> ``` PR-URL: #31869 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com>
1 parent b9f3bfe commit f62967c
Copy full SHA for f62967c

File tree

Expand file treeCollapse file tree

2 files changed

+47
-16
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+47
-16
lines changed
Open diff view settings
Collapse file

‎src/stream_pipe.cc‎

Copy file name to clipboardExpand all lines: src/stream_pipe.cc
+43-13Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
2525
source->PushStreamListener(&readable_listener_);
2626
sink->PushStreamListener(&writable_listener_);
2727

28-
CHECK(sink->HasWantsWrite());
28+
uses_wants_write_ = sink->HasWantsWrite();
2929

3030
// Set up links between this object and the source/sink objects.
3131
// In particular, this makes sure that they are garbage collected as a group,
@@ -66,7 +66,8 @@ void StreamPipe::Unpipe(bool is_in_deletion) {
6666
is_closed_ = true;
6767
is_reading_ = false;
6868
source()->RemoveStreamListener(&readable_listener_);
69-
sink()->RemoveStreamListener(&writable_listener_);
69+
if (pending_writes_ == 0)
70+
sink()->RemoveStreamListener(&writable_listener_);
7071

7172
if (is_in_deletion) return;
7273

@@ -126,13 +127,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
126127
// EOF or error; stop reading and pass the error to the previous listener
127128
// (which might end up in JS).
128129
pipe->is_eof_ = true;
130+
// Cache `sink()` here because the previous listener might do things
131+
// that eventually lead to an `Unpipe()` call.
132+
StreamBase* sink = pipe->sink();
129133
stream()->ReadStop();
130134
CHECK_NOT_NULL(previous_listener_);
131135
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
132136
// If we’re not writing, close now. Otherwise, we’ll do that in
133137
// `OnStreamAfterWrite()`.
134-
if (!pipe->is_writing_) {
135-
pipe->ShutdownWritable();
138+
if (pipe->pending_writes_ == 0) {
139+
sink->Shutdown();
136140
pipe->Unpipe();
137141
}
138142
return;
@@ -142,32 +146,40 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
142146
}
143147

144148
void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
149+
CHECK(uses_wants_write_ || pending_writes_ == 0);
145150
uv_buf_t buffer = uv_buf_init(buf.data(), nread);
146151
StreamWriteResult res = sink()->Write(&buffer, 1);
152+
pending_writes_++;
147153
if (!res.async) {
148154
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
149155
} else {
150-
is_writing_ = true;
151156
is_reading_ = false;
152157
res.wrap->SetAllocatedStorage(std::move(buf));
153158
if (source() != nullptr)
154159
source()->ReadStop();
155160
}
156161
}
157162

158-
void StreamPipe::ShutdownWritable() {
159-
sink()->Shutdown();
160-
}
161-
162163
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
163164
int status) {
164165
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
165-
pipe->is_writing_ = false;
166+
pipe->pending_writes_--;
167+
if (pipe->is_closed_) {
168+
if (pipe->pending_writes_ == 0) {
169+
Environment* env = pipe->env();
170+
HandleScope handle_scope(env->isolate());
171+
Context::Scope context_scope(env->context());
172+
pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
173+
stream()->RemoveStreamListener(this);
174+
}
175+
return;
176+
}
177+
166178
if (pipe->is_eof_) {
167179
HandleScope handle_scope(pipe->env()->isolate());
168180
InternalCallbackScope callback_scope(pipe,
169181
InternalCallbackScope::kSkipTaskQueues);
170-
pipe->ShutdownWritable();
182+
pipe->sink()->Shutdown();
171183
pipe->Unpipe();
172184
return;
173185
}
@@ -179,6 +191,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
179191
prev->OnStreamAfterWrite(w, status);
180192
return;
181193
}
194+
195+
if (!pipe->uses_wants_write_) {
196+
OnStreamWantsWrite(65536);
197+
}
182198
}
183199

184200
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
@@ -202,6 +218,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
202218
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
203219
pipe->sink_destroyed_ = true;
204220
pipe->is_eof_ = true;
221+
pipe->pending_writes_ = 0;
205222
pipe->Unpipe();
206223
}
207224

@@ -242,8 +259,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
242259
StreamPipe* pipe;
243260
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
244261
pipe->is_closed_ = false;
245-
if (pipe->wanted_data_ > 0)
246-
pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
262+
pipe->writable_listener_.OnStreamWantsWrite(65536);
247263
}
248264

249265
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
@@ -252,6 +268,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
252268
pipe->Unpipe();
253269
}
254270

271+
void StreamPipe::IsClosed(const FunctionCallbackInfo<Value>& args) {
272+
StreamPipe* pipe;
273+
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
274+
args.GetReturnValue().Set(pipe->is_closed_);
275+
}
276+
277+
void StreamPipe::PendingWrites(const FunctionCallbackInfo<Value>& args) {
278+
StreamPipe* pipe;
279+
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
280+
args.GetReturnValue().Set(pipe->pending_writes_);
281+
}
282+
255283
namespace {
256284

257285
void InitializeStreamPipe(Local<Object> target,
@@ -266,6 +294,8 @@ void InitializeStreamPipe(Local<Object> target,
266294
FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
267295
env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
268296
env->SetProtoMethod(pipe, "start", StreamPipe::Start);
297+
env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed);
298+
env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites);
269299
pipe->Inherit(AsyncWrap::GetConstructorTemplate(env));
270300
pipe->SetClassName(stream_pipe_string);
271301
pipe->InstanceTemplate()->SetInternalFieldCount(1);
Collapse file

‎src/stream_pipe.h‎

Copy file name to clipboardExpand all lines: src/stream_pipe.h
+4-3Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ class StreamPipe : public AsyncWrap {
1717
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
1818
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
1919
static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
20+
static void IsClosed(const v8::FunctionCallbackInfo<v8::Value>& args);
21+
static void PendingWrites(const v8::FunctionCallbackInfo<v8::Value>& args);
2022

2123
SET_NO_MEMORY_INFO()
2224
SET_MEMORY_INFO_NAME(StreamPipe)
@@ -26,14 +28,13 @@ class StreamPipe : public AsyncWrap {
2628
inline StreamBase* source();
2729
inline StreamBase* sink();
2830

29-
inline void ShutdownWritable();
30-
31+
int pending_writes_ = 0;
3132
bool is_reading_ = false;
32-
bool is_writing_ = false;
3333
bool is_eof_ = false;
3434
bool is_closed_ = true;
3535
bool sink_destroyed_ = false;
3636
bool source_destroyed_ = false;
37+
bool uses_wants_write_ = false;
3738

3839
// Set a default value so that when we’re coming from Start(), we know
3940
// that we don’t want to read just yet.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.