From e12afd6cd8eff15f01c59813f3cffcf5e126b950 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Wed, 22 Jan 2025 13:12:51 -0800 Subject: [PATCH 1/9] rearrange a bit --- inst/include/RcppParallel/TBB.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/inst/include/RcppParallel/TBB.h b/inst/include/RcppParallel/TBB.h index f276f75e7..4b638b89c 100644 --- a/inst/include/RcppParallel/TBB.h +++ b/inst/include/RcppParallel/TBB.h @@ -228,14 +228,14 @@ inline void tbbParallelFor(std::size_t begin, std::size_t end, Worker& worker, std::size_t grainSize = 1, - int numThreads = tbb::task_arena::automatic) + int numThreads = -1) { ThreadStackSizeControl control; - tbb::task_arena arena(numThreads); tbb::task_group group; - TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); + + tbb::task_arena arena(numThreads); arena.execute(executor); } @@ -244,14 +244,14 @@ inline void tbbParallelReduce(std::size_t begin, std::size_t end, Reducer& reducer, std::size_t grainSize = 1, - int numThreads = tbb::task_arena::automatic) + int numThreads = -1) { ThreadStackSizeControl control; - tbb::task_arena arena(numThreads); tbb::task_group group; - TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize); + + tbb::task_arena arena(numThreads); arena.execute(executor); } From 8964f980f81db18a4f1cfcb07301e6cf368dba73 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Wed, 22 Jan 2025 13:21:45 -0800 Subject: [PATCH 2/9] no more inline stack size control --- RcppParallel.Rproj | 1 + inst/include/RcppParallel/Common.h | 16 ++++++++++-- inst/include/RcppParallel/TBB.h | 42 ------------------------------ src/tbb.cpp | 42 ++++++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 44 deletions(-) create mode 100644 src/tbb.cpp diff --git a/RcppParallel.Rproj b/RcppParallel.Rproj index 6a05b3721..413f6f714 100644 --- a/RcppParallel.Rproj +++ b/RcppParallel.Rproj @@ -1,4 +1,5 @@ Version: 1.0 +ProjectId: f1ae2e5b-eb1d-4d93-9ae1-29e364a5eaa6 RestoreWorkspace: No SaveWorkspace: No diff --git a/inst/include/RcppParallel/Common.h b/inst/include/RcppParallel/Common.h index 0e4c0e280..7f2eded16 100644 --- a/inst/include/RcppParallel/Common.h +++ b/inst/include/RcppParallel/Common.h @@ -41,7 +41,6 @@ inline int resolveValue(const char* envvar, // to from the void* passed to the worker thread (required because // the tinythreads interface allows to pass only a void* to the // thread main rather than a generic type / template) - struct Worker { // construct and destruct (delete virtually) @@ -58,9 +57,22 @@ struct Worker }; // Tag type used for disambiguating splitting constructors - struct Split {}; +// Used for controlling the stack size for threads / tasks within a scope. +class ThreadStackSizeControl +{ +public: + ThreadStackSizeControl(); + ~ThreadStackSizeControl(); + +private: + // COPYING: not copyable + ThreadStackSizeControl(const ThreadStackSizeControl&); + ThreadStackSizeControl& operator=(const ThreadStackSizeControl&); +}; + + } // namespace RcppParallel diff --git a/inst/include/RcppParallel/TBB.h b/inst/include/RcppParallel/TBB.h index 4b638b89c..e113ef6f7 100644 --- a/inst/include/RcppParallel/TBB.h +++ b/inst/include/RcppParallel/TBB.h @@ -13,8 +13,6 @@ namespace RcppParallel { -namespace { - struct TBBWorker { explicit TBBWorker(Worker& worker) : worker_(worker) {} @@ -184,46 +182,6 @@ class TBBArenaParallelReduceExecutor std::size_t grainSize_; }; -class ThreadStackSizeControl -{ -public: - - ThreadStackSizeControl() - : control_(nullptr) - { - int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0); - if (stackSize > 0) - { - control_ = new tbb::global_control( - tbb::global_control::thread_stack_size, - stackSize - ); - } - } - - ~ThreadStackSizeControl() - { - if (control_ != nullptr) - { - delete control_; - control_ = nullptr; - } - } - -private: - - // COPYING: not copyable - ThreadStackSizeControl(const ThreadStackSizeControl&); - ThreadStackSizeControl& operator=(const ThreadStackSizeControl&); - - // private members - tbb::global_control* control_; - -}; - -} // anonymous namespace - - inline void tbbParallelFor(std::size_t begin, std::size_t end, Worker& worker, diff --git a/src/tbb.cpp b/src/tbb.cpp new file mode 100644 index 000000000..e6660288e --- /dev/null +++ b/src/tbb.cpp @@ -0,0 +1,42 @@ + + +#include + +#ifndef TBB_PREVIEW_GLOBAL_CONTROL +# define TBB_PREVIEW_GLOBAL_CONTROL 1 +#endif + +#include +#include +#include + +namespace RcppParallel { + +tbb::global_control* s_globalControl = nullptr; + +ThreadStackSizeControl::ThreadStackSizeControl() +{ +#ifdef RCPP_PARALLEL_USE_TBB + int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0); + if (stackSize > 0) + { + s_globalControl = new tbb::global_control( + tbb::global_control::thread_stack_size, + stackSize + ); + } +#endif +} + +ThreadStackSizeControl::~ThreadStackSizeControl() +{ +#ifdef RCPP_PARALLEL_USE_TBB + if (s_globalControl != nullptr) + { + delete s_globalControl; + s_globalControl = nullptr; + } +#endif +} + +} // end namespace RcppParallel From aeacb939930c2c44214473967343cb96fc3e9100 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Wed, 22 Jan 2025 13:47:30 -0800 Subject: [PATCH 3/9] move parallel-for stuff into cpp --- R/zzz.R | 2 +- inst/include/RcppParallel/TBB.h | 82 ++---------------------------- src/tbb.cpp | 88 +++++++++++++++++++++++++++++---- 3 files changed, 85 insertions(+), 87 deletions(-) diff --git a/R/zzz.R b/R/zzz.R index 31c40a3e0..3116939c2 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -37,7 +37,7 @@ loadTbbLibrary <- function(name) { .tbbMallocProxyDllInfo <<- loadTbbLibrary("tbbmalloc_proxy") # load RcppParallel library if available - .dllInfo <<- library.dynam("RcppParallel", pkgname, libname) + .dllInfo <<- library.dynam("RcppParallel", pkgname, libname, local = FALSE) } diff --git a/inst/include/RcppParallel/TBB.h b/inst/include/RcppParallel/TBB.h index e113ef6f7..4b6094228 100644 --- a/inst/include/RcppParallel/TBB.h +++ b/inst/include/RcppParallel/TBB.h @@ -54,37 +54,6 @@ struct TBBReducer Reducer& reducer_; }; -class TBBParallelForExecutor -{ -public: - - TBBParallelForExecutor(Worker& worker, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : worker_(worker), - begin_(begin), - end_(end), - grainSize_(grainSize) - { - } - - void operator()() const - { - TBBWorker tbbWorker(worker_); - tbb::parallel_for( - tbb::blocked_range(begin_, end_, grainSize_), - tbbWorker - ); - } - -private: - Worker& worker_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; - template class TBBParallelReduceExecutor { @@ -117,38 +86,6 @@ class TBBParallelReduceExecutor std::size_t grainSize_; }; -class TBBArenaParallelForExecutor -{ -public: - - TBBArenaParallelForExecutor(tbb::task_group& group, - Worker& worker, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : group_(group), - worker_(worker), - begin_(begin), - end_(end), - grainSize_(grainSize) - { - } - - void operator()() const - { - TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_); - group_.run_and_wait(executor); - } - -private: - - tbb::task_group& group_; - Worker& worker_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; - template class TBBArenaParallelReduceExecutor { @@ -182,20 +119,11 @@ class TBBArenaParallelReduceExecutor std::size_t grainSize_; }; -inline void tbbParallelFor(std::size_t begin, - std::size_t end, - Worker& worker, - std::size_t grainSize = 1, - int numThreads = -1) -{ - ThreadStackSizeControl control; - - tbb::task_group group; - TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); - - tbb::task_arena arena(numThreads); - arena.execute(executor); -} +void tbbParallelFor(std::size_t begin, + std::size_t end, + Worker& worker, + std::size_t grainSize = 1, + int numThreads = -1); template inline void tbbParallelReduce(std::size_t begin, diff --git a/src/tbb.cpp b/src/tbb.cpp index e6660288e..fd4d30f34 100644 --- a/src/tbb.cpp +++ b/src/tbb.cpp @@ -1,14 +1,6 @@ - #include - -#ifndef TBB_PREVIEW_GLOBAL_CONTROL -# define TBB_PREVIEW_GLOBAL_CONTROL 1 -#endif - -#include -#include -#include +#include namespace RcppParallel { @@ -39,4 +31,82 @@ ThreadStackSizeControl::~ThreadStackSizeControl() #endif } +class TBBParallelForExecutor +{ +public: + + TBBParallelForExecutor(Worker& worker, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : worker_(worker), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBWorker tbbWorker(worker_); + tbb::parallel_for( + tbb::blocked_range(begin_, end_, grainSize_), + tbbWorker + ); + } + +private: + Worker& worker_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +class TBBArenaParallelForExecutor +{ +public: + + TBBArenaParallelForExecutor(tbb::task_group& group, + Worker& worker, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : group_(group), + worker_(worker), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_); + group_.run_and_wait(executor); + } + +private: + + tbb::task_group& group_; + Worker& worker_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +void tbbParallelFor(std::size_t begin, + std::size_t end, + Worker& worker, + std::size_t grainSize, + int numThreads) +{ + ThreadStackSizeControl control; + + tbb::task_group group; + TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); + + tbb::task_arena arena(numThreads); + arena.execute(executor); +} + } // end namespace RcppParallel From 8e9a431962dd9456e7a37491190a983f8518cc12 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Wed, 22 Jan 2025 19:37:31 -0800 Subject: [PATCH 4/9] move parallel reducer out of header --- RcppParallel.Rproj | 6 +- inst/include/RcppParallel/Common.h | 30 ++--- inst/include/RcppParallel/TBB.h | 184 +++++++++++++---------------- inst/skeleton/vector-sum.cpp | 22 ++-- inst/tests/cpp/innerproduct.cpp | 30 ++--- inst/tests/cpp/sum.cpp | 35 +++--- src/tbb.cpp | 154 ++++++++++++++++++++++-- 7 files changed, 290 insertions(+), 171 deletions(-) diff --git a/RcppParallel.Rproj b/RcppParallel.Rproj index 413f6f714..e315dc5d8 100644 --- a/RcppParallel.Rproj +++ b/RcppParallel.Rproj @@ -13,7 +13,11 @@ Encoding: UTF-8 RnwWeave: Sweave LaTeX: pdfLaTeX +AutoAppendNewline: Yes +StripTrailingWhitespace: Yes + BuildType: Package -PackageInstallArgs: --with-keep.source --clean +PackageCleanBeforeInstall: No +PackageInstallArgs: --with-keep.source PackageCheckArgs: --as-cran PackageRoxygenize: rd,collate,namespace diff --git a/inst/include/RcppParallel/Common.h b/inst/include/RcppParallel/Common.h index 7f2eded16..8901d28ec 100644 --- a/inst/include/RcppParallel/Common.h +++ b/inst/include/RcppParallel/Common.h @@ -15,57 +15,57 @@ inline int resolveValue(const char* envvar, // if the requested value is non-zero and not the default, we can use it if (requestedValue != defaultValue && requestedValue > 0) return requestedValue; - + // otherwise, try reading the default from associated envvar // if the environment variable is unset, use the default const char* var = getenv(envvar); if (var == NULL) return defaultValue; - + // try to convert the string to a number // if an error occurs during conversion, just use default errno = 0; char* end; long value = strtol(var, &end, 10); - + // check for conversion failure if (end == var || *end != '\0' || errno == ERANGE) return defaultValue; - - // okay, return the parsed environment variable value + + // okay, return the parsed environment variable value return value; } +// Tag type used for disambiguating splitting constructors +struct Split {}; + // Work executed within a background thread. We implement dynamic // dispatch using vtables so we can have a stable type to cast // to from the void* passed to the worker thread (required because // the tinythreads interface allows to pass only a void* to the // thread main rather than a generic type / template) -struct Worker -{ +struct Worker +{ // construct and destruct (delete virtually) Worker() {} virtual ~Worker() {} - + // dispatch work over a range of values - virtual void operator()(std::size_t begin, std::size_t end) = 0; - - // disable copying and assignment + virtual void operator()(std::size_t begin, std::size_t end) = 0; + private: + // disable copying and assignment Worker(const Worker&); void operator=(const Worker&); }; -// Tag type used for disambiguating splitting constructors -struct Split {}; - // Used for controlling the stack size for threads / tasks within a scope. class ThreadStackSizeControl { public: ThreadStackSizeControl(); ~ThreadStackSizeControl(); - + private: // COPYING: not copyable ThreadStackSizeControl(const ThreadStackSizeControl&); diff --git a/inst/include/RcppParallel/TBB.h b/inst/include/RcppParallel/TBB.h index 4b6094228..d7f8d03fc 100644 --- a/inst/include/RcppParallel/TBB.h +++ b/inst/include/RcppParallel/TBB.h @@ -13,110 +13,91 @@ namespace RcppParallel { -struct TBBWorker -{ - explicit TBBWorker(Worker& worker) : worker_(worker) {} - - void operator()(const tbb::blocked_range& r) const { - worker_(r.begin(), r.end()); - } +// This class is primarily used to implement type erasure. The goals here were: +// +// 1. Hide the tbb symbols / implementation details from client R packages. +// That is, they should get the tools they need only via RcppParallel. +// +// 2. Do this in a way that preserves binary compatibility with pre-existing +// classes that make use of parallelReduce(). +// +// 3. Ensure that those packages, when re-compiled without source changes, +// can still function as expected. +// +// The downside here is that all the indirection through std::function<> +// and the requirement for RTTI is probably expensive, but I couldn't find +// a better way forward that could also preserve binary compatibility with +// existing pre-built pacakges. +// +// Hopefully, in a future release, we can do away with this wrapper, once +// packages have been rebuilt and no longer implicitly depend on TBB internals. +struct ReducerWrapper { -private: - Worker& worker_; -}; - -template -struct TBBReducer -{ - explicit TBBReducer(Reducer& reducer) - : pSplitReducer_(NULL), reducer_(reducer) - { - } - - TBBReducer(TBBReducer& tbbReducer, tbb::split) - : pSplitReducer_(new Reducer(tbbReducer.reducer_, RcppParallel::Split())), - reducer_(*pSplitReducer_) + template + ReducerWrapper(T* reducer) { - } - - virtual ~TBBReducer() { delete pSplitReducer_; } + self_ = reinterpret_cast(reducer); + owned_ = false; - void operator()(const tbb::blocked_range& r) { - reducer_(r.begin(), r.end()); - } - - void join(const TBBReducer& tbbReducer) { - reducer_.join(tbbReducer.reducer_); + work_ = [&](void* self, std::size_t begin, std::size_t end) + { + (*reinterpret_cast(self))(begin, end); + }; + + split_ = [&](void* object, Split split) + { + return new T(*reinterpret_cast(object), split); + }; + + join_ = [&](void* self, void* other) + { + (*reinterpret_cast(self)).join(*reinterpret_cast(other)); + }; + + deleter_ = [&](void* object) + { + delete (T*) object; + }; } - -private: - Reducer* pSplitReducer_; - Reducer& reducer_; -}; -template -class TBBParallelReduceExecutor -{ -public: - - TBBParallelReduceExecutor(Reducer& reducer, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : reducer_(reducer), - begin_(begin), - end_(end), - grainSize_(grainSize) + ~ReducerWrapper() { + if (owned_) + { + deleter_(self_); + self_ = nullptr; + } } - - void operator()() const + + void operator()(std::size_t begin, std::size_t end) const { - TBBReducer tbbReducer(reducer_); - tbb::parallel_reduce( - tbb::blocked_range(begin_, end_, grainSize_), - tbbReducer - ); + work_(self_, begin, end); } - -private: - Reducer& reducer_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; -template -class TBBArenaParallelReduceExecutor -{ -public: - - TBBArenaParallelReduceExecutor(tbb::task_group& group, - Reducer& reducer, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : group_(group), - reducer_(reducer), - begin_(begin), - end_(end), - grainSize_(grainSize) + ReducerWrapper(const ReducerWrapper& rhs, Split split) { + self_ = rhs.split_(rhs.self_, split); + owned_ = true; + + work_ = rhs.work_; + split_ = rhs.split_; + join_ = rhs.join_; + deleter_ = rhs.deleter_; } - - void operator()() const + + void join(const ReducerWrapper& rhs) const { - TBBParallelReduceExecutor executor(reducer_, begin_, end_, grainSize_); - group_.run_and_wait(executor); + join_(self_, rhs.self_); } - + private: - - tbb::task_group& group_; - Reducer& reducer_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; + void* self_ = nullptr; + bool owned_ = false; + + std::function work_; + std::function split_; + std::function join_; + std::function deleter_; }; void tbbParallelFor(std::size_t begin, @@ -125,20 +106,21 @@ void tbbParallelFor(std::size_t begin, std::size_t grainSize = 1, int numThreads = -1); +void tbbParallelReduceImpl(std::size_t begin, + std::size_t end, + ReducerWrapper& wrapper, + std::size_t grainSize = 1, + int numThreads = -1); + template -inline void tbbParallelReduce(std::size_t begin, - std::size_t end, - Reducer& reducer, - std::size_t grainSize = 1, - int numThreads = -1) +void tbbParallelReduce(std::size_t begin, + std::size_t end, + Reducer& reducer, + std::size_t grainSize = 1, + int numThreads = -1) { - ThreadStackSizeControl control; - - tbb::task_group group; - TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize); - - tbb::task_arena arena(numThreads); - arena.execute(executor); + ReducerWrapper wrapper(&reducer); + tbbParallelReduceImpl(begin, end, wrapper, grainSize, numThreads); } } // namespace RcppParallel diff --git a/inst/skeleton/vector-sum.cpp b/inst/skeleton/vector-sum.cpp index 30622e519..57b541659 100644 --- a/inst/skeleton/vector-sum.cpp +++ b/inst/skeleton/vector-sum.cpp @@ -19,37 +19,37 @@ using namespace Rcpp; using namespace RcppParallel; struct Sum : public Worker -{ +{ // source vector const RVector input; - + // accumulated value double value; - + // constructors Sum(const NumericVector input) : input(input), value(0) {} Sum(const Sum& sum, Split) : input(sum.input), value(0) {} - + // accumulate just the element of the range I've been asked to void operator()(std::size_t begin, std::size_t end) { value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0); } - + // join my value with that of another Sum - void join(const Sum& rhs) { - value += rhs.value; + void join(const Sum& rhs) { + value += rhs.value; } }; // [[Rcpp::export]] double parallelVectorSum(NumericVector x) { - - // declare the SumBody instance + + // declare the SumBody instance Sum sum(x); - + // call parallel_reduce to start the work parallelReduce(0, x.length(), sum); - + // return the computed sum return sum.value; } diff --git a/inst/tests/cpp/innerproduct.cpp b/inst/tests/cpp/innerproduct.cpp index 7a1957205..fee3e41ce 100644 --- a/inst/tests/cpp/innerproduct.cpp +++ b/inst/tests/cpp/innerproduct.cpp @@ -19,43 +19,43 @@ double innerProduct(NumericVector x, NumericVector y) { using namespace RcppParallel; struct InnerProduct : public Worker -{ +{ // source vectors const RVector x; const RVector y; - + // product that I have accumulated double product; - + // constructors - InnerProduct(const NumericVector x, const NumericVector y) + InnerProduct(const NumericVector x, const NumericVector y) : x(x), y(y), product(0) {} - InnerProduct(const InnerProduct& innerProduct, Split) + InnerProduct(const InnerProduct& innerProduct, Split) : x(innerProduct.x), y(innerProduct.y), product(0) {} - + // process just the elements of the range I have been asked to void operator()(std::size_t begin, std::size_t end) { - product += std::inner_product(x.begin() + begin, - x.begin() + end, - y.begin() + begin, + product += std::inner_product(x.begin() + begin, + x.begin() + end, + y.begin() + begin, 0.0); } - + // join my value with that of another InnerProduct - void join(const InnerProduct& rhs) { - product += rhs.product; + void join(const InnerProduct& rhs) { + product += rhs.product; } }; // [[Rcpp::export]] double parallelInnerProduct(NumericVector x, NumericVector y) { - + // declare the InnerProduct instance that takes a pointer to the vector data InnerProduct innerProduct(x, y); - + // call paralleReduce to start the work parallelReduce(0, x.length(), innerProduct); - + // return the computed product return innerProduct.product; } diff --git a/inst/tests/cpp/sum.cpp b/inst/tests/cpp/sum.cpp index aec4895f9..52e256de0 100644 --- a/inst/tests/cpp/sum.cpp +++ b/inst/tests/cpp/sum.cpp @@ -3,7 +3,7 @@ * @author JJ Allaire * @license GPL (>= 2) */ - + #include #include @@ -12,37 +12,42 @@ using namespace RcppParallel; using namespace Rcpp; struct Sum : public Worker -{ +{ // source vector const RVector input; - + // accumulated value double value; - + // constructors Sum(const NumericVector input) : input(input), value(0) {} - Sum(const Sum& sum, Split) : input(sum.input), value(0) {} - + Sum(const Sum& sum, Split) : input(sum.input), value(0) { + printf("ctor: invoking split constructor\n"); + } + // accumulate just the element of the range I have been asked to void operator()(std::size_t begin, std::size_t end) { - value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0); + double extra = std::accumulate(input.begin() + begin, input.begin() + end, 0.0); + value += extra; + printf("work: added %f (value is now %f)\n", extra, value); } - + // join my value with that of another Sum - void join(const Sum& rhs) { - value += rhs.value; - } + void join(const Sum& rhs) { + value += rhs.value; + printf("join: added %f (value is now %f)\n", rhs.value, value); + } }; // [[Rcpp::export]] double parallelVectorSum(NumericVector x) { - - // declare the SumBody instance + + // declare the SumBody instance Sum sum(x); - + // call parallel_reduce to start the work parallelReduce(0, x.length(), sum); - + // return the computed sum return sum.value; } diff --git a/src/tbb.cpp b/src/tbb.cpp index fd4d30f34..9aafc8c61 100644 --- a/src/tbb.cpp +++ b/src/tbb.cpp @@ -1,4 +1,6 @@ +#if RCPP_PARALLEL_USE_TBB + #include #include @@ -6,9 +8,22 @@ namespace RcppParallel { tbb::global_control* s_globalControl = nullptr; +// TBB Tools ---- + +struct TBBWorker +{ + explicit TBBWorker(Worker& worker) : worker_(worker) {} + + void operator()(const tbb::blocked_range& r) const { + worker_(r.begin(), r.end()); + } + +private: + Worker& worker_; +}; + ThreadStackSizeControl::ThreadStackSizeControl() { -#ifdef RCPP_PARALLEL_USE_TBB int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0); if (stackSize > 0) { @@ -17,24 +32,24 @@ ThreadStackSizeControl::ThreadStackSizeControl() stackSize ); } -#endif } ThreadStackSizeControl::~ThreadStackSizeControl() { -#ifdef RCPP_PARALLEL_USE_TBB if (s_globalControl != nullptr) { delete s_globalControl; s_globalControl = nullptr; } -#endif } + +// TBB Parallel For ---- + class TBBParallelForExecutor { public: - + TBBParallelForExecutor(Worker& worker, std::size_t begin, std::size_t end, @@ -45,7 +60,7 @@ class TBBParallelForExecutor grainSize_(grainSize) { } - + void operator()() const { TBBWorker tbbWorker(worker_); @@ -54,7 +69,7 @@ class TBBParallelForExecutor tbbWorker ); } - + private: Worker& worker_; std::size_t begin_; @@ -65,7 +80,7 @@ class TBBParallelForExecutor class TBBArenaParallelForExecutor { public: - + TBBArenaParallelForExecutor(tbb::task_group& group, Worker& worker, std::size_t begin, @@ -78,15 +93,15 @@ class TBBArenaParallelForExecutor grainSize_(grainSize) { } - + void operator()() const { TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_); group_.run_and_wait(executor); } - + private: - + tbb::task_group& group_; Worker& worker_; std::size_t begin_; @@ -101,12 +116,125 @@ void tbbParallelFor(std::size_t begin, int numThreads) { ThreadStackSizeControl control; - + tbb::task_group group; TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); - + + tbb::task_arena arena(numThreads); + arena.execute(executor); +} + + +// TBB Parallel Reduce ---- + +struct TBBReducer +{ + explicit TBBReducer(ReducerWrapper& reducer) + : pSplitReducer_(NULL), reducer_(reducer) + { + } + + TBBReducer(TBBReducer& tbbReducer, tbb::split) + : pSplitReducer_(new ReducerWrapper(tbbReducer.reducer_, RcppParallel::Split())), + reducer_(*pSplitReducer_) + { + } + + virtual ~TBBReducer() { delete pSplitReducer_; } + + void operator()(const tbb::blocked_range& r) + { + reducer_(r.begin(), r.end()); + } + + void join(const TBBReducer& tbbReducer) + { + reducer_.join(tbbReducer.reducer_); + } + +private: + ReducerWrapper* pSplitReducer_; + ReducerWrapper& reducer_; +}; + +class TBBParallelReduceExecutor +{ +public: + + TBBParallelReduceExecutor(ReducerWrapper& reducer, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : reducer_(reducer), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBReducer tbbReducer(reducer_); + tbb::parallel_reduce( + tbb::blocked_range(begin_, end_, grainSize_), + tbbReducer + ); + } + +private: + ReducerWrapper& reducer_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +class TBBArenaParallelReduceExecutor +{ +public: + + TBBArenaParallelReduceExecutor(tbb::task_group& group, + ReducerWrapper& reducer, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : group_(group), + reducer_(reducer), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBParallelReduceExecutor executor(reducer_, begin_, end_, grainSize_); + group_.run_and_wait(executor); + } + +private: + + tbb::task_group& group_; + ReducerWrapper& reducer_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +void tbbParallelReduceImpl(std::size_t begin, + std::size_t end, + ReducerWrapper& reducer, + std::size_t grainSize, + int numThreads) +{ + ThreadStackSizeControl control; + + tbb::task_group group; + TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize); + tbb::task_arena arena(numThreads); arena.execute(executor); } } // end namespace RcppParallel + +#endif /* RCPP_PARALLEL_USE_TBB */ From d34c550a41fbdac00a5fcde420ef2c20207b5821 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Wed, 22 Jan 2025 19:47:22 -0800 Subject: [PATCH 5/9] revert some test changes --- inst/tests/cpp/sum.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/inst/tests/cpp/sum.cpp b/inst/tests/cpp/sum.cpp index 52e256de0..db47c699e 100644 --- a/inst/tests/cpp/sum.cpp +++ b/inst/tests/cpp/sum.cpp @@ -21,21 +21,16 @@ struct Sum : public Worker // constructors Sum(const NumericVector input) : input(input), value(0) {} - Sum(const Sum& sum, Split) : input(sum.input), value(0) { - printf("ctor: invoking split constructor\n"); - } + Sum(const Sum& sum, Split) : input(sum.input), value(0) {} // accumulate just the element of the range I have been asked to void operator()(std::size_t begin, std::size_t end) { - double extra = std::accumulate(input.begin() + begin, input.begin() + end, 0.0); - value += extra; - printf("work: added %f (value is now %f)\n", extra, value); + value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0); } // join my value with that of another Sum void join(const Sum& rhs) { value += rhs.value; - printf("join: added %f (value is now %f)\n", rhs.value, value); } }; From 19cd92b023b34a97f315af1614c0e2d97ccc2565 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Thu, 23 Jan 2025 09:45:18 -0800 Subject: [PATCH 6/9] prepare for release --- DESCRIPTION | 2 +- NEWS.md | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index e3887f5c8..9607549b0 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: RcppParallel Type: Package Title: Parallel Programming Tools for 'Rcpp' -Version: 5.1.9.9000 +Version: 5.1.10 Authors@R: c( person("JJ", "Allaire", role = c("aut"), email = "jj@rstudio.com"), person("Romain", "Francois", role = c("aut", "cph")), diff --git a/NEWS.md b/NEWS.md index 8a7df9ceb..b1f873ea7 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,9 @@ -## RcppParallel 5.1.10 (UNRELEASED) +## RcppParallel 5.1.10 +* Fixed an issue where packages linking to RcppParallel could inadverently + depend on internals of the TBB library available during compilation, even + if the package did not explicitly use TBB itself. ## RcppParallel 5.1.9 From 23f8af9049854785238d9f0bda1d051ca91c6272 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Thu, 23 Jan 2025 15:18:41 -0800 Subject: [PATCH 7/9] need to link to RcppParallel in plugin on Windows --- R/flags.R | 18 +++++++------- R/tbb.R | 72 +++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 55 insertions(+), 35 deletions(-) diff --git a/R/flags.R b/R/flags.R index 32f641840..6ae1f8280 100644 --- a/R/flags.R +++ b/R/flags.R @@ -1,32 +1,32 @@ #' Compilation flags for RcppParallel -#' +#' #' Output the compiler or linker flags required to build against RcppParallel. -#' +#' #' These functions are typically called from `Makevars` as follows: -#' +#' #' ``` #' PKG_LIBS += $(shell "${R_HOME}/bin/Rscript" -e "RcppParallel::LdFlags()") #' ``` -#' +#' #' On Windows, the flags ensure that the package links with the built-in TBB #' library. On Linux and macOS, the output is empty, because TBB is loaded #' dynamically on load by `RcppParallel`. -#' +#' #' \R packages using RcppParallel should also add the following to their #' `NAMESPACE` file: -#' +#' #' ``` #' importFrom(RcppParallel, RcppParallelLibs) #' ``` -#' +#' #' This is necessary to ensure that \pkg{RcppParallel} (and so, TBB) is loaded #' and available. -#' +#' #' @name flags #' @rdname flags #' @aliases RcppParallelLibs LdFlags CxxFlags -#' +#' #' @return Returns \code{NULL}, invisibly. These functions are called for #' their side effects (writing the associated flags to stdout). #' diff --git a/R/tbb.R b/R/tbb.R index 4c8fc14cf..87362db93 100644 --- a/R/tbb.R +++ b/R/tbb.R @@ -1,26 +1,26 @@ #' Get the Path to a TBB Library -#' +#' #' Retrieve the path to a TBB library. This can be useful for \R packages #' using RcppParallel that wish to use, or re-use, the version of TBB that #' RcppParallel has been configured to use. -#' +#' #' @param name #' The name of the TBB library to be resolved. Normally, this is one of #' `tbb`, `tbbmalloc`, or `tbbmalloc_proxy`. When `NULL`, the library #' path containing the TBB libraries is returned instead. -#' +#' #' @export tbbLibraryPath <- function(name = NULL) { - + # library paths for different OSes sysname <- Sys.info()[["sysname"]] - + # find root for TBB install tbbRoot <- Sys.getenv("TBB_LIB", unset = tbbRoot()) if (is.null(name)) return(tbbRoot) - + # form library names tbbLibNames <- list( "Darwin" = paste0("lib", name, ".dylib"), @@ -28,12 +28,12 @@ tbbLibraryPath <- function(name = NULL) { "SunOS" = paste0("lib", name, ".so"), "Linux" = paste0("lib", name, c(".so.2", ".so")) ) - + # skip systems that we know not to be compatible isCompatible <- !is_sparc() && !is.null(tbbLibNames[[sysname]]) if (!isCompatible) return(NULL) - + # find the request library (if any) libNames <- tbbLibNames[[sysname]] for (libName in libNames) { @@ -41,13 +41,13 @@ tbbLibraryPath <- function(name = NULL) { if (file.exists(tbbName)) return(tbbName) } - + } tbbCxxFlags <- function() { - + flags <- character() - + # opt-in to TBB on Windows if (is_windows()) { flags <- c(flags, "-DRCPP_PARALLEL_USE_TBB=1") @@ -57,37 +57,57 @@ tbbCxxFlags <- function() { flags <- c(flags, "-DTBB_USE_GCC_BUILTINS") } } - + # if TBB_INC is set, apply those library paths tbbInc <- Sys.getenv("TBB_INC", unset = TBB_INC) if (nzchar(tbbInc)) { - + # add include path flags <- c(flags, paste0("-I", asBuildPath(tbbInc))) - + # prefer new interface if version.h exists versionPath <- file.path(tbbInc, "tbb/version.h") if (file.exists(versionPath)) flags <- c(flags, "-DTBB_INTERFACE_NEW") - + } - + # return flags as string paste(flags, collapse = " ") - + } # Return the linker flags required for TBB on this platform tbbLdFlags <- function() { - + + tbbFlags <- tbbLdFlagsImpl() + + if (is_windows()) { + libDir <- system.file("libs", .Platform$r_arch, package = "RcppParallel") + libName <- paste0("RcppParallel", .Platform$dynlib.ext) + newFlags <- sprintf("-L%s -lRcppParallel", shQuote(libDir)) + tbbFlags <- paste(newFlags, tbbFlags) + } + + tbbFlags + +} + +tbbLdFlagsImpl <- function() { + # shortcut if TBB_LIB defined tbbLib <- Sys.getenv("TBB_LINK_LIB", Sys.getenv("TBB_LIB", unset = TBB_LIB)) if (nzchar(tbbLib)) { - fmt <- if (is_windows()) "-L%1$s -ltbb -ltbbmalloc" - else "-L%1$s -Wl,-rpath,%1$s -ltbb -ltbbmalloc" + + fmt <- if (is_windows()) { + "-L%1$s -ltbb -ltbbmalloc" + } else { + "-L%1$s -Wl,-rpath,%1$s -ltbb -ltbbmalloc" + } + return(sprintf(fmt, asBuildPath(tbbLib))) } - + # on Mac, Windows and Solaris, we need to explicitly link (#206) needsExplicitFlags <- is_mac() || is_windows() || (is_solaris() && !is_sparc()) if (needsExplicitFlags) { @@ -95,20 +115,20 @@ tbbLdFlags <- function() { libFlag <- paste0("-L", libPath) return(paste(libFlag, "-ltbb", "-ltbbmalloc")) } - + # nothing required on other platforms "" - + } tbbRoot <- function() { - + if (nzchar(TBB_LIB)) return(TBB_LIB) - + rArch <- .Platform$r_arch parts <- c("lib", if (nzchar(rArch)) rArch) libDir <- paste(parts, collapse = "/") system.file(libDir, package = "RcppParallel") - + } From 8ce382b4f15ba9519a9a3f6bfd01434c3f490bcf Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Thu, 23 Jan 2025 15:22:09 -0800 Subject: [PATCH 8/9] compiler warnings --- inst/include/RcppParallel.h | 14 +++++++------- inst/include/RcppParallel/Common.h | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/inst/include/RcppParallel.h b/inst/include/RcppParallel.h index cbd5ef457..af26afc90 100644 --- a/inst/include/RcppParallel.h +++ b/inst/include/RcppParallel.h @@ -6,7 +6,7 @@ #include "RcppParallel/TinyThread.h" // Use TBB only where it's known to compile and work correctly -// (NOTE: Windows TBB is temporarily opt-in for packages for +// (NOTE: Windows TBB is temporarily opt-in for packages for // compatibility with CRAN packages not previously configured // to link to TBB in Makevars.win) #ifndef RCPP_PARALLEL_USE_TBB @@ -32,14 +32,14 @@ namespace RcppParallel { inline void parallelFor(std::size_t begin, - std::size_t end, + std::size_t end, Worker& worker, std::size_t grainSize = 1, int numThreads = -1) { - grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1u); + grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1)); numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1); - + #if RCPP_PARALLEL_USE_TBB if (internal::backend() == internal::BACKEND_TBB) tbbParallelFor(begin, end, worker, grainSize, numThreads); @@ -52,14 +52,14 @@ inline void parallelFor(std::size_t begin, template inline void parallelReduce(std::size_t begin, - std::size_t end, + std::size_t end, Reducer& reducer, std::size_t grainSize = 1, int numThreads = -1) { - grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1); + grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1)); numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1); - + #if RCPP_PARALLEL_USE_TBB if (internal::backend() == internal::BACKEND_TBB) tbbParallelReduce(begin, end, reducer, grainSize, numThreads); diff --git a/inst/include/RcppParallel/Common.h b/inst/include/RcppParallel/Common.h index 8901d28ec..7f74ad079 100644 --- a/inst/include/RcppParallel/Common.h +++ b/inst/include/RcppParallel/Common.h @@ -13,7 +13,7 @@ inline int resolveValue(const char* envvar, U defaultValue) { // if the requested value is non-zero and not the default, we can use it - if (requestedValue != defaultValue && requestedValue > 0) + if (requestedValue != static_cast(defaultValue) && requestedValue > 0) return requestedValue; // otherwise, try reading the default from associated envvar From 0b05c24f7cd5b6c7ef9417c79d45ca342fbe7ef6 Mon Sep 17 00:00:00 2001 From: Kevin Ushey Date: Mon, 27 Jan 2025 12:06:00 -0800 Subject: [PATCH 9/9] Update DESCRIPTION --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 3c5e244ae..d51de8a09 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: RcppParallel Type: Package Title: Parallel Programming Tools for 'Rcpp' -Version: 5.1.10 +Version: 5.1.10.9000 Authors@R: c( person("Kevin", "Ushey", role = c("aut", "cre"), email = "kevin@rstudio.com", comment = c(ORCID = "0000-0003-2880-7407")),