Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4dc59b9

Browse filesBrowse files
addaleaxMylesBorins
authored andcommitted
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of the codebase. Originally landed in the QUIC repo Original review metadata: ``` PR-URL: nodejs/quic#165 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com> ``` Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: #31871 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
1 parent 478450d commit 4dc59b9
Copy full SHA for 4dc59b9

File tree

Expand file treeCollapse file tree

3 files changed

+265
-60
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

3 files changed

+265
-60
lines changed
Open diff view settings
Collapse file

‎lib/dgram.js‎

Copy file name to clipboardExpand all lines: lib/dgram.js
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
232232
this.on('listening', onListening);
233233
}
234234

235-
if (port instanceof UDP) {
235+
if (port !== null &&
236+
typeof port === 'object' &&
237+
typeof port.recvStart === 'function') {
236238
replaceHandle(this, port);
237239
startListening(this);
238240
return this;
Collapse file

‎src/udp_wrap.cc‎

Copy file name to clipboardExpand all lines: src/udp_wrap.cc
+151-54Lines changed: 151 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
6969
}
7070

7171

72-
inline bool SendWrap::have_callback() const {
72+
bool SendWrap::have_callback() const {
7373
return have_callback_;
7474
}
7575

76+
UDPListener::~UDPListener() {
77+
if (wrap_ != nullptr)
78+
wrap_->set_listener(nullptr);
79+
}
80+
81+
UDPWrapBase::~UDPWrapBase() {
82+
set_listener(nullptr);
83+
}
84+
85+
UDPListener* UDPWrapBase::listener() const {
86+
CHECK_NOT_NULL(listener_);
87+
return listener_;
88+
}
89+
90+
void UDPWrapBase::set_listener(UDPListener* listener) {
91+
if (listener_ != nullptr)
92+
listener_->wrap_ = nullptr;
93+
listener_ = listener;
94+
if (listener_ != nullptr) {
95+
CHECK_NULL(listener_->wrap_);
96+
listener_->wrap_ = this;
97+
}
98+
}
99+
100+
UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
101+
CHECK_GT(obj->InternalFieldCount(), UDPWrapBase::kUDPWrapBaseField);
102+
return static_cast<UDPWrapBase*>(
103+
obj->GetAlignedPointerFromInternalField(UDPWrapBase::kUDPWrapBaseField));
104+
}
105+
106+
void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
107+
env->SetProtoMethod(t, "recvStart", RecvStart);
108+
env->SetProtoMethod(t, "recvStop", RecvStop);
109+
}
76110

77111
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
78112
: HandleWrap(env,
79113
object,
80114
reinterpret_cast<uv_handle_t*>(&handle_),
81115
AsyncWrap::PROVIDER_UDPWRAP) {
116+
object->SetAlignedPointerInInternalField(
117+
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
118+
82119
int r = uv_udp_init(env->event_loop(), &handle_);
83120
CHECK_EQ(r, 0); // can't fail anyway
121+
122+
set_listener(this);
84123
}
85124

86125

@@ -91,7 +130,8 @@ void UDPWrap::Initialize(Local<Object> target,
91130
Environment* env = Environment::GetCurrent(context);
92131

93132
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
94-
t->InstanceTemplate()->SetInternalFieldCount(UDPWrap::kInternalFieldCount);
133+
t->InstanceTemplate()->SetInternalFieldCount(
134+
UDPWrapBase::kInternalFieldCount);
95135
Local<String> udpString =
96136
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
97137
t->SetClassName(udpString);
@@ -112,6 +152,7 @@ void UDPWrap::Initialize(Local<Object> target,
112152
Local<FunctionTemplate>(),
113153
attributes);
114154

155+
UDPWrapBase::AddMethods(env, t);
115156
env->SetProtoMethod(t, "open", Open);
116157
env->SetProtoMethod(t, "bind", Bind);
117158
env->SetProtoMethod(t, "connect", Connect);
@@ -120,8 +161,6 @@ void UDPWrap::Initialize(Local<Object> target,
120161
env->SetProtoMethod(t, "connect6", Connect6);
121162
env->SetProtoMethod(t, "send6", Send6);
122163
env->SetProtoMethod(t, "disconnect", Disconnect);
123-
env->SetProtoMethod(t, "recvStart", RecvStart);
124-
env->SetProtoMethod(t, "recvStop", RecvStop);
125164
env->SetProtoMethod(t, "getpeername",
126165
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
127166
env->SetProtoMethod(t, "getsockname",
@@ -220,6 +259,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
220259
flags);
221260
}
222261

262+
if (err == 0)
263+
wrap->listener()->OnAfterBind();
264+
223265
args.GetReturnValue().Set(err);
224266
}
225267

@@ -464,14 +506,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
464506
CHECK(args[3]->IsBoolean());
465507
}
466508

467-
Local<Object> req_wrap_obj = args[0].As<Object>();
468509
Local<Array> chunks = args[1].As<Array>();
469510
// it is faster to fetch the length of the
470511
// array in js-land
471512
size_t count = args[2].As<Uint32>()->Value();
472-
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
473-
474-
size_t msg_size = 0;
475513

476514
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
477515

@@ -482,7 +520,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
482520
size_t length = Buffer::Length(chunk);
483521

484522
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
485-
msg_size += length;
486523
}
487524

488525
int err = 0;
@@ -492,14 +529,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
492529
const unsigned short port = args[3].As<Uint32>()->Value();
493530
node::Utf8Value address(env->isolate(), args[4]);
494531
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
495-
if (err == 0) {
532+
if (err == 0)
496533
addr = reinterpret_cast<sockaddr*>(&addr_storage);
497-
}
498534
}
499535

500-
uv_buf_t* bufs_ptr = *bufs;
501-
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
502-
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
536+
if (err == 0) {
537+
wrap->current_send_req_wrap_ = args[0].As<Object>();
538+
wrap->current_send_has_callback_ =
539+
sendto ? args[5]->IsTrue() : args[3]->IsTrue();
540+
541+
err = wrap->Send(*bufs, count, addr);
542+
543+
wrap->current_send_req_wrap_.Clear();
544+
wrap->current_send_has_callback_ = false;
545+
}
546+
547+
args.GetReturnValue().Set(err);
548+
}
549+
550+
ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
551+
size_t count,
552+
const sockaddr* addr) {
553+
if (IsHandleClosing()) return UV_EBADF;
554+
555+
size_t msg_size = 0;
556+
for (size_t i = 0; i < count; i++)
557+
msg_size += bufs_ptr[i].len;
558+
559+
int err = 0;
560+
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
561+
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
503562
if (err == UV_ENOSYS || err == UV_EAGAIN) {
504563
err = 0;
505564
} else if (err >= 0) {
@@ -517,28 +576,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
517576
CHECK_EQ(static_cast<size_t>(err), msg_size);
518577
// + 1 so that the JS side can distinguish 0-length async sends from
519578
// 0-length sync sends.
520-
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
521-
return;
579+
return msg_size + 1;
522580
}
523581
}
524582
}
525583

526584
if (err == 0) {
527-
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
528-
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
529-
req_wrap->msg_size = msg_size;
530-
531-
err = req_wrap->Dispatch(uv_udp_send,
532-
&wrap->handle_,
533-
bufs_ptr,
534-
count,
535-
addr,
536-
OnSend);
585+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
586+
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
587+
if (req_wrap == nullptr) return UV_ENOSYS;
588+
589+
err = req_wrap->Dispatch(
590+
uv_udp_send,
591+
&handle_,
592+
bufs_ptr,
593+
count,
594+
addr,
595+
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
596+
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
597+
self->listener()->OnSendDone(
598+
ReqWrap<uv_udp_send_t>::from_req(req), status);
599+
}});
537600
if (err)
538601
delete req_wrap;
539602
}
540603

541-
args.GetReturnValue().Set(err);
604+
return err;
605+
}
606+
607+
608+
ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
609+
SendWrap* req_wrap = new SendWrap(env(),
610+
current_send_req_wrap_,
611+
current_send_has_callback_);
612+
req_wrap->msg_size = msg_size;
613+
return req_wrap;
542614
}
543615

544616

@@ -552,31 +624,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
552624
}
553625

554626

555-
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
556-
UDPWrap* wrap;
557-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
558-
args.Holder(),
559-
args.GetReturnValue().Set(UV_EBADF));
560-
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
627+
AsyncWrap* UDPWrap::GetAsyncWrap() {
628+
return this;
629+
}
630+
631+
int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
632+
return uv_udp_getpeername(&handle_, name, namelen);
633+
}
634+
635+
int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
636+
return uv_udp_getsockname(&handle_, name, namelen);
637+
}
638+
639+
void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
640+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
641+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
642+
}
643+
644+
int UDPWrap::RecvStart() {
645+
if (IsHandleClosing()) return UV_EBADF;
646+
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
561647
// UV_EALREADY means that the socket is already bound but that's okay
562648
if (err == UV_EALREADY)
563649
err = 0;
564-
args.GetReturnValue().Set(err);
650+
return err;
565651
}
566652

567653

568-
void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
569-
UDPWrap* wrap;
570-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
571-
args.Holder(),
572-
args.GetReturnValue().Set(UV_EBADF));
573-
int r = uv_udp_recv_stop(&wrap->handle_);
574-
args.GetReturnValue().Set(r);
654+
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
655+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
656+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
657+
}
658+
659+
int UDPWrap::RecvStop() {
660+
if (IsHandleClosing()) return UV_EBADF;
661+
return uv_udp_recv_stop(&handle_);
575662
}
576663

577664

578-
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
579-
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
665+
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
666+
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
580667
if (req_wrap->have_callback()) {
581668
Environment* env = req_wrap->env();
582669
HandleScope handle_scope(env->isolate());
@@ -593,43 +680,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
593680
void UDPWrap::OnAlloc(uv_handle_t* handle,
594681
size_t suggested_size,
595682
uv_buf_t* buf) {
596-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
597-
*buf = wrap->env()->AllocateManaged(suggested_size).release();
683+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
684+
reinterpret_cast<uv_udp_t*>(handle));
685+
*buf = wrap->listener()->OnAlloc(suggested_size);
686+
}
687+
688+
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
689+
return env()->AllocateManaged(suggested_size).release();
598690
}
599691

600692
void UDPWrap::OnRecv(uv_udp_t* handle,
601693
ssize_t nread,
602-
const uv_buf_t* buf_,
603-
const struct sockaddr* addr,
694+
const uv_buf_t* buf,
695+
const sockaddr* addr,
604696
unsigned int flags) {
605-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
606-
Environment* env = wrap->env();
697+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
698+
wrap->listener()->OnRecv(nread, *buf, addr, flags);
699+
}
607700

608-
AllocatedBuffer buf(env, *buf_);
701+
void UDPWrap::OnRecv(ssize_t nread,
702+
const uv_buf_t& buf_,
703+
const sockaddr* addr,
704+
unsigned int flags) {
705+
Environment* env = this->env();
706+
AllocatedBuffer buf(env, buf_);
609707
if (nread == 0 && addr == nullptr) {
610708
return;
611709
}
612710

613711
HandleScope handle_scope(env->isolate());
614712
Context::Scope context_scope(env->context());
615713

616-
Local<Object> wrap_obj = wrap->object();
617714
Local<Value> argv[] = {
618715
Integer::New(env->isolate(), nread),
619-
wrap_obj,
716+
object(),
620717
Undefined(env->isolate()),
621718
Undefined(env->isolate())
622719
};
623720

624721
if (nread < 0) {
625-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
722+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
626723
return;
627724
}
628725

629726
buf.Resize(nread);
630727
argv[2] = buf.ToBuffer().ToLocalChecked();
631728
argv[3] = AddressToJS(env, addr);
632-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
729+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
633730
}
634731

635732
MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.