diff --git a/generate/templates/manual/include/async_libgit2_queue_worker.h b/generate/templates/manual/include/async_libgit2_queue_worker.h new file mode 100644 index 000000000..f3ddf2fb3 --- /dev/null +++ b/generate/templates/manual/include/async_libgit2_queue_worker.h @@ -0,0 +1,33 @@ +#ifndef ASYNC_LIBGIT2_QUEUE_WORKER_H +#define ASYNC_LIBGIT2_QUEUE_WORKER_H + +#include +#include +#include "../include/thread_pool.h" +#include "../include/nodegit.h" + + +// Runs WorkComplete of the scheduled AsyncWorker, +// and destroys it. This is run in the uv_default_loop event loop. +NAN_INLINE void AsyncLibgit2Complete (void* data) { + Nan::AsyncWorker *worker = static_cast(data); + worker->WorkComplete(); + worker->Destroy(); +} + +// Runs Execute of the scheduled AyncWorker on the dedicated libgit2 thread / +// event loop, and schedules the WorkComplete callback to run on the +// uv_default_loop event loop +NAN_INLINE void AsyncLibgit2Execute (void *vworker) { + // execute the worker + Nan::AsyncWorker *worker = static_cast(vworker); + worker->Execute(); +} + +// Schedules the AsyncWorker to run on the dedicated libgit2 thread / event loop, +// and on completion AsyncLibgit2Complete on the default loop +NAN_INLINE void AsyncLibgit2QueueWorker (Nan::AsyncWorker* worker) { + libgit2ThreadPool.QueueWork(AsyncLibgit2Execute, AsyncLibgit2Complete, worker); +} + +#endif diff --git a/generate/templates/manual/include/nodegit.h b/generate/templates/manual/include/nodegit.h new file mode 100644 index 000000000..4408b8ea7 --- /dev/null +++ b/generate/templates/manual/include/nodegit.h @@ -0,0 +1,8 @@ +#ifndef NODEGIT_H +#define NODEGIT_H + +#include "thread_pool.h" + +extern ThreadPool libgit2ThreadPool; + +#endif diff --git a/generate/templates/manual/include/thread_pool.h b/generate/templates/manual/include/thread_pool.h new file mode 100644 index 000000000..6d29b3ddf --- /dev/null +++ b/generate/templates/manual/include/thread_pool.h @@ -0,0 +1,45 @@ +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#include +#include + +class ThreadPool { + typedef void (*Callback) (void *); + struct Work { + Callback workCallback; + Callback loopCallback; + void *data; + + Work(Callback workCallback, Callback loopCallback, void *data) + : workCallback(workCallback), loopCallback(loopCallback), data(data) { + } + }; + + // work to be performed on the threadpool + std::queue workQueue; + uv_mutex_t workMutex; + uv_sem_t workSemaphore; + int workInProgressCount; + + // completion callbacks to be performed on the loop + std::queue loopQueue; + uv_mutex_t loopMutex; + uv_async_t loopAsync; + + static void RunEventQueue(void *threadPool); + void RunEventQueue(); + static void RunLoopCallbacks(uv_async_t* handle); + void RunLoopCallbacks(); +public: + // Initializes thread pool and spins up the requested number of threads + // The provided loop will be used for completion callbacks, whenever + // queued work is completed + ThreadPool(int numberOfThreads, uv_loop_t *loop); + // Queues work on the thread pool, followed by completion call scheduled + // on the loop provided in the constructor. + // QueueWork should be called on the loop provided in the constructor. + void QueueWork(Callback workCallback, Callback loopCallback, void *data); +}; + +#endif diff --git a/generate/templates/manual/src/thread_pool.cc b/generate/templates/manual/src/thread_pool.cc new file mode 100644 index 000000000..abf7a29a8 --- /dev/null +++ b/generate/templates/manual/src/thread_pool.cc @@ -0,0 +1,85 @@ +#include "../include/thread_pool.h" + +ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) { + uv_mutex_init(&workMutex); + uv_sem_init(&workSemaphore, 0); + + uv_async_init(loop, &loopAsync, RunLoopCallbacks); + loopAsync.data = this; + uv_unref((uv_handle_t *)&loopAsync); + uv_mutex_init(&loopMutex); + + workInProgressCount = 0; + + for(int i=0; i(threadPool)->RunEventQueue(); +} + +void ThreadPool::RunEventQueue() { + for ( ; ; ) { + // wait until there is work to do + uv_sem_wait(&workSemaphore); + uv_mutex_lock(&workMutex); + // the semaphore should guarantee that queue is not empty + Work work = workQueue.front(); + workQueue.pop(); + uv_mutex_unlock(&workMutex); + + // perform the queued work + (*work.workCallback)(work.data); + + // schedule the callback on the loop + uv_mutex_lock(&loopMutex); + loopQueue.push(work); + uv_mutex_unlock(&loopMutex); + uv_async_send(&loopAsync); + } +} + +void ThreadPool::RunLoopCallbacks(uv_async_t* handle) { + static_cast(handle->data)->RunLoopCallbacks(); +} + +void ThreadPool::RunLoopCallbacks() { + // uv_async_send can coalesce calls, so we are not guaranteed one + // RunLoopCallbacks per uv_async_send call + // so we always process the entire loopQueue + int callbacksCompleted = 0; + uv_mutex_lock(&loopMutex); + while(!loopQueue.empty()) { + Work work = loopQueue.front(); + loopQueue.pop(); + uv_mutex_unlock(&loopMutex); + // perform the queued loop callback + (*work.loopCallback)(work.data); + callbacksCompleted++; + uv_mutex_lock(&loopMutex); + } + uv_mutex_unlock(&loopMutex); + + uv_mutex_lock(&workMutex); + // if there is no ongoing work / completion processing, node doesn't need + // to be prevented from terminating + workInProgressCount -= callbacksCompleted; + if(!workInProgressCount) { + uv_unref((uv_handle_t *)&loopAsync); + } + uv_mutex_unlock(&workMutex); +} diff --git a/generate/templates/partials/async_function.cc b/generate/templates/partials/async_function.cc index 7b564adb2..edfae89df 100644 --- a/generate/templates/partials/async_function.cc +++ b/generate/templates/partials/async_function.cc @@ -74,7 +74,7 @@ NAN_METHOD({{ cppClassName }}::{{ cppFunctionName }}) { {%endif%} {%endeach%} - Nan::AsyncQueueWorker(worker); + AsyncLibgit2QueueWorker(worker); return; } diff --git a/generate/templates/templates/binding.gyp b/generate/templates/templates/binding.gyp index 9a2813579..dbc60874a 100644 --- a/generate/templates/templates/binding.gyp +++ b/generate/templates/templates/binding.gyp @@ -22,6 +22,7 @@ "src/convenient_patch.cc", "src/convenient_hunk.cc", "src/str_array_converter.cc", + "src/thread_pool.cc", {% each %} {% if type != "enum" %} "src/{{ name }}.cc", diff --git a/generate/templates/templates/class_content.cc b/generate/templates/templates/class_content.cc index b8084baa6..834361884 100644 --- a/generate/templates/templates/class_content.cc +++ b/generate/templates/templates/class_content.cc @@ -12,6 +12,7 @@ extern "C" { #include "../include/functions/copy.h" #include "../include/{{ filename }}.h" #include "nodegit_wrapper.cc" +#include "../include/async_libgit2_queue_worker.h" {% each dependencies as dependency %} #include "{{ dependency }}" diff --git a/generate/templates/templates/nodegit.cc b/generate/templates/templates/nodegit.cc index 189a4d622..2cde41c12 100644 --- a/generate/templates/templates/nodegit.cc +++ b/generate/templates/templates/nodegit.cc @@ -10,6 +10,7 @@ #include "../include/init_ssh2.h" #include "../include/lock_master.h" +#include "../include/nodegit.h" #include "../include/wrapper.h" #include "../include/promise_completion.h" #include "../include/functions/copy.h" @@ -80,6 +81,8 @@ void OpenSSL_ThreadSetup() { CRYPTO_set_id_callback(OpenSSL_IDCallback); } +ThreadPool libgit2ThreadPool(10, uv_default_loop()); + extern "C" void init(Local target) { // Initialize thread safety in openssl and libssh2 OpenSSL_ThreadSetup();