From 9c01821796dee6608aafd6cb7af4a0fbe1d83504 Mon Sep 17 00:00:00 2001 From: Stjepan Rajko Date: Mon, 30 Jan 2017 10:04:44 -0700 Subject: [PATCH 1/5] Add support for reverse callbacks to ThreadPool --- .../templates/manual/include/thread_pool.h | 37 ++++++-- generate/templates/manual/src/thread_pool.cc | 90 +++++++++++++------ 2 files changed, 93 insertions(+), 34 deletions(-) diff --git a/generate/templates/manual/include/thread_pool.h b/generate/templates/manual/include/thread_pool.h index 6d29b3ddf..bcb3590d7 100644 --- a/generate/templates/manual/include/thread_pool.h +++ b/generate/templates/manual/include/thread_pool.h @@ -8,11 +8,20 @@ class ThreadPool { typedef void (*Callback) (void *); struct Work { Callback workCallback; - Callback loopCallback; + Callback completionCallback; void *data; - Work(Callback workCallback, Callback loopCallback, void *data) - : workCallback(workCallback), loopCallback(loopCallback), data(data) { + Work(Callback workCallback, Callback completionCallback, void *data) + : workCallback(workCallback), completionCallback(completionCallback), data(data) { + } + }; + + struct ReverseCall { + Callback reverseCallback; + void *data; + + ReverseCall(Callback reverseCallback, void *data) + : reverseCallback(reverseCallback), data(data) { } }; @@ -23,14 +32,21 @@ class ThreadPool { int workInProgressCount; // completion callbacks to be performed on the loop - std::queue loopQueue; - uv_mutex_t loopMutex; - uv_async_t loopAsync; + std::queue completionQueue; + uv_mutex_t completionMutex; + uv_async_t completionAsync; + + // async callback made from the threadpool, executed in the loop + std::queue reverseQueue; + uv_mutex_t reverseMutex; + uv_async_t reverseAsync; static void RunEventQueue(void *threadPool); void RunEventQueue(); - static void RunLoopCallbacks(uv_async_t* handle); - void RunLoopCallbacks(); + static void RunCompletionCallbacks(uv_async_t* handle); + void RunCompletionCallbacks(); + static void RunReverseCallbacks(uv_async_t *handle); + void RunReverseCallbacks(); public: // Initializes thread pool and spins up the requested number of threads // The provided loop will be used for completion callbacks, whenever @@ -39,7 +55,10 @@ class ThreadPool { // Queues work on the thread pool, followed by completion call scheduled // on the loop provided in the constructor. // QueueWork should be called on the loop provided in the constructor. - void QueueWork(Callback workCallback, Callback loopCallback, void *data); + void QueueWork(Callback workCallback, Callback completionCallback, void *data); + // Queues a callback on the loop provided in the constructor + // these block the calling thread's execution until the callback completes + void ExecuteReverseCallback(Callback reverseCallback, void *data); }; #endif diff --git a/generate/templates/manual/src/thread_pool.cc b/generate/templates/manual/src/thread_pool.cc index abf7a29a8..c89da76a3 100644 --- a/generate/templates/manual/src/thread_pool.cc +++ b/generate/templates/manual/src/thread_pool.cc @@ -4,10 +4,14 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) { uv_mutex_init(&workMutex); uv_sem_init(&workSemaphore, 0); - uv_async_init(loop, &loopAsync, RunLoopCallbacks); - loopAsync.data = this; - uv_unref((uv_handle_t *)&loopAsync); - uv_mutex_init(&loopMutex); + uv_async_init(loop, &completionAsync, RunCompletionCallbacks); + completionAsync.data = this; + uv_unref((uv_handle_t *)&completionAsync); + uv_mutex_init(&completionMutex); + + uv_async_init(loop, &reverseAsync, RunReverseCallbacks); + reverseAsync.data = this; + uv_mutex_init(&reverseMutex); workInProgressCount = 0; @@ -17,17 +21,31 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) { } } -void ThreadPool::QueueWork(Callback workCallback, Callback loopCallback, void *data) { +void ThreadPool::QueueWork(Callback workCallback, Callback completionCallback, void *data) { uv_mutex_lock(&workMutex); // there is work on the thread pool - reference the handle so // node doesn't terminate - uv_ref((uv_handle_t *)&loopAsync); - workQueue.push(Work(workCallback, loopCallback, data)); + uv_ref((uv_handle_t *)&completionAsync); + workQueue.push(Work(workCallback, completionCallback, data)); workInProgressCount++; uv_mutex_unlock(&workMutex); uv_sem_post(&workSemaphore); } +void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) { + // push the callback into the queue + uv_mutex_lock(&reverseMutex); + ReverseCall reverseCall(reverseCallback, data); + bool queueWasEmpty = reverseQueue.empty(); + reverseQueue.push(reverseCall); + // we only trigger RunReverseCallbacks via the reverseAsync handle if the queue + // was empty. Otherwise, we depend on RunReverseCallbacks to re-trigger itself + if (queueWasEmpty) { + uv_async_send(&reverseAsync); + } + uv_mutex_unlock(&reverseMutex); +} + void ThreadPool::RunEventQueue(void *threadPool) { static_cast(threadPool)->RunEventQueue(); } @@ -46,40 +64,62 @@ void ThreadPool::RunEventQueue() { (*work.workCallback)(work.data); // schedule the callback on the loop - uv_mutex_lock(&loopMutex); - loopQueue.push(work); - uv_mutex_unlock(&loopMutex); - uv_async_send(&loopAsync); + uv_mutex_lock(&completionMutex); + completionQueue.push(work); + uv_mutex_unlock(&completionMutex); + uv_async_send(&completionAsync); } } -void ThreadPool::RunLoopCallbacks(uv_async_t* handle) { - static_cast(handle->data)->RunLoopCallbacks(); +void ThreadPool::RunCompletionCallbacks(uv_async_t* handle) { + static_cast(handle->data)->RunCompletionCallbacks(); } -void ThreadPool::RunLoopCallbacks() { +void ThreadPool::RunCompletionCallbacks() { // uv_async_send can coalesce calls, so we are not guaranteed one - // RunLoopCallbacks per uv_async_send call - // so we always process the entire loopQueue + // RunCompletionCallbacks per uv_async_send call + // so we always process the entire completionQueue int callbacksCompleted = 0; - uv_mutex_lock(&loopMutex); - while(!loopQueue.empty()) { - Work work = loopQueue.front(); - loopQueue.pop(); - uv_mutex_unlock(&loopMutex); + uv_mutex_lock(&completionMutex); + while(!completionQueue.empty()) { + Work work = completionQueue.front(); + completionQueue.pop(); + uv_mutex_unlock(&completionMutex); // perform the queued loop callback - (*work.loopCallback)(work.data); + (*work.completionCallback)(work.data); callbacksCompleted++; - uv_mutex_lock(&loopMutex); + uv_mutex_lock(&completionMutex); } - uv_mutex_unlock(&loopMutex); + uv_mutex_unlock(&completionMutex); uv_mutex_lock(&workMutex); // if there is no ongoing work / completion processing, node doesn't need // to be prevented from terminating workInProgressCount -= callbacksCompleted; if(!workInProgressCount) { - uv_unref((uv_handle_t *)&loopAsync); + uv_unref((uv_handle_t *)&completionAsync); } uv_mutex_unlock(&workMutex); } + +void ThreadPool::RunReverseCallbacks(uv_async_t* handle) { + static_cast(handle->data)->RunReverseCallbacks(); +} + +void ThreadPool::RunReverseCallbacks() { + // get the next callback to run + uv_mutex_lock(&reverseMutex); + ReverseCall reverseCall = reverseQueue.front(); + uv_mutex_unlock(&reverseMutex); + + // execute callback + (*reverseCall.reverseCallback)(reverseCall.data); + + // pop the queue, and if necessary, re-trigger RunReverseCallbacks + uv_mutex_lock(&reverseMutex); + reverseQueue.pop(); + if (!reverseQueue.empty()) { + uv_async_send(&reverseAsync); + } + uv_mutex_unlock(&reverseMutex); +} From c0c383fc3a82e3a2028b7e7e92f00aa11c8dc56f Mon Sep 17 00:00:00 2001 From: Stjepan Rajko Date: Mon, 30 Jan 2017 13:42:06 -0700 Subject: [PATCH 2/5] Use ExecuteReverseCallback --- .../templates/manual/include/async_baton.h | 28 +++++++++++-------- .../templates/manual/include/thread_pool.h | 4 +++ .../templates/partials/callback_helpers.cc | 12 ++++---- .../templates/partials/field_accessors.cc | 14 ++++------ generate/templates/templates/class_header.h | 2 +- generate/templates/templates/struct_header.h | 2 +- 6 files changed, 33 insertions(+), 29 deletions(-) diff --git a/generate/templates/manual/include/async_baton.h b/generate/templates/manual/include/async_baton.h index fee87c4c1..cfae13b31 100644 --- a/generate/templates/manual/include/async_baton.h +++ b/generate/templates/manual/include/async_baton.h @@ -5,15 +5,13 @@ #include #include "lock_master.h" -#include "functions/sleep_for_ms.h" +#include "nodegit.h" // Base class for Batons used for callbacks (for example, // JS functions passed as callback parameters, // or field properties of configuration objects whose values are callbacks) struct AsyncBaton { - uv_async_t req; - - bool done; + uv_sem_t semaphore; }; template @@ -23,22 +21,28 @@ struct AsyncBatonWithResult : public AsyncBaton { AsyncBatonWithResult(const ResultT &defaultResult) : defaultResult(defaultResult) { + uv_sem_init(&semaphore, 0); + } + + ~AsyncBatonWithResult() { + uv_sem_destroy(&semaphore); + } + + void Done() { + // signal completion + uv_sem_post(&semaphore); } - ResultT ExecuteAsync(uv_async_cb asyncCallback) { + ResultT ExecuteAsync(ThreadPool::Callback asyncCallback) { result = 0; - req.data = this; - done = false; - uv_async_init(uv_default_loop(), &req, asyncCallback); { LockMaster::TemporaryUnlock temporaryUnlock; - uv_async_send(&req); + libgit2ThreadPool.ExecuteReverseCallback(asyncCallback, this); - while(!done) { - sleep_for_ms(1); - } + // wait for completion + uv_sem_wait(&semaphore); } return result; diff --git a/generate/templates/manual/include/thread_pool.h b/generate/templates/manual/include/thread_pool.h index bcb3590d7..a1eeabbfd 100644 --- a/generate/templates/manual/include/thread_pool.h +++ b/generate/templates/manual/include/thread_pool.h @@ -5,7 +5,10 @@ #include class ThreadPool { +public: typedef void (*Callback) (void *); + +private: struct Work { Callback workCallback; Callback completionCallback; @@ -47,6 +50,7 @@ class ThreadPool { void RunCompletionCallbacks(); static void RunReverseCallbacks(uv_async_t *handle); void RunReverseCallbacks(); + public: // Initializes thread pool and spins up the requested number of threads // The provided loop will be used for completion callbacks, whenever diff --git a/generate/templates/partials/callback_helpers.cc b/generate/templates/partials/callback_helpers.cc index c4b435787..1a1e6b1be 100644 --- a/generate/templates/partials/callback_helpers.cc +++ b/generate/templates/partials/callback_helpers.cc @@ -13,13 +13,13 @@ baton->{{ arg.name }} = {{ arg.name }}; {% endeach %} - return baton->ExecuteAsync((uv_async_cb) {{ cppFunctionName }}_{{ cbFunction.name }}_async); + return baton->ExecuteAsync({{ cppFunctionName }}_{{ cbFunction.name }}_async); } -void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(uv_async_t* req, int status) { +void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(void *untypedBaton) { Nan::HandleScope scope; - {{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton* baton = static_cast<{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton*>(req->data); + {{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton* baton = static_cast<{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton*>(untypedBaton); {% each cbFunction.args|argsInfo as arg %} {% if arg | isPayload %} @@ -57,8 +57,6 @@ void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(uv_as Nan::TryCatch tryCatch; Local result = callback->Call({{ cbFunction.args|jsArgsCount }}, argv); - uv_close((uv_handle_t*) &baton->req, NULL); - if(PromiseCompletion::ForwardIfPromise(result, baton, {{ cppFunctionName }}_{{ cbFunction.name }}_promiseCompleted)) { return; } @@ -88,7 +86,7 @@ void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(uv_as } {% endeach %} - baton->done = true; + baton->Done(); } void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_promiseCompleted(bool isFulfilled, AsyncBaton *_baton, v8::Local result) { @@ -132,7 +130,7 @@ void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_promiseComp baton->result = {{ cbFunction.return.error }}; } - baton->done = true; + baton->Done(); } {%endif%} {%endeach%} diff --git a/generate/templates/partials/field_accessors.cc b/generate/templates/partials/field_accessors.cc index 0098d06b4..ca7f3781b 100644 --- a/generate/templates/partials/field_accessors.cc +++ b/generate/templates/partials/field_accessors.cc @@ -124,13 +124,13 @@ return baton->defaultResult; } - return baton->ExecuteAsync((uv_async_cb) {{ field.name }}_async); + return baton->ExecuteAsync({{ field.name }}_async); } - void {{ cppClassName }}::{{ field.name }}_async(uv_async_t* req, int status) { + void {{ cppClassName }}::{{ field.name }}_async(void *untypedBaton) { Nan::HandleScope scope; - {{ field.name|titleCase }}Baton* baton = static_cast<{{ field.name|titleCase }}Baton*>(req->data); + {{ field.name|titleCase }}Baton* baton = static_cast<{{ field.name|titleCase }}Baton*>(untypedBaton); {{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(baton); if (instance->{{ field.name }}.GetCallback()->IsEmpty()) { @@ -138,7 +138,7 @@ baton->result = baton->defaultResult; // no results acquired {% endif %} - baton->done = true; + baton->Done(); return; } @@ -179,8 +179,6 @@ Nan::TryCatch tryCatch; Local result = instance->{{ field.name }}.GetCallback()->Call({{ field.args|jsArgsCount }}, argv); - uv_close((uv_handle_t*) &baton->req, NULL); - if(PromiseCompletion::ForwardIfPromise(result, baton, {{ cppClassName }}::{{ field.name }}_promiseCompleted)) { return; } @@ -209,7 +207,7 @@ baton->result = baton->defaultResult; } {% endeach %} - baton->done = true; + baton->Done(); } void {{ cppClassName }}::{{ field.name }}_promiseCompleted(bool isFulfilled, AsyncBaton *_baton, v8::Local result) { @@ -253,7 +251,7 @@ baton->result = {{ field.return.error }}; } - baton->done = true; + baton->Done(); } {% endif %} {% endif %} diff --git a/generate/templates/templates/class_header.h b/generate/templates/templates/class_header.h index aff51243a..6f19f517f 100644 --- a/generate/templates/templates/class_header.h +++ b/generate/templates/templates/class_header.h @@ -67,7 +67,7 @@ class {{ cppClassName }} : public {% endeach %} ); - static void {{ function.cppFunctionName }}_{{ arg.name }}_async(uv_async_t* req, int status); + static void {{ function.cppFunctionName }}_{{ arg.name }}_async(void *baton); static void {{ function.cppFunctionName }}_{{ arg.name }}_promiseCompleted(bool isFulfilled, AsyncBaton *_baton, v8::Local result); struct {{ function.cppFunctionName }}_{{ arg.name|titleCase }}Baton : public AsyncBatonWithResult<{{ arg.return.type }}> { {% each arg.args|argsInfo as cbArg %} diff --git a/generate/templates/templates/struct_header.h b/generate/templates/templates/struct_header.h index 122a55c0a..dd8f8570e 100644 --- a/generate/templates/templates/struct_header.h +++ b/generate/templates/templates/struct_header.h @@ -44,7 +44,7 @@ class {{ cppClassName }} : public NodeGitWrapper<{{ cppClassName }}Traits> { {% endeach %} ); - static void {{ field.name }}_async(uv_async_t* req, int status); + static void {{ field.name }}_async(void *baton); static void {{ field.name }}_promiseCompleted(bool isFulfilled, AsyncBaton *_baton, v8::Local result); struct {{ field.name|titleCase }}Baton : public AsyncBatonWithResult<{{ field.return.type }}> { {% each field.args|argsInfo as arg %} From 884900ff362413d663522bdd8218f5395ef134fd Mon Sep 17 00:00:00 2001 From: Stjepan Rajko Date: Mon, 30 Jan 2017 16:13:20 -0700 Subject: [PATCH 3/5] Allocate baton on stack --- generate/templates/partials/callback_helpers.cc | 7 +++---- generate/templates/partials/field_accessors.cc | 11 +++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/generate/templates/partials/callback_helpers.cc b/generate/templates/partials/callback_helpers.cc index 1a1e6b1be..acd44f424 100644 --- a/generate/templates/partials/callback_helpers.cc +++ b/generate/templates/partials/callback_helpers.cc @@ -6,14 +6,13 @@ {{ arg.cType }} {{ arg.name}}{% if not arg.lastArg %},{% endif %} {% endeach %} ) { - {{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton* baton = - new {{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton({{ cbFunction.return.noResults }}); + {{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton baton({{ cbFunction.return.noResults }}); {% each cbFunction.args|argsInfo as arg %} - baton->{{ arg.name }} = {{ arg.name }}; + baton.{{ arg.name }} = {{ arg.name }}; {% endeach %} - return baton->ExecuteAsync({{ cppFunctionName }}_{{ cbFunction.name }}_async); + return baton.ExecuteAsync({{ cppFunctionName }}_{{ cbFunction.name }}_async); } void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(void *untypedBaton) { diff --git a/generate/templates/partials/field_accessors.cc b/generate/templates/partials/field_accessors.cc index ca7f3781b..f9f9065a0 100644 --- a/generate/templates/partials/field_accessors.cc +++ b/generate/templates/partials/field_accessors.cc @@ -111,20 +111,19 @@ {{ arg.cType }} {{ arg.name}}{% if not arg.lastArg %},{% endif %} {% endeach %} ) { - {{ field.name|titleCase }}Baton* baton = - new {{ field.name|titleCase }}Baton({{ field.return.noResults }}); + {{ field.name|titleCase }}Baton baton({{ field.return.noResults }}); {% each field.args|argsInfo as arg %} - baton->{{ arg.name }} = {{ arg.name }}; + baton.{{ arg.name }} = {{ arg.name }}; {% endeach %} - {{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(baton); + {{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(&baton); if (instance->{{ field.name }}.WillBeThrottled()) { - return baton->defaultResult; + return baton.defaultResult; } - return baton->ExecuteAsync({{ field.name }}_async); + return baton.ExecuteAsync({{ field.name }}_async); } void {{ cppClassName }}::{{ field.name }}_async(void *untypedBaton) { From acb6c41a7a82972f67cbcaf888febceff03a3fa3 Mon Sep 17 00:00:00 2001 From: Stjepan Rajko Date: Mon, 30 Jan 2017 16:17:05 -0700 Subject: [PATCH 4/5] Remove sleep_for_ms --- .../manual/include/functions/sleep_for_ms.h | 6 ------ .../manual/src/functions/sleep_for_ms.cc | 16 ---------------- generate/templates/templates/binding.gyp | 1 - 3 files changed, 23 deletions(-) delete mode 100644 generate/templates/manual/include/functions/sleep_for_ms.h delete mode 100644 generate/templates/manual/src/functions/sleep_for_ms.cc diff --git a/generate/templates/manual/include/functions/sleep_for_ms.h b/generate/templates/manual/include/functions/sleep_for_ms.h deleted file mode 100644 index 903299268..000000000 --- a/generate/templates/manual/include/functions/sleep_for_ms.h +++ /dev/null @@ -1,6 +0,0 @@ -#ifndef SLEEP_FOR_MS_H -#define SLEEP_FOR_MS_H - -void sleep_for_ms(int milliseconds); - -#endif diff --git a/generate/templates/manual/src/functions/sleep_for_ms.cc b/generate/templates/manual/src/functions/sleep_for_ms.cc deleted file mode 100644 index 11b6a72f6..000000000 --- a/generate/templates/manual/src/functions/sleep_for_ms.cc +++ /dev/null @@ -1,16 +0,0 @@ -#ifdef WIN32 -#include -#else -#include -#endif // win32 - -void sleep_for_ms(int milliseconds) { - #ifdef WIN32 - Sleep(milliseconds); - #else - struct timespec t; - t.tv_sec = 0; - t.tv_nsec = milliseconds * 1000000; // 1 milliseconds == 1,000,000 nanoseconds - nanosleep(&t, NULL); - #endif -} diff --git a/generate/templates/templates/binding.gyp b/generate/templates/templates/binding.gyp index 457221dcd..142ae7f05 100644 --- a/generate/templates/templates/binding.gyp +++ b/generate/templates/templates/binding.gyp @@ -18,7 +18,6 @@ "src/promise_completion.cc", "src/wrapper.cc", "src/functions/copy.cc", - "src/functions/sleep_for_ms.cc", "src/convenient_patch.cc", "src/convenient_hunk.cc", "src/str_array_converter.cc", From 6d7762b9ce475bfa5db9249598f6629db636d458 Mon Sep 17 00:00:00 2001 From: Stjepan Rajko Date: Mon, 6 Feb 2017 08:22:12 -0700 Subject: [PATCH 5/5] Unreference handle to allow node to terminate --- generate/templates/manual/src/thread_pool.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/generate/templates/manual/src/thread_pool.cc b/generate/templates/manual/src/thread_pool.cc index c89da76a3..d8bedc96e 100644 --- a/generate/templates/manual/src/thread_pool.cc +++ b/generate/templates/manual/src/thread_pool.cc @@ -11,6 +11,7 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) { uv_async_init(loop, &reverseAsync, RunReverseCallbacks); reverseAsync.data = this; + uv_unref((uv_handle_t *)&reverseAsync); uv_mutex_init(&reverseMutex); workInProgressCount = 0;