diff --git a/generate/templates/manual/include/async_baton.h b/generate/templates/manual/include/async_baton.h index cfae13b31..5f6874102 100644 --- a/generate/templates/manual/include/async_baton.h +++ b/generate/templates/manual/include/async_baton.h @@ -12,37 +12,48 @@ // or field properties of configuration objects whose values are callbacks) struct AsyncBaton { uv_sem_t semaphore; + + virtual ~AsyncBaton() {} }; +void deleteBaton(AsyncBaton *baton); + template struct AsyncBatonWithResult : public AsyncBaton { ResultT result; ResultT defaultResult; // result returned if the callback doesn't return anything valid + void (*onCompletion)(AsyncBaton *); AsyncBatonWithResult(const ResultT &defaultResult) : defaultResult(defaultResult) { - uv_sem_init(&semaphore, 0); - } - - ~AsyncBatonWithResult() { - uv_sem_destroy(&semaphore); } void Done() { - // signal completion - uv_sem_post(&semaphore); + if (onCompletion) { + onCompletion(this); + } else { + // signal completion + uv_sem_post(&semaphore); + } } - ResultT ExecuteAsync(ThreadPool::Callback asyncCallback) { + ResultT ExecuteAsync(ThreadPool::Callback asyncCallback, void (*onCompletion)(AsyncBaton *) = NULL) { result = 0; + this->onCompletion = onCompletion; + if (!onCompletion) { + uv_sem_init(&semaphore, 0); + } { LockMaster::TemporaryUnlock temporaryUnlock; libgit2ThreadPool.ExecuteReverseCallback(asyncCallback, this); - // wait for completion - uv_sem_wait(&semaphore); + if (!onCompletion) { + // wait for completion + uv_sem_wait(&semaphore); + uv_sem_destroy(&semaphore); + } } return result; diff --git a/generate/templates/manual/include/callback_wrapper.h b/generate/templates/manual/include/callback_wrapper.h index b23a7bb36..af53250c1 100644 --- a/generate/templates/manual/include/callback_wrapper.h +++ b/generate/templates/manual/include/callback_wrapper.h @@ -14,6 +14,11 @@ class CallbackWrapper { int throttle; // in milliseconds - if > 0, calls to the JS callback will be throttled uint64_t lastCallTime; + // false will trigger the callback and not wait for the callback to finish + // in this case, the underlying libgit2 function will immediately be given + // the default result + bool waitForResult; + public: CallbackWrapper() { jsCallback = NULL; @@ -33,12 +38,17 @@ class CallbackWrapper { return jsCallback; } - void SetCallback(Nan::Callback* callback, int throttle = 0) { + void SetCallback(Nan::Callback* callback, int throttle = 0, bool waitForResult = true) { if(jsCallback) { delete jsCallback; } jsCallback = callback; this->throttle = throttle; + this->waitForResult = waitForResult; + } + + bool ShouldWaitForResult() { + return waitForResult; } bool WillBeThrottled() { diff --git a/generate/templates/manual/include/thread_pool.h b/generate/templates/manual/include/thread_pool.h index a1eeabbfd..8a346028d 100644 --- a/generate/templates/manual/include/thread_pool.h +++ b/generate/templates/manual/include/thread_pool.h @@ -19,12 +19,13 @@ class ThreadPool { } }; - struct ReverseCall { - Callback reverseCallback; + struct LoopCallback { + Callback callback; void *data; + bool isWork; - ReverseCall(Callback reverseCallback, void *data) - : reverseCallback(reverseCallback), data(data) { + LoopCallback(Callback callback, void *data, bool isWork) + : callback(callback), data(data), isWork(isWork) { } }; @@ -34,22 +35,17 @@ class ThreadPool { uv_sem_t workSemaphore; int workInProgressCount; - // completion callbacks to be performed on the loop - std::queue completionQueue; - uv_mutex_t completionMutex; - uv_async_t completionAsync; - - // async callback made from the threadpool, executed in the loop - std::queue reverseQueue; - uv_mutex_t reverseMutex; - uv_async_t reverseAsync; + // completion and async callbacks to be performed on the loop + std::queue loopQueue; + uv_mutex_t loopMutex; + uv_async_t loopAsync; static void RunEventQueue(void *threadPool); void RunEventQueue(); - static void RunCompletionCallbacks(uv_async_t* handle); - void RunCompletionCallbacks(); - static void RunReverseCallbacks(uv_async_t *handle); - void RunReverseCallbacks(); + static void RunLoopCallbacks(uv_async_t* handle); + void RunLoopCallbacks(); + + void QueueLoopCallback(Callback callback, void *data, bool isWork); public: // Initializes thread pool and spins up the requested number of threads @@ -61,7 +57,6 @@ class ThreadPool { // QueueWork should be called on the loop provided in the constructor. void QueueWork(Callback workCallback, Callback completionCallback, void *data); // Queues a callback on the loop provided in the constructor - // these block the calling thread's execution until the callback completes void ExecuteReverseCallback(Callback reverseCallback, void *data); }; diff --git a/generate/templates/manual/src/async_baton.cc b/generate/templates/manual/src/async_baton.cc new file mode 100644 index 000000000..590a19c62 --- /dev/null +++ b/generate/templates/manual/src/async_baton.cc @@ -0,0 +1,5 @@ +#include "../include/async_baton.h" + +void deleteBaton(AsyncBaton *baton) { + delete baton; +} diff --git a/generate/templates/manual/src/thread_pool.cc b/generate/templates/manual/src/thread_pool.cc index d8bedc96e..7eadf0421 100644 --- a/generate/templates/manual/src/thread_pool.cc +++ b/generate/templates/manual/src/thread_pool.cc @@ -4,15 +4,10 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) { uv_mutex_init(&workMutex); uv_sem_init(&workSemaphore, 0); - uv_async_init(loop, &completionAsync, RunCompletionCallbacks); - completionAsync.data = this; - uv_unref((uv_handle_t *)&completionAsync); - uv_mutex_init(&completionMutex); - - uv_async_init(loop, &reverseAsync, RunReverseCallbacks); - reverseAsync.data = this; - uv_unref((uv_handle_t *)&reverseAsync); - uv_mutex_init(&reverseMutex); + uv_async_init(loop, &loopAsync, RunLoopCallbacks); + loopAsync.data = this; + uv_unref((uv_handle_t *)&loopAsync); + uv_mutex_init(&loopMutex); workInProgressCount = 0; @@ -26,25 +21,29 @@ void ThreadPool::QueueWork(Callback workCallback, Callback completionCallback, v uv_mutex_lock(&workMutex); // there is work on the thread pool - reference the handle so // node doesn't terminate - uv_ref((uv_handle_t *)&completionAsync); + uv_ref((uv_handle_t *)&loopAsync); workQueue.push(Work(workCallback, completionCallback, data)); workInProgressCount++; uv_mutex_unlock(&workMutex); uv_sem_post(&workSemaphore); } -void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) { +void ThreadPool::QueueLoopCallback(Callback callback, void *data, bool isWork) { // push the callback into the queue - uv_mutex_lock(&reverseMutex); - ReverseCall reverseCall(reverseCallback, data); - bool queueWasEmpty = reverseQueue.empty(); - reverseQueue.push(reverseCall); - // we only trigger RunReverseCallbacks via the reverseAsync handle if the queue - // was empty. Otherwise, we depend on RunReverseCallbacks to re-trigger itself + uv_mutex_lock(&loopMutex); + LoopCallback loopCallback(callback, data, isWork); + bool queueWasEmpty = loopQueue.empty(); + loopQueue.push(loopCallback); + // we only trigger RunLoopCallbacks via the loopAsync handle if the queue + // was empty. Otherwise, we depend on RunLoopCallbacks to re-trigger itself if (queueWasEmpty) { - uv_async_send(&reverseAsync); + uv_async_send(&loopAsync); } - uv_mutex_unlock(&reverseMutex); + uv_mutex_unlock(&loopMutex); +} + +void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) { + QueueLoopCallback(reverseCallback, data, false); } void ThreadPool::RunEventQueue(void *threadPool) { @@ -64,63 +63,40 @@ void ThreadPool::RunEventQueue() { // perform the queued work (*work.workCallback)(work.data); - // schedule the callback on the loop - uv_mutex_lock(&completionMutex); - completionQueue.push(work); - uv_mutex_unlock(&completionMutex); - uv_async_send(&completionAsync); + // schedule the completion callback on the loop + QueueLoopCallback(work.completionCallback, work.data, true); } } -void ThreadPool::RunCompletionCallbacks(uv_async_t* handle) { - static_cast(handle->data)->RunCompletionCallbacks(); +void ThreadPool::RunLoopCallbacks(uv_async_t* handle) { + static_cast(handle->data)->RunLoopCallbacks(); } -void ThreadPool::RunCompletionCallbacks() { - // uv_async_send can coalesce calls, so we are not guaranteed one - // RunCompletionCallbacks per uv_async_send call - // so we always process the entire completionQueue - int callbacksCompleted = 0; - uv_mutex_lock(&completionMutex); - while(!completionQueue.empty()) { - Work work = completionQueue.front(); - completionQueue.pop(); - uv_mutex_unlock(&completionMutex); - // perform the queued loop callback - (*work.completionCallback)(work.data); - callbacksCompleted++; - uv_mutex_lock(&completionMutex); +void ThreadPool::RunLoopCallbacks() { + // get the next callback to run + uv_mutex_lock(&loopMutex); + LoopCallback loopCallback = loopQueue.front(); + uv_mutex_unlock(&loopMutex); + + // perform the queued loop callback + (*loopCallback.callback)(loopCallback.data); + + // pop the queue, and if necessary, re-trigger RunLoopCallbacks + uv_mutex_lock(&loopMutex); + loopQueue.pop(); + if (!loopQueue.empty()) { + uv_async_send(&loopAsync); } - uv_mutex_unlock(&completionMutex); + uv_mutex_unlock(&loopMutex); - uv_mutex_lock(&workMutex); // if there is no ongoing work / completion processing, node doesn't need // to be prevented from terminating - workInProgressCount -= callbacksCompleted; - if(!workInProgressCount) { - uv_unref((uv_handle_t *)&completionAsync); - } - uv_mutex_unlock(&workMutex); -} - -void ThreadPool::RunReverseCallbacks(uv_async_t* handle) { - static_cast(handle->data)->RunReverseCallbacks(); -} - -void ThreadPool::RunReverseCallbacks() { - // get the next callback to run - uv_mutex_lock(&reverseMutex); - ReverseCall reverseCall = reverseQueue.front(); - uv_mutex_unlock(&reverseMutex); - - // execute callback - (*reverseCall.reverseCallback)(reverseCall.data); - - // pop the queue, and if necessary, re-trigger RunReverseCallbacks - uv_mutex_lock(&reverseMutex); - reverseQueue.pop(); - if (!reverseQueue.empty()) { - uv_async_send(&reverseAsync); + if (loopCallback.isWork) { + uv_mutex_lock(&workMutex); + workInProgressCount --; + if(!workInProgressCount) { + uv_unref((uv_handle_t *)&loopAsync); + } + uv_mutex_unlock(&workMutex); } - uv_mutex_unlock(&reverseMutex); } diff --git a/generate/templates/partials/field_accessors.cc b/generate/templates/partials/field_accessors.cc index f9f9065a0..ab93414ed 100644 --- a/generate/templates/partials/field_accessors.cc +++ b/generate/templates/partials/field_accessors.cc @@ -48,6 +48,7 @@ {% elsif field.isCallbackFunction %} Nan::Callback *callback = NULL; int throttle = {%if field.return.throttle %}{{ field.return.throttle }}{%else%}0{%endif%}; + bool waitForResult = true; if (value->IsFunction()) { callback = new Nan::Callback(value.As()); @@ -59,6 +60,7 @@ Local objectCallback = maybeObjectCallback.ToLocalChecked(); if (objectCallback->IsFunction()) { callback = new Nan::Callback(objectCallback.As()); + Nan::MaybeLocal maybeObjectThrottle = Nan::Get(object, Nan::New("throttle").ToLocalChecked()); if(!maybeObjectThrottle.IsEmpty()) { Local objectThrottle = maybeObjectThrottle.ToLocalChecked(); @@ -66,6 +68,12 @@ throttle = (int)objectThrottle.As()->Value(); } } + + Nan::MaybeLocal maybeObjectWaitForResult = Nan::Get(object, Nan::New("waitForResult").ToLocalChecked()); + if(!maybeObjectWaitForResult.IsEmpty()) { + Local objectWaitForResult = maybeObjectWaitForResult.ToLocalChecked(); + waitForResult = (bool)objectWaitForResult->BooleanValue(); + } } } } @@ -74,7 +82,7 @@ wrapper->raw->{{ field.name }} = ({{ field.cType }}){{ field.name }}_cppCallback; } - wrapper->{{ field.name }}.SetCallback(callback, throttle); + wrapper->{{ field.name }}.SetCallback(callback, throttle, waitForResult); } {% elsif field.payloadFor %} @@ -111,19 +119,28 @@ {{ arg.cType }} {{ arg.name}}{% if not arg.lastArg %},{% endif %} {% endeach %} ) { - {{ field.name|titleCase }}Baton baton({{ field.return.noResults }}); + {{ field.name|titleCase }}Baton *baton = + new {{ field.name|titleCase }}Baton({{ field.return.noResults }}); {% each field.args|argsInfo as arg %} - baton.{{ arg.name }} = {{ arg.name }}; + baton->{{ arg.name }} = {{ arg.name }}; {% endeach %} - {{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(&baton); + {{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(baton); + + {{ field.return.type }} result; if (instance->{{ field.name }}.WillBeThrottled()) { - return baton.defaultResult; + result = baton->defaultResult; + delete baton; + } else if (instance->{{ field.name }}.ShouldWaitForResult()) { + result = baton->ExecuteAsync({{ field.name }}_async); + delete baton; + } else { + result = baton->defaultResult; + baton->ExecuteAsync({{ field.name }}_async, deleteBaton); } - - return baton.ExecuteAsync({{ field.name }}_async); + return result; } void {{ cppClassName }}::{{ field.name }}_async(void *untypedBaton) { diff --git a/generate/templates/templates/binding.gyp b/generate/templates/templates/binding.gyp index ece88cd15..10b9b456a 100644 --- a/generate/templates/templates/binding.gyp +++ b/generate/templates/templates/binding.gyp @@ -12,6 +12,7 @@ }, "sources": [ + "src/async_baton.cc", "src/lock_master.cc", "src/nodegit.cc", "src/init_ssh2.cc", diff --git a/test/tests/clone.js b/test/tests/clone.js index c7645357d..48205d03d 100644 --- a/test/tests/clone.js +++ b/test/tests/clone.js @@ -166,6 +166,37 @@ describe("Clone", function() { }); }); + it("can clone without waiting for callback results", function() { + var test = this; + var url = "https://github.com/nodegit/test.git"; + var lastReceivedObjects = 0; + var cloneFinished = false; + var opts = { + fetchOpts: { + callbacks: { + transferProgress: { + waitForResult: false, + callback: function(progress) { + var receivedObjects = progress.receivedObjects(); + assert.false( + cloneFinished, + "callback running after clone completion" + ); + assert.gt(receivedObjects, lastReceivedObjects); + lastReceivedObjects = receivedObjects; + } + } + } + } + }; + + return Clone(url, clonePath, opts).then(function(repo) { + assert.ok(repo instanceof Repository); + cloneFinished = true; + test.repository = repo; + }); + }); + it("can clone using nested function", function() { var test = this; var url = "https://github.com/nodegit/test.git";