Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions 28 generate/templates/manual/include/async_baton.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
#include <nan.h>

#include "lock_master.h"
#include "functions/sleep_for_ms.h"
#include "nodegit.h"

// Base class for Batons used for callbacks (for example,
// JS functions passed as callback parameters,
// or field properties of configuration objects whose values are callbacks)
struct AsyncBaton {
uv_async_t req;

bool done;
uv_sem_t semaphore;
};

template<typename ResultT>
Expand All @@ -23,22 +21,28 @@ struct AsyncBatonWithResult : public AsyncBaton {

AsyncBatonWithResult(const ResultT &defaultResult)
: defaultResult(defaultResult) {
uv_sem_init(&semaphore, 0);
}

~AsyncBatonWithResult() {
uv_sem_destroy(&semaphore);
}

void Done() {
// signal completion
uv_sem_post(&semaphore);
}

ResultT ExecuteAsync(uv_async_cb asyncCallback) {
ResultT ExecuteAsync(ThreadPool::Callback asyncCallback) {
result = 0;
req.data = this;
done = false;

uv_async_init(uv_default_loop(), &req, asyncCallback);
{
LockMaster::TemporaryUnlock temporaryUnlock;

uv_async_send(&req);
libgit2ThreadPool.ExecuteReverseCallback(asyncCallback, this);

while(!done) {
sleep_for_ms(1);
}
// wait for completion
uv_sem_wait(&semaphore);
}

return result;
Expand Down
6 changes: 0 additions & 6 deletions 6 generate/templates/manual/include/functions/sleep_for_ms.h

This file was deleted.

41 changes: 32 additions & 9 deletions 41 generate/templates/manual/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,26 @@
#include <queue>

class ThreadPool {
public:
typedef void (*Callback) (void *);

private:
struct Work {
Callback workCallback;
Callback loopCallback;
Callback completionCallback;
void *data;

Work(Callback workCallback, Callback loopCallback, void *data)
: workCallback(workCallback), loopCallback(loopCallback), data(data) {
Work(Callback workCallback, Callback completionCallback, void *data)
: workCallback(workCallback), completionCallback(completionCallback), data(data) {
}
};

struct ReverseCall {
Callback reverseCallback;
void *data;

ReverseCall(Callback reverseCallback, void *data)
: reverseCallback(reverseCallback), data(data) {
}
};

Expand All @@ -23,14 +35,22 @@ class ThreadPool {
int workInProgressCount;

// completion callbacks to be performed on the loop
std::queue<Work> loopQueue;
uv_mutex_t loopMutex;
uv_async_t loopAsync;
std::queue<Work> completionQueue;
uv_mutex_t completionMutex;
uv_async_t completionAsync;

// async callback made from the threadpool, executed in the loop
std::queue<ReverseCall> reverseQueue;
uv_mutex_t reverseMutex;
uv_async_t reverseAsync;

static void RunEventQueue(void *threadPool);
void RunEventQueue();
static void RunLoopCallbacks(uv_async_t* handle);
void RunLoopCallbacks();
static void RunCompletionCallbacks(uv_async_t* handle);
void RunCompletionCallbacks();
static void RunReverseCallbacks(uv_async_t *handle);
void RunReverseCallbacks();

public:
// Initializes thread pool and spins up the requested number of threads
// The provided loop will be used for completion callbacks, whenever
Expand All @@ -39,7 +59,10 @@ class ThreadPool {
// Queues work on the thread pool, followed by completion call scheduled
// on the loop provided in the constructor.
// QueueWork should be called on the loop provided in the constructor.
void QueueWork(Callback workCallback, Callback loopCallback, void *data);
void QueueWork(Callback workCallback, Callback completionCallback, void *data);
// Queues a callback on the loop provided in the constructor
// these block the calling thread's execution until the callback completes
void ExecuteReverseCallback(Callback reverseCallback, void *data);
};

#endif
16 changes: 0 additions & 16 deletions 16 generate/templates/manual/src/functions/sleep_for_ms.cc

This file was deleted.

91 changes: 66 additions & 25 deletions 91 generate/templates/manual/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) {
uv_mutex_init(&workMutex);
uv_sem_init(&workSemaphore, 0);

uv_async_init(loop, &loopAsync, RunLoopCallbacks);
loopAsync.data = this;
uv_unref((uv_handle_t *)&loopAsync);
uv_mutex_init(&loopMutex);
uv_async_init(loop, &completionAsync, RunCompletionCallbacks);
completionAsync.data = this;
uv_unref((uv_handle_t *)&completionAsync);
uv_mutex_init(&completionMutex);

uv_async_init(loop, &reverseAsync, RunReverseCallbacks);
reverseAsync.data = this;
uv_unref((uv_handle_t *)&reverseAsync);
uv_mutex_init(&reverseMutex);

workInProgressCount = 0;

Expand All @@ -17,17 +22,31 @@ ThreadPool::ThreadPool(int numberOfThreads, uv_loop_t *loop) {
}
}

void ThreadPool::QueueWork(Callback workCallback, Callback loopCallback, void *data) {
void ThreadPool::QueueWork(Callback workCallback, Callback completionCallback, void *data) {
uv_mutex_lock(&workMutex);
// there is work on the thread pool - reference the handle so
// node doesn't terminate
uv_ref((uv_handle_t *)&loopAsync);
workQueue.push(Work(workCallback, loopCallback, data));
uv_ref((uv_handle_t *)&completionAsync);
workQueue.push(Work(workCallback, completionCallback, data));
workInProgressCount++;
uv_mutex_unlock(&workMutex);
uv_sem_post(&workSemaphore);
}

void ThreadPool::ExecuteReverseCallback(Callback reverseCallback, void *data) {
// push the callback into the queue
uv_mutex_lock(&reverseMutex);
ReverseCall reverseCall(reverseCallback, data);
bool queueWasEmpty = reverseQueue.empty();
reverseQueue.push(reverseCall);
// we only trigger RunReverseCallbacks via the reverseAsync handle if the queue
// was empty. Otherwise, we depend on RunReverseCallbacks to re-trigger itself
if (queueWasEmpty) {
uv_async_send(&reverseAsync);
}
uv_mutex_unlock(&reverseMutex);
}

void ThreadPool::RunEventQueue(void *threadPool) {
static_cast<ThreadPool *>(threadPool)->RunEventQueue();
}
Expand All @@ -46,40 +65,62 @@ void ThreadPool::RunEventQueue() {
(*work.workCallback)(work.data);

// schedule the callback on the loop
uv_mutex_lock(&loopMutex);
loopQueue.push(work);
uv_mutex_unlock(&loopMutex);
uv_async_send(&loopAsync);
uv_mutex_lock(&completionMutex);
completionQueue.push(work);
uv_mutex_unlock(&completionMutex);
uv_async_send(&completionAsync);
}
}

void ThreadPool::RunLoopCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunLoopCallbacks();
void ThreadPool::RunCompletionCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunCompletionCallbacks();
}

void ThreadPool::RunLoopCallbacks() {
void ThreadPool::RunCompletionCallbacks() {
// uv_async_send can coalesce calls, so we are not guaranteed one
// RunLoopCallbacks per uv_async_send call
// so we always process the entire loopQueue
// RunCompletionCallbacks per uv_async_send call
// so we always process the entire completionQueue
int callbacksCompleted = 0;
uv_mutex_lock(&loopMutex);
while(!loopQueue.empty()) {
Work work = loopQueue.front();
loopQueue.pop();
uv_mutex_unlock(&loopMutex);
uv_mutex_lock(&completionMutex);
while(!completionQueue.empty()) {
Work work = completionQueue.front();
completionQueue.pop();
uv_mutex_unlock(&completionMutex);
// perform the queued loop callback
(*work.loopCallback)(work.data);
(*work.completionCallback)(work.data);
callbacksCompleted++;
uv_mutex_lock(&loopMutex);
uv_mutex_lock(&completionMutex);
}
uv_mutex_unlock(&loopMutex);
uv_mutex_unlock(&completionMutex);

uv_mutex_lock(&workMutex);
// if there is no ongoing work / completion processing, node doesn't need
// to be prevented from terminating
workInProgressCount -= callbacksCompleted;
if(!workInProgressCount) {
uv_unref((uv_handle_t *)&loopAsync);
uv_unref((uv_handle_t *)&completionAsync);
}
uv_mutex_unlock(&workMutex);
}

void ThreadPool::RunReverseCallbacks(uv_async_t* handle) {
static_cast<ThreadPool *>(handle->data)->RunReverseCallbacks();
}

void ThreadPool::RunReverseCallbacks() {
// get the next callback to run
uv_mutex_lock(&reverseMutex);
ReverseCall reverseCall = reverseQueue.front();
uv_mutex_unlock(&reverseMutex);

// execute callback
(*reverseCall.reverseCallback)(reverseCall.data);

// pop the queue, and if necessary, re-trigger RunReverseCallbacks
uv_mutex_lock(&reverseMutex);
reverseQueue.pop();
if (!reverseQueue.empty()) {
uv_async_send(&reverseAsync);
}
uv_mutex_unlock(&reverseMutex);
}
17 changes: 7 additions & 10 deletions 17 generate/templates/partials/callback_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@
{{ arg.cType }} {{ arg.name}}{% if not arg.lastArg %},{% endif %}
{% endeach %}
) {
{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton* baton =
new {{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton({{ cbFunction.return.noResults }});
{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton baton({{ cbFunction.return.noResults }});

{% each cbFunction.args|argsInfo as arg %}
baton->{{ arg.name }} = {{ arg.name }};
baton.{{ arg.name }} = {{ arg.name }};
{% endeach %}

return baton->ExecuteAsync((uv_async_cb) {{ cppFunctionName }}_{{ cbFunction.name }}_async);
return baton.ExecuteAsync({{ cppFunctionName }}_{{ cbFunction.name }}_async);
}

void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(uv_async_t* req, int status) {
void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(void *untypedBaton) {
Nan::HandleScope scope;

{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton* baton = static_cast<{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton*>(req->data);
{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton* baton = static_cast<{{ cppFunctionName }}_{{ cbFunction.name|titleCase }}Baton*>(untypedBaton);

{% each cbFunction.args|argsInfo as arg %}
{% if arg | isPayload %}
Expand Down Expand Up @@ -57,8 +56,6 @@ void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(uv_as
Nan::TryCatch tryCatch;
Local<v8::Value> result = callback->Call({{ cbFunction.args|jsArgsCount }}, argv);

uv_close((uv_handle_t*) &baton->req, NULL);

if(PromiseCompletion::ForwardIfPromise(result, baton, {{ cppFunctionName }}_{{ cbFunction.name }}_promiseCompleted)) {
return;
}
Expand Down Expand Up @@ -88,7 +85,7 @@ void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_async(uv_as
}
{% endeach %}

baton->done = true;
baton->Done();
}

void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_promiseCompleted(bool isFulfilled, AsyncBaton *_baton, v8::Local<v8::Value> result) {
Expand Down Expand Up @@ -132,7 +129,7 @@ void {{ cppClassName }}::{{ cppFunctionName }}_{{ cbFunction.name }}_promiseComp

baton->result = {{ cbFunction.return.error }};
}
baton->done = true;
baton->Done();
}
{%endif%}
{%endeach%}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.