Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions 31 generate/templates/manual/include/async_baton.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,48 @@
// or field properties of configuration objects whose values are callbacks)
struct AsyncBaton {
// Signaled on completion (uv_sem_post in Done) and waited on by
// synchronous callers (uv_sem_wait in ExecuteAsync).
uv_sem_t semaphore;

// Virtual so derived batons (e.g. AsyncBatonWithResult) are destroyed
// correctly when deleted through an AsyncBaton* (see deleteBaton).
virtual ~AsyncBaton() {}
};

void deleteBaton(AsyncBaton *baton);

template<typename ResultT>
struct AsyncBatonWithResult : public AsyncBaton {
ResultT result;
ResultT defaultResult; // result returned if the callback doesn't return anything valid
void (*onCompletion)(AsyncBaton *);

AsyncBatonWithResult(const ResultT &defaultResult)
: defaultResult(defaultResult) {
uv_sem_init(&semaphore, 0);
}

~AsyncBatonWithResult() {
uv_sem_destroy(&semaphore);
}

void Done() {
// signal completion
uv_sem_post(&semaphore);
if (onCompletion) {
onCompletion(this);
} else {
// signal completion
uv_sem_post(&semaphore);
}
}

ResultT ExecuteAsync(ThreadPool::Callback asyncCallback) {
ResultT ExecuteAsync(ThreadPool::Callback asyncCallback, void (*onCompletion)(AsyncBaton *) = NULL) {
result = 0;
this->onCompletion = onCompletion;
if (!onCompletion) {
uv_sem_init(&semaphore, 0);
}

{
LockMaster::TemporaryUnlock temporaryUnlock;

libgit2ThreadPool.ExecuteReverseCallback(asyncCallback, this);

// wait for completion
uv_sem_wait(&semaphore);
if (!onCompletion) {
// wait for completion
uv_sem_wait(&semaphore);
uv_sem_destroy(&semaphore);
}
}

return result;
Expand Down
12 changes: 11 additions & 1 deletion 12 generate/templates/manual/include/callback_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ class CallbackWrapper {
int throttle; // in milliseconds - if > 0, calls to the JS callback will be throttled
uint64_t lastCallTime;

// When false, the callback is triggered without waiting for it to finish;
// in that case, the underlying libgit2 function is immediately given
// the default result.
bool waitForResult;

public:
CallbackWrapper() {
jsCallback = NULL;
Expand All @@ -33,12 +38,17 @@ class CallbackWrapper {
return jsCallback;
}

void SetCallback(Nan::Callback* callback, int throttle = 0) {
void SetCallback(Nan::Callback* callback, int throttle = 0, bool waitForResult = true) {
if(jsCallback) {
delete jsCallback;
}
jsCallback = callback;
this->throttle = throttle;
this->waitForResult = waitForResult;
}

// Returns true if the caller should block until the JS callback produces a
// result; false means fire-and-forget (the default result is used instead).
// Set via SetCallback's waitForResult parameter.
bool ShouldWaitForResult() {
return waitForResult;
}

bool WillBeThrottled() {
Expand Down
31 changes: 13 additions & 18 deletions 31 generate/templates/manual/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ class ThreadPool {
}
};

struct ReverseCall {
Callback reverseCallback;
struct LoopCallback {
Callback callback;
void *data;
bool isWork;

ReverseCall(Callback reverseCallback, void *data)
: reverseCallback(reverseCallback), data(data) {
LoopCallback(Callback callback, void *data, bool isWork)
: callback(callback), data(data), isWork(isWork) {
}
};

Expand All @@ -34,22 +35,17 @@ class ThreadPool {
uv_sem_t workSemaphore;
int workInProgressCount;

// completion callbacks to be performed on the loop
std::queue<Work> completionQueue;
uv_mutex_t completionMutex;
uv_async_t completionAsync;

// async callback made from the threadpool, executed in the loop
std::queue<ReverseCall> reverseQueue;
uv_mutex_t reverseMutex;
uv_async_t reverseAsync;
// completion and async callbacks to be performed on the loop
std::queue<LoopCallback> loopQueue;
uv_mutex_t loopMutex;
uv_async_t loopAsync;

static void RunEventQueue(void *threadPool);
void RunEventQueue();
static void RunCompletionCallbacks(uv_async_t* handle);
void RunCompletionCallbacks();
static void RunReverseCallbacks(uv_async_t *handle);
void RunReverseCallbacks();
static void RunLoopCallbacks(uv_async_t* handle);
void RunLoopCallbacks();

void QueueLoopCallback(Callback callback, void *data, bool isWork);

public:
// Initializes thread pool and spins up the requested number of threads
Expand All @@ -61,7 +57,6 @@ class ThreadPool {
// QueueWork should be called on the loop provided in the constructor.
void QueueWork(Callback workCallback, Callback completionCallback, void *data);
// Queues a callback on the loop provided in the constructor
// these block the calling thread's execution until the callback completes
void ExecuteReverseCallback(Callback reverseCallback, void *data);
};

Expand Down
5 changes: 5 additions & 0 deletions 5 generate/templates/manual/src/async_baton.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "../include/async_baton.h"

// Frees a heap-allocated baton through AsyncBaton's virtual destructor.
// Used as the onCompletion handler for fire-and-forget ExecuteAsync calls
// (see field_accessors.cc), so the baton is released once the callback runs.
void deleteBaton(AsyncBaton *baton) {
delete baton;
}
112 changes: 44 additions & 68 deletions 112 generate/templates/manual/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) {
uv_mutex_init(&workMutex);
uv_sem_init(&workSemaphore, 0);

uv_async_init(loop, &completionAsync, RunCompletionCallbacks);
completionAsync.data = this;
uv_unref((uv_handle_t *)&completionAsync);
uv_mutex_init(&completionMutex);

uv_async_init(loop, &reverseAsync, RunReverseCallbacks);
reverseAsync.data = this;
uv_unref((uv_handle_t *)&reverseAsync);
uv_mutex_init(&reverseMutex);
uv_async_init(loop, &loopAsync, RunLoopCallbacks);
loopAsync.data = this;
uv_unref((uv_handle_t *)&loopAsync);
uv_mutex_init(&loopMutex);

workInProgressCount = 0;

Expand All @@ -26,25 +21,29 @@ void ThreadPool::QueueWork(Callback workCallback, Callback completionCallback, v
uv_mutex_lock(&workMutex);
// there is work on the thread pool - reference the handle so
// node doesn't terminate
uv_ref((uv_handle_t *)&completionAsync);
uv_ref((uv_handle_t *)&loopAsync);
workQueue.push(Work(workCallback, completionCallback, data));
workInProgressCount++;
uv_mutex_unlock(&workMutex);
uv_sem_post(&workSemaphore);
}

void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) {
void ThreadPool::QueueLoopCallback(Callback callback, void *data, bool isWork) {
// push the callback into the queue
uv_mutex_lock(&reverseMutex);
ReverseCall reverseCall(reverseCallback, data);
bool queueWasEmpty = reverseQueue.empty();
reverseQueue.push(reverseCall);
// we only trigger RunReverseCallbacks via the reverseAsync handle if the queue
// was empty. Otherwise, we depend on RunReverseCallbacks to re-trigger itself
uv_mutex_lock(&loopMutex);
LoopCallback loopCallback(callback, data, isWork);
bool queueWasEmpty = loopQueue.empty();
loopQueue.push(loopCallback);
// we only trigger RunLoopCallbacks via the loopAsync handle if the queue
// was empty. Otherwise, we depend on RunLoopCallbacks to re-trigger itself
if (queueWasEmpty) {
uv_async_send(&reverseAsync);
uv_async_send(&loopAsync);
}
uv_mutex_unlock(&reverseMutex);
uv_mutex_unlock(&loopMutex);
}

void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) {
QueueLoopCallback(reverseCallback, data, false);
}

void ThreadPool::RunEventQueue(void *threadPool) {
Expand All @@ -64,63 +63,40 @@ void ThreadPool::RunEventQueue() {
// perform the queued work
(*work.workCallback)(work.data);

// schedule the callback on the loop
uv_mutex_lock(&completionMutex);
completionQueue.push(work);
uv_mutex_unlock(&completionMutex);
uv_async_send(&completionAsync);
// schedule the completion callback on the loop
QueueLoopCallback(work.completionCallback, work.data, true);
}
}

void ThreadPool::RunCompletionCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunCompletionCallbacks();
void ThreadPool::RunLoopCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunLoopCallbacks();
}

void ThreadPool::RunCompletionCallbacks() {
// uv_async_send can coalesce calls, so we are not guaranteed one
// RunCompletionCallbacks per uv_async_send call
// so we always process the entire completionQueue
int callbacksCompleted = 0;
uv_mutex_lock(&completionMutex);
while(!completionQueue.empty()) {
Work work = completionQueue.front();
completionQueue.pop();
uv_mutex_unlock(&completionMutex);
// perform the queued loop callback
(*work.completionCallback)(work.data);
callbacksCompleted++;
uv_mutex_lock(&completionMutex);
void ThreadPool::RunLoopCallbacks() {
// get the next callback to run
uv_mutex_lock(&loopMutex);
LoopCallback loopCallback = loopQueue.front();
uv_mutex_unlock(&loopMutex);

// perform the queued loop callback
(*loopCallback.callback)(loopCallback.data);

// pop the queue, and if necessary, re-trigger RunLoopCallbacks
uv_mutex_lock(&loopMutex);
loopQueue.pop();
if (!loopQueue.empty()) {
uv_async_send(&loopAsync);
}
uv_mutex_unlock(&completionMutex);
uv_mutex_unlock(&loopMutex);

uv_mutex_lock(&workMutex);
// if there is no ongoing work / completion processing, node doesn't need
// to be prevented from terminating
workInProgressCount -= callbacksCompleted;
if(!workInProgressCount) {
uv_unref((uv_handle_t *)&completionAsync);
}
uv_mutex_unlock(&workMutex);
}

void ThreadPool::RunReverseCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunReverseCallbacks();
}

void ThreadPool::RunReverseCallbacks() {
// get the next callback to run
uv_mutex_lock(&reverseMutex);
ReverseCall reverseCall = reverseQueue.front();
uv_mutex_unlock(&reverseMutex);

// execute callback
(*reverseCall.reverseCallback)(reverseCall.data);

// pop the queue, and if necessary, re-trigger RunReverseCallbacks
uv_mutex_lock(&reverseMutex);
reverseQueue.pop();
if (!reverseQueue.empty()) {
uv_async_send(&reverseAsync);
if (loopCallback.isWork) {
uv_mutex_lock(&workMutex);
workInProgressCount --;
if(!workInProgressCount) {
uv_unref((uv_handle_t *)&loopAsync);
}
uv_mutex_unlock(&workMutex);
}
uv_mutex_unlock(&reverseMutex);
}
31 changes: 24 additions & 7 deletions 31 generate/templates/partials/field_accessors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
{% elsif field.isCallbackFunction %}
Nan::Callback *callback = NULL;
int throttle = {%if field.return.throttle %}{{ field.return.throttle }}{%else%}0{%endif%};
bool waitForResult = true;

if (value->IsFunction()) {
callback = new Nan::Callback(value.As<Function>());
Expand All @@ -59,13 +60,20 @@
Local<Value> objectCallback = maybeObjectCallback.ToLocalChecked();
if (objectCallback->IsFunction()) {
callback = new Nan::Callback(objectCallback.As<Function>());

Nan::MaybeLocal<Value> maybeObjectThrottle = Nan::Get(object, Nan::New("throttle").ToLocalChecked());
if(!maybeObjectThrottle.IsEmpty()) {
Local<Value> objectThrottle = maybeObjectThrottle.ToLocalChecked();
if (objectThrottle->IsNumber()) {
throttle = (int)objectThrottle.As<Number>()->Value();
}
}

Nan::MaybeLocal<Value> maybeObjectWaitForResult = Nan::Get(object, Nan::New("waitForResult").ToLocalChecked());
if(!maybeObjectWaitForResult.IsEmpty()) {
Local<Value> objectWaitForResult = maybeObjectWaitForResult.ToLocalChecked();
waitForResult = (bool)objectWaitForResult->BooleanValue();
}
}
}
}
Expand All @@ -74,7 +82,7 @@
wrapper->raw->{{ field.name }} = ({{ field.cType }}){{ field.name }}_cppCallback;
}

wrapper->{{ field.name }}.SetCallback(callback, throttle);
wrapper->{{ field.name }}.SetCallback(callback, throttle, waitForResult);
}

{% elsif field.payloadFor %}
Expand Down Expand Up @@ -111,19 +119,28 @@
{{ arg.cType }} {{ arg.name}}{% if not arg.lastArg %},{% endif %}
{% endeach %}
) {
{{ field.name|titleCase }}Baton baton({{ field.return.noResults }});
{{ field.name|titleCase }}Baton *baton =
new {{ field.name|titleCase }}Baton({{ field.return.noResults }});

{% each field.args|argsInfo as arg %}
baton.{{ arg.name }} = {{ arg.name }};
baton->{{ arg.name }} = {{ arg.name }};
{% endeach %}

{{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(&baton);
{{ cppClassName }}* instance = {{ field.name }}_getInstanceFromBaton(baton);

{{ field.return.type }} result;

if (instance->{{ field.name }}.WillBeThrottled()) {
return baton.defaultResult;
result = baton->defaultResult;
delete baton;
} else if (instance->{{ field.name }}.ShouldWaitForResult()) {
result = baton->ExecuteAsync({{ field.name }}_async);
delete baton;
} else {
result = baton->defaultResult;
baton->ExecuteAsync({{ field.name }}_async, deleteBaton);
}

return baton.ExecuteAsync({{ field.name }}_async);
return result;
}

void {{ cppClassName }}::{{ field.name }}_async(void *untypedBaton) {
Expand Down
1 change: 1 addition & 0 deletions 1 generate/templates/templates/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
},

"sources": [
"src/async_baton.cc",
"src/lock_master.cc",
"src/nodegit.cc",
"src/init_ssh2.cc",
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.