Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2347ce8

Browse files
committed
src: unify thread pool work
Instead of using the libuv mechanism directly, provide an internal `ThreadPoolWork` wrapper that takes care of increasing/decreasing the waiting request counter. PR-URL: #19377 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 7153bec commit 2347ce8
Copy full SHA for 2347ce8

File tree

Expand file tree / Collapse file tree

4 files changed

+108
-128
lines changed
Open diff view settings
Filter options
Expand file tree / Collapse file tree

4 files changed

+108
-128
lines changed
Open diff view settings
Collapse file

‎src/node_api.cc‎

Copy file name to clipboard · Expand all lines: src/node_api.cc
+28 −45 — Lines changed: 28 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3338,7 +3338,7 @@ static napi_status ConvertUVErrorCode(int code) {
33383338
}
33393339

33403340
// Wrapper around uv_work_t which calls user-provided callbacks.
3341-
class Work : public node::AsyncResource {
3341+
class Work : public node::AsyncResource, public node::ThreadPoolWork {
33423342
private:
33433343
explicit Work(napi_env env,
33443344
v8::Local<v8::Object> async_resource,
@@ -3349,15 +3349,14 @@ class Work : public node::AsyncResource {
33493349
: AsyncResource(env->isolate,
33503350
async_resource,
33513351
*v8::String::Utf8Value(env->isolate, async_resource_name)),
3352-
_env(env),
3353-
_data(data),
3354-
_execute(execute),
3355-
_complete(complete) {
3356-
memset(&_request, 0, sizeof(_request));
3357-
_request.data = this;
3352+
ThreadPoolWork(node::Environment::GetCurrent(env->isolate)),
3353+
_env(env),
3354+
_data(data),
3355+
_execute(execute),
3356+
_complete(complete) {
33583357
}
33593358

3360-
~Work() { }
3359+
virtual ~Work() { }
33613360

33623361
public:
33633362
static Work* New(napi_env env,
@@ -3374,47 +3373,36 @@ class Work : public node::AsyncResource {
33743373
delete work;
33753374
}
33763375

3377-
static void ExecuteCallback(uv_work_t* req) {
3378-
Work* work = static_cast<Work*>(req->data);
3379-
work->_execute(work->_env, work->_data);
3376+
void DoThreadPoolWork() override {
3377+
_execute(_env, _data);
33803378
}
33813379

3382-
static void CompleteCallback(uv_work_t* req, int status) {
3383-
Work* work = static_cast<Work*>(req->data);
3380+
void AfterThreadPoolWork(int status) {
3381+
if (_complete == nullptr)
3382+
return;
33843383

3385-
if (work->_complete != nullptr) {
3386-
napi_env env = work->_env;
3384+
// Establish a handle scope here so that every callback doesn't have to.
3385+
// Also it is needed for the exception-handling below.
3386+
v8::HandleScope scope(_env->isolate);
33873387

3388-
// Establish a handle scope here so that every callback doesn't have to.
3389-
// Also it is needed for the exception-handling below.
3390-
v8::HandleScope scope(env->isolate);
3391-
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
3392-
env_->DecreaseWaitingRequestCounter();
3388+
CallbackScope callback_scope(this);
33933389

3394-
CallbackScope callback_scope(work);
3390+
NAPI_CALL_INTO_MODULE(_env,
3391+
_complete(_env, ConvertUVErrorCode(status), _data),
3392+
[this] (v8::Local<v8::Value> local_err) {
3393+
// If there was an unhandled exception in the complete callback,
3394+
// report it as a fatal exception. (There is no JavaScript on the
3395+
// callstack that can possibly handle it.)
3396+
v8impl::trigger_fatal_exception(_env, local_err);
3397+
});
33953398

3396-
NAPI_CALL_INTO_MODULE(env,
3397-
work->_complete(env, ConvertUVErrorCode(status), work->_data),
3398-
[env] (v8::Local<v8::Value> local_err) {
3399-
// If there was an unhandled exception in the complete callback,
3400-
// report it as a fatal exception. (There is no JavaScript on the
3401-
// callstack that can possibly handle it.)
3402-
v8impl::trigger_fatal_exception(env, local_err);
3403-
});
3404-
3405-
// Note: Don't access `work` after this point because it was
3406-
// likely deleted by the complete callback.
3407-
}
3408-
}
3409-
3410-
uv_work_t* Request() {
3411-
return &_request;
3399+
// Note: Don't access `work` after this point because it was
3400+
// likely deleted by the complete callback.
34123401
}
34133402

34143403
private:
34153404
napi_env _env;
34163405
void* _data;
3417-
uv_work_t _request;
34183406
napi_async_execute_callback _execute;
34193407
napi_async_complete_callback _complete;
34203408
};
@@ -3491,12 +3479,7 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
34913479

34923480
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
34933481

3494-
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
3495-
env_->IncreaseWaitingRequestCounter();
3496-
CALL_UV(env, uv_queue_work(event_loop,
3497-
w->Request(),
3498-
uvimpl::Work::ExecuteCallback,
3499-
uvimpl::Work::CompleteCallback));
3482+
w->ScheduleWork();
35003483

35013484
return napi_clear_last_error(env);
35023485
}
@@ -3507,7 +3490,7 @@ napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
35073490

35083491
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
35093492

3510-
CALL_UV(env, uv_cancel(reinterpret_cast<uv_req_t*>(w->Request())));
3493+
CALL_UV(env, w->CancelWork());
35113494

35123495
return napi_clear_last_error(env);
35133496
}
Collapse file

‎src/node_crypto.cc‎

Copy file name to clipboard · Expand all lines: src/node_crypto.cc
+30 −69 — Lines changed: 30 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -4562,7 +4562,7 @@ bool ECDH::IsKeyPairValid() {
45624562
}
45634563

45644564

4565-
class PBKDF2Request : public AsyncWrap {
4565+
class PBKDF2Request : public AsyncWrap, public ThreadPoolWork {
45664566
public:
45674567
PBKDF2Request(Environment* env,
45684568
Local<Object> object,
@@ -4572,6 +4572,7 @@ class PBKDF2Request : public AsyncWrap {
45724572
int keylen,
45734573
int iteration_count)
45744574
: AsyncWrap(env, object, AsyncWrap::PROVIDER_PBKDF2REQUEST),
4575+
ThreadPoolWork(env),
45754576
digest_(digest),
45764577
success_(false),
45774578
pass_(std::move(pass)),
@@ -4580,21 +4581,14 @@ class PBKDF2Request : public AsyncWrap {
45804581
iteration_count_(iteration_count) {
45814582
}
45824583

4583-
uv_work_t* work_req() {
4584-
return &work_req_;
4585-
}
4586-
45874584
size_t self_size() const override { return sizeof(*this); }
45884585

4589-
static void Work(uv_work_t* work_req);
4590-
void Work();
4586+
void DoThreadPoolWork() override;
4587+
void AfterThreadPoolWork(int status) override;
45914588

4592-
static void After(uv_work_t* work_req, int status);
45934589
void After(Local<Value> (*argv)[2]);
4594-
void After();
45954590

45964591
private:
4597-
uv_work_t work_req_;
45984592
const EVP_MD* digest_;
45994593
bool success_;
46004594
MallocedBuffer<char> pass_;
@@ -4604,7 +4598,7 @@ class PBKDF2Request : public AsyncWrap {
46044598
};
46054599

46064600

4607-
void PBKDF2Request::Work() {
4601+
void PBKDF2Request::DoThreadPoolWork() {
46084602
success_ =
46094603
PKCS5_PBKDF2_HMAC(
46104604
pass_.data, pass_.size,
@@ -4617,12 +4611,6 @@ void PBKDF2Request::Work() {
46174611
}
46184612

46194613

4620-
void PBKDF2Request::Work(uv_work_t* work_req) {
4621-
PBKDF2Request* req = ContainerOf(&PBKDF2Request::work_req_, work_req);
4622-
req->Work();
4623-
}
4624-
4625-
46264614
void PBKDF2Request::After(Local<Value> (*argv)[2]) {
46274615
if (success_) {
46284616
(*argv)[0] = Null(env()->isolate());
@@ -4635,7 +4623,12 @@ void PBKDF2Request::After(Local<Value> (*argv)[2]) {
46354623
}
46364624

46374625

4638-
void PBKDF2Request::After() {
4626+
void PBKDF2Request::AfterThreadPoolWork(int status) {
4627+
std::unique_ptr<PBKDF2Request> req(this);
4628+
if (status == UV_ECANCELED)
4629+
return;
4630+
CHECK_EQ(status, 0);
4631+
46394632
HandleScope handle_scope(env()->isolate());
46404633
Context::Scope context_scope(env()->context());
46414634
Local<Value> argv[2];
@@ -4644,17 +4637,6 @@ void PBKDF2Request::After() {
46444637
}
46454638

46464639

4647-
void PBKDF2Request::After(uv_work_t* work_req, int status) {
4648-
std::unique_ptr<PBKDF2Request> req(
4649-
ContainerOf(&PBKDF2Request::work_req_, work_req));
4650-
req->env()->DecreaseWaitingRequestCounter();
4651-
if (status == UV_ECANCELED)
4652-
return;
4653-
CHECK_EQ(status, 0);
4654-
req->After();
4655-
}
4656-
4657-
46584640
void PBKDF2(const FunctionCallbackInfo<Value>& args) {
46594641
Environment* env = Environment::GetCurrent(args);
46604642

@@ -4701,14 +4683,10 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
47014683
if (args[5]->IsFunction()) {
47024684
obj->Set(env->context(), env->ondone_string(), args[5]).FromJust();
47034685

4704-
env->IncreaseWaitingRequestCounter();
4705-
uv_queue_work(env->event_loop(),
4706-
req.release()->work_req(),
4707-
PBKDF2Request::Work,
4708-
PBKDF2Request::After);
4686+
req.release()->ScheduleWork();
47094687
} else {
47104688
env->PrintSyncTrace();
4711-
req->Work();
4689+
req->DoThreadPoolWork();
47124690
Local<Value> argv[2];
47134691
req->After(&argv);
47144692

@@ -4721,7 +4699,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
47214699

47224700

47234701
// Only instantiate within a valid HandleScope.
4724-
class RandomBytesRequest : public AsyncWrap {
4702+
class RandomBytesRequest : public AsyncWrap, public ThreadPoolWork {
47254703
public:
47264704
enum FreeMode { FREE_DATA, DONT_FREE_DATA };
47274705

@@ -4731,16 +4709,13 @@ class RandomBytesRequest : public AsyncWrap {
47314709
char* data,
47324710
FreeMode free_mode)
47334711
: AsyncWrap(env, object, AsyncWrap::PROVIDER_RANDOMBYTESREQUEST),
4712+
ThreadPoolWork(env),
47344713
error_(0),
47354714
size_(size),
47364715
data_(data),
47374716
free_mode_(free_mode) {
47384717
}
47394718

4740-
uv_work_t* work_req() {
4741-
return &work_req_;
4742-
}
4743-
47444719
inline size_t size() const {
47454720
return size_;
47464721
}
@@ -4778,7 +4753,8 @@ class RandomBytesRequest : public AsyncWrap {
47784753

47794754
size_t self_size() const override { return sizeof(*this); }
47804755

4781-
uv_work_t work_req_;
4756+
void DoThreadPoolWork() override;
4757+
void AfterThreadPoolWork(int status) override;
47824758

47834759
private:
47844760
unsigned long error_; // NOLINT(runtime/int)
@@ -4788,21 +4764,17 @@ class RandomBytesRequest : public AsyncWrap {
47884764
};
47894765

47904766

4791-
void RandomBytesWork(uv_work_t* work_req) {
4792-
RandomBytesRequest* req =
4793-
ContainerOf(&RandomBytesRequest::work_req_, work_req);
4794-
4767+
void RandomBytesRequest::DoThreadPoolWork() {
47954768
// Ensure that OpenSSL's PRNG is properly seeded.
47964769
CheckEntropy();
47974770

4798-
const int r = RAND_bytes(reinterpret_cast<unsigned char*>(req->data()),
4799-
req->size());
4771+
const int r = RAND_bytes(reinterpret_cast<unsigned char*>(data_), size_);
48004772

48014773
// RAND_bytes() returns 0 on error.
48024774
if (r == 0) {
4803-
req->set_error(ERR_get_error()); // NOLINT(runtime/int)
4775+
set_error(ERR_get_error()); // NOLINT(runtime/int)
48044776
} else if (r == -1) {
4805-
req->set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int)
4777+
set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int)
48064778
}
48074779
}
48084780

@@ -4840,27 +4812,24 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) {
48404812
}
48414813

48424814

4843-
void RandomBytesAfter(uv_work_t* work_req, int status) {
4844-
std::unique_ptr<RandomBytesRequest> req(
4845-
ContainerOf(&RandomBytesRequest::work_req_, work_req));
4846-
Environment* env = req->env();
4847-
env->DecreaseWaitingRequestCounter();
4815+
void RandomBytesRequest::AfterThreadPoolWork(int status) {
4816+
std::unique_ptr<RandomBytesRequest> req(this);
48484817
if (status == UV_ECANCELED)
48494818
return;
48504819
CHECK_EQ(status, 0);
4851-
HandleScope handle_scope(env->isolate());
4852-
Context::Scope context_scope(env->context());
4820+
HandleScope handle_scope(env()->isolate());
4821+
Context::Scope context_scope(env()->context());
48534822
Local<Value> argv[2];
4854-
RandomBytesCheck(req.get(), &argv);
4855-
req->MakeCallback(env->ondone_string(), arraysize(argv), argv);
4823+
RandomBytesCheck(this, &argv);
4824+
MakeCallback(env()->ondone_string(), arraysize(argv), argv);
48564825
}
48574826

48584827

48594828
void RandomBytesProcessSync(Environment* env,
48604829
std::unique_ptr<RandomBytesRequest> req,
48614830
Local<Value> (*argv)[2]) {
48624831
env->PrintSyncTrace();
4863-
RandomBytesWork(req->work_req());
4832+
req->DoThreadPoolWork();
48644833
RandomBytesCheck(req.get(), argv);
48654834

48664835
if (!(*argv)[0]->IsNull())
@@ -4887,11 +4856,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
48874856
if (args[1]->IsFunction()) {
48884857
obj->Set(env->context(), env->ondone_string(), args[1]).FromJust();
48894858

4890-
env->IncreaseWaitingRequestCounter();
4891-
uv_queue_work(env->event_loop(),
4892-
req.release()->work_req(),
4893-
RandomBytesWork,
4894-
RandomBytesAfter);
4859+
req.release()->ScheduleWork();
48954860
args.GetReturnValue().Set(obj);
48964861
} else {
48974862
Local<Value> argv[2];
@@ -4927,11 +4892,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
49274892
if (args[3]->IsFunction()) {
49284893
obj->Set(env->context(), env->ondone_string(), args[3]).FromJust();
49294894

4930-
env->IncreaseWaitingRequestCounter();
4931-
uv_queue_work(env->event_loop(),
4932-
req.release()->work_req(),
4933-
RandomBytesWork,
4934-
RandomBytesAfter);
4895+
req.release()->ScheduleWork();
49354896
args.GetReturnValue().Set(obj);
49364897
} else {
49374898
Local<Value> argv[2];
Collapse file

‎src/node_internals.h‎

Copy file name to clipboard · Expand all lines: src/node_internals.h
+35 −0 — Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,41 @@ class InternalCallbackScope {
508508
bool closed_ = false;
509509
};
510510

511+
class ThreadPoolWork {
512+
public:
513+
explicit inline ThreadPoolWork(Environment* env) : env_(env) {}
514+
inline void ScheduleWork();
515+
inline int CancelWork();
516+
517+
virtual void DoThreadPoolWork() = 0;
518+
virtual void AfterThreadPoolWork(int status) = 0;
519+
520+
private:
521+
Environment* env_;
522+
uv_work_t work_req_;
523+
};
524+
525+
void ThreadPoolWork::ScheduleWork() {
526+
env_->IncreaseWaitingRequestCounter();
527+
int status = uv_queue_work(
528+
env_->event_loop(),
529+
&work_req_,
530+
[](uv_work_t* req) {
531+
ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
532+
self->DoThreadPoolWork();
533+
},
534+
[](uv_work_t* req, int status) {
535+
ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
536+
self->env_->DecreaseWaitingRequestCounter();
537+
self->AfterThreadPoolWork(status);
538+
});
539+
CHECK_EQ(status, 0);
540+
}
541+
542+
int ThreadPoolWork::CancelWork() {
543+
return uv_cancel(reinterpret_cast<uv_req_t*>(&work_req_));
544+
}
545+
511546
static inline const char *errno_string(int errorno) {
512547
#define ERRNO_CASE(e) case e: return #e;
513548
switch (errorno) {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.