diff --git a/Eigen/src/ThreadPool/ForkJoin.h b/Eigen/src/ThreadPool/ForkJoin.h
index d6ea4ddcd..f67abd33d 100644
--- a/Eigen/src/ThreadPool/ForkJoin.h
+++ b/Eigen/src/ThreadPool/ForkJoin.h
@@ -31,7 +31,7 @@ namespace Eigen {
 // where `s_{j+1} - s_{j}` and `end - s_n` are roughly within a factor of two of `granularity`. For a unary
 // task function `g(k)`, the same operation is applied with
 //
-// f(i,j) = [&](){ for(int k = i; k < j; ++k) g(k); };
+// f(i,j) = [&](){ for(Index k = i; k < j; ++k) g(k); };
 //
 // Note that the parameter `granularity` should be tuned by the user based on the trade-off of running the
 // given task function sequentially vs. scheduling individual tasks in parallel. An example of a partially
@@ -45,51 +45,50 @@ namespace Eigen {
 // ForkJoinScheduler::ParallelFor(0, num_tasks, granularity, std::move(parallel_task), &thread_pool);
 // ```
 //
-// Example usage #2 (asynchronous):
+// Example usage #2 (executing multiple tasks asynchronously, each one parallelized with ParallelFor):
 // ```
 // ThreadPool thread_pool(num_threads);
-// Barrier barrier(num_tasks * num_async_calls);
-// auto done = [&](){barrier.Notify();};
-// for (int k=0; k<num_async_calls; ++k) {
+// Barrier barrier(num_async_calls);
+// auto done = [&](){ barrier.Notify(); };
+// for (int k = 0; k < num_async_calls; ++k) {
 //   thread_pool.Schedule([&]() {
 //     ForkJoinScheduler::ParallelForAsync(0, num_tasks, granularity, parallel_task, done, &thread_pool);
 //   });
 // }
 // barrier.Wait();
 // ```
 class ForkJoinScheduler {
  public:
-  // Runs `do_func` in parallel for the range [start, end) with a specified granularity. `do_func` should
-  // be of type `std::function<void(int)>` or `std::function<void(int, int)>`. If `do_func` is of type
-  // `std::function<void(int)>`, the `done` callback will be called `end - start` times when all tasks have been
-  // executed. Otherwise, `done` is called only once.
-  template <typename DoFnType>
-  static void ParallelForAsync(int start, int end, int granularity, DoFnType do_func, std::function<void()> done,
-                               Eigen::ThreadPool* thread_pool) {
+  // Runs `do_func` asynchronously for the range [start, end) with a specified
+  // granularity. `do_func` should be of type `std::function<void(Index, Index)>`,
+  // and `done` is called exactly once after all tasks have been executed.
+  template <typename DoFnType, typename DoneFnType>
+  static void ParallelForAsync(Index start, Index end, Index granularity, DoFnType&& do_func, DoneFnType&& done,
+                               ThreadPool* thread_pool) {
     if (start >= end) {
       done();
       return;
     }
-    ForkJoinScheduler::RunParallelForAsync(start, end, granularity, do_func, done, thread_pool);
+    thread_pool->Schedule([start, end, granularity, thread_pool, do_func = std::forward<DoFnType>(do_func),
+                           done = std::forward<DoneFnType>(done)]() {
+      RunParallelFor(start, end, granularity, do_func, thread_pool);
+      done();
+    });
   }
 
   // Synchronous variant of ParallelForAsync.
   template <typename DoFnType>
-  static void ParallelFor(int start, int end, int granularity, DoFnType do_func, Eigen::ThreadPool* thread_pool) {
+  static void ParallelFor(Index start, Index end, Index granularity, DoFnType&& do_func, ThreadPool* thread_pool) {
     if (start >= end) return;
-    auto dummy_done = []() {};
     Barrier barrier(1);
-    thread_pool->Schedule([start, end, granularity, thread_pool, &do_func, &dummy_done, &barrier]() {
-      ForkJoinScheduler::ParallelForAsync(start, end, granularity, do_func, dummy_done, thread_pool);
-      barrier.Notify();
-    });
+    auto done = [&barrier]() { barrier.Notify(); };
+    ParallelForAsync(start, end, granularity, do_func, done, thread_pool);
     barrier.Wait();
   }
 
  private:
   // Schedules `right_thunk`, runs `left_thunk`, and runs other tasks until `right_thunk` has finished.
   template <typename LeftType, typename RightType>
-  static void ForkJoin(LeftType&& left_thunk, RightType&& right_thunk, Eigen::ThreadPool* thread_pool) {
+  static void ForkJoin(LeftType&& left_thunk, RightType&& right_thunk, ThreadPool* thread_pool) {
     std::atomic<bool> right_done(false);
     auto execute_right = [&right_thunk, &right_done]() {
       std::forward<RightType>(right_thunk)();
@@ -97,47 +96,38 @@ class ForkJoinScheduler {
     };
     thread_pool->Schedule(execute_right);
     std::forward<LeftType>(left_thunk)();
-    Eigen::ThreadPool::Task task;
+    ThreadPool::Task task;
     while (!right_done.load(std::memory_order_acquire)) {
       thread_pool->MaybeGetTask(&task);
       if (task.f) task.f();
     }
   }
 
-  // Runs `do_func` in parallel for the range [start, end). The main recursive asynchronous runner that
-  // calls `ForkJoin`.
-  static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int)>& do_func,
-                                  std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
-    std::function<void(int, int)> wrapped_do_func = [&do_func](int start, int end) {
-      for (int i = start; i < end; ++i) do_func(i);
-    };
-    ForkJoinScheduler::RunParallelForAsync(start, end, granularity, wrapped_do_func, done, thread_pool);
+  static Index ComputeMidpoint(Index start, Index end, Index granularity) {
+    // Typical workloads choose initial values of `{start, end, granularity}` such that `end - start` and
+    // `granularity` are powers of two. Since modern processors usually implement (2^x)-way
+    // set-associative caches, we minimize the number of cache misses by choosing midpoints that are not
+    // powers of two (to avoid having two addresses in the main memory pointing to the same point in the
+    // cache). More specifically, we choose the midpoint at (roughly) the 9/16 mark.
+    const Index size = end - start;
+    const Index offset = numext::round_down(9 * (size + 1) / 16, granularity);
+    return start + offset;
   }
 
-  // Variant of `RunAsyncParallelFor` that uses a do function that operates on an index range.
-  // Specifically, `do_func` takes two arguments: the start and end of the range.
-  static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int, int)>& do_func,
-                                  std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
-    if ((end - start) <= granularity) {
+  template <typename DoFnType>
+  static void RunParallelFor(Index start, Index end, Index granularity, DoFnType&& do_func, ThreadPool* thread_pool) {
+    Index mid = ComputeMidpoint(start, end, granularity);
+    if ((end - start) < granularity || mid == start || mid == end) {
       do_func(start, end);
-      for (int j = 0; j < end - start; ++j) done();
-    } else {
-      // Typical workloads choose initial values of `{start, end, granularity}` such that `start - end` and
-      // `granularity` are powers of two. Since modern processors usually implement (2^x)-way
-      // set-associative caches, we minimize the number of cache misses by choosing midpoints that are not
-      // powers of two (to avoid having two addresses in the main memory pointing to the same point in the
-      // cache). More specifically, we choose the midpoint at (roughly) the 9/16 mark.
-      const int size = end - start;
-      const int mid = start + 9 * (size + 1) / 16;
-      ForkJoinScheduler::ForkJoin(
-          [start, mid, granularity, &do_func, &done, thread_pool]() {
-            RunParallelForAsync(start, mid, granularity, do_func, done, thread_pool);
-          },
-          [mid, end, granularity, &do_func, &done, thread_pool]() {
-            RunParallelForAsync(mid, end, granularity, do_func, done, thread_pool);
-          },
-          thread_pool);
+      return;
     }
+    ForkJoin([start, mid, granularity, &do_func, thread_pool]() {
+      RunParallelFor(start, mid, granularity, do_func, thread_pool);
+    },
+             [mid, end, granularity, &do_func, thread_pool]() {
+               RunParallelFor(mid, end, granularity, do_func, thread_pool);
+             },
+             thread_pool);
   }
 };
diff --git a/Eigen/src/ThreadPool/NonBlockingThreadPool.h b/Eigen/src/ThreadPool/NonBlockingThreadPool.h
index 11dfae358..4ec13548b 100644
--- a/Eigen/src/ThreadPool/NonBlockingThreadPool.h
+++ b/Eigen/src/ThreadPool/NonBlockingThreadPool.h
@@ -156,7 +156,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
   // Tries to assign work to the current task.
   void MaybeGetTask(Task* t) {
     PerThread* pt = GetPerThread();
-    Queue& q = thread_data_[pt->thread_id].queue;
+    const int thread_id = pt->thread_id;
+    // If we are not a worker thread of this pool, we can't get any work.
+    if (thread_id < 0) return;
+    Queue& q = thread_data_[thread_id].queue;
     *t = q.PopFront();
     if (t->f) return;
     if (num_threads_ == 1) {
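A note on the `thread_id < 0` guard above: `GetPerThread()` returns a thread-local record whose `thread_id` is assigned only for the pool's own worker threads and stays at its default of -1 on every other thread, so the old code could index `thread_data_[-1]` when `MaybeGetTask` was reached from outside the pool. A minimal sketch of that situation (illustrative only, not part of the patch; it assumes `MaybeGetTask` and `Task` are publicly accessible, as the `ForkJoinScheduler::ForkJoin` call site relies on):

```
#include "Eigen/ThreadPool"

int main() {
  Eigen::ThreadPool pool(/*num_threads=*/4);
  Eigen::ThreadPool::Task task;
  // The main thread is not one of `pool`'s workers, so its per-thread record
  // has `thread_id == -1`. With the guard this call is a harmless no-op;
  // without it, `thread_data_[-1]` would be an out-of-bounds access.
  pool.MaybeGetTask(&task);
  return task.f ? 1 : 0;  // `task.f` stays empty on a non-worker thread.
}
```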
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index fdfde4597..3a67ab1d0 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -320,6 +320,7 @@ ei_add_test(tuple_test)
 ei_add_test(threads_eventcount "-pthread" "${CMAKE_THREAD_LIBS_INIT}")
 ei_add_test(threads_runqueue "-pthread" "${CMAKE_THREAD_LIBS_INIT}")
 ei_add_test(threads_non_blocking_thread_pool "-pthread" "${CMAKE_THREAD_LIBS_INIT}")
+ei_add_test(threads_fork_join "-pthread" "${CMAKE_THREAD_LIBS_INIT}")
 
 add_executable(bug1213 bug1213.cpp bug1213_main.cpp)
 check_cxx_compiler_flag("-ffast-math" COMPILER_SUPPORT_FASTMATH)
diff --git a/test/threads_fork_join.cpp b/test/threads_fork_join.cpp
index 941c317ed..b852b05f0 100644
--- a/test/threads_fork_join.cpp
+++ b/test/threads_fork_join.cpp
@@ -12,39 +12,26 @@
 #include "Eigen/ThreadPool"
 
 struct TestData {
-  ThreadPool tp;
+  std::unique_ptr<ThreadPool> tp;
   std::vector<double> data;
 };
 
 TestData make_test_data(int num_threads, int num_shards) {
-  return {ThreadPool(num_threads), std::vector<double>(num_shards, 1.0)};
+  return {std::make_unique<ThreadPool>(num_threads), std::vector<double>(num_shards, 1.0)};
 }
 
-static void test_unary_parallel_for(int granularity) {
+static void test_parallel_for(int granularity) {
   // Test correctness.
   const int kNumTasks = 100000;
   TestData test_data = make_test_data(/*num_threads=*/4, kNumTasks);
-  std::atomic<double> sum = 0.0;
-  std::function<void(int)> unary_do_fn = [&](int i) {
-    for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[i]);) {
-    };
-  };
-  ForkJoinScheduler::ParallelFor(0, kNumTasks, granularity, std::move(unary_do_fn), &test_data.tp);
-  VERIFY_IS_EQUAL(sum, kNumTasks);
-}
-
-static void test_binary_parallel_for(int granularity) {
-  // Test correctness.
-  const int kNumTasks = 100000;
-  TestData test_data = make_test_data(/*num_threads=*/4, kNumTasks);
-  std::atomic<double> sum = 0.0;
-  std::function<void(int, int)> binary_do_fn = [&](int i, int j) {
+  std::atomic<uint64_t> sum(0);
+  std::function<void(Index, Index)> binary_do_fn = [&](Index i, Index j) {
     for (int k = i; k < j; ++k)
-      for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[k]);) {
+      for (uint64_t new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[k]);) {
       };
   };
-  ForkJoinScheduler::ParallelFor(0, kNumTasks, granularity, std::move(binary_do_fn), &test_data.tp);
-  VERIFY_IS_EQUAL(sum, kNumTasks);
+  ForkJoinScheduler::ParallelFor(0, kNumTasks, granularity, std::move(binary_do_fn), test_data.tp.get());
+  VERIFY_IS_EQUAL(sum.load(), kNumTasks);
 }
@@ -54,26 +41,26 @@ static void test_async_parallel_for() {
   const int kNumTasks = 100;
   const int kNumAsyncCalls = kNumThreads * 4;
   TestData test_data = make_test_data(kNumThreads, kNumTasks);
-  std::atomic<double> sum = 0.0;
-  std::function<void(int)> unary_do_fn = [&](int i) {
-    for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[i]);) {
-    };
+  std::atomic<uint64_t> sum(0);
+  std::function<void(Index, Index)> binary_do_fn = [&](Index i, Index j) {
+    for (Index k = i; k < j; ++k) {
+      for (uint64_t new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[k]);) {
+      }
+    }
   };
-  Barrier barrier(kNumTasks * kNumAsyncCalls);
+  Barrier barrier(kNumAsyncCalls);
   std::function<void()> done = [&]() { barrier.Notify(); };
   for (int k = 0; k < kNumAsyncCalls; ++k) {
-    test_data.tp.Schedule([&]() {
-      ForkJoinScheduler::ParallelForAsync(0, kNumTasks, /*granularity=*/1, unary_do_fn, done, &test_data.tp);
+    test_data.tp->Schedule([&]() {
+      ForkJoinScheduler::ParallelForAsync(0, kNumTasks, /*granularity=*/1, binary_do_fn, done, test_data.tp.get());
     });
   }
   barrier.Wait();
-  VERIFY_IS_EQUAL(sum, kNumTasks * kNumAsyncCalls);
+  VERIFY_IS_EQUAL(sum.load(), kNumTasks * kNumAsyncCalls);
 }
 
 EIGEN_DECLARE_TEST(fork_join) {
-  CALL_SUBTEST(test_unary_parallel_for(1));
-  CALL_SUBTEST(test_unary_parallel_for(2));
-  CALL_SUBTEST(test_binary_parallel_for(1));
-  CALL_SUBTEST(test_binary_parallel_for(2));
+  CALL_SUBTEST(test_parallel_for(1));
+  CALL_SUBTEST(test_parallel_for(2));
   CALL_SUBTEST(test_async_parallel_for());
 }
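For reference, a self-contained sketch of the API after this change (illustrative only, not part of the patch; it assumes the `Eigen::`-qualified spellings of `ThreadPool`, `Barrier`, `Index`, and `ForkJoinScheduler` exported by `Eigen/ThreadPool`). With a power-of-two range the internal split lands at the 9/16 mark described in `ComputeMidpoint`: for `size = 65536` and `granularity = 256`, `round_down(9 * 65537 / 16, 256) = 36864`, which is exactly `(9/16) * 65536`.

```
#include <atomic>
#include <cstdint>
#include "Eigen/ThreadPool"

int main() {
  constexpr Eigen::Index kNumTasks = 1 << 16;
  constexpr Eigen::Index kGranularity = 256;
  Eigen::ThreadPool pool(/*num_threads=*/8);
  std::atomic<uint64_t> sum(0);

  // The do-function now always receives a half-open index range [i, j).
  auto do_fn = [&](Eigen::Index i, Eigen::Index j) {
    sum.fetch_add(static_cast<uint64_t>(j - i), std::memory_order_relaxed);
  };

  // Blocking variant: returns once the whole range has been processed.
  Eigen::ForkJoinScheduler::ParallelFor(0, kNumTasks, kGranularity, do_fn, &pool);

  // Non-blocking variant: `done` now runs exactly once, after every shard of
  // [0, kNumTasks) has finished, so the barrier counts calls rather than tasks.
  Eigen::Barrier barrier(/*count=*/1);
  Eigen::ForkJoinScheduler::ParallelForAsync(0, kNumTasks, kGranularity, do_fn,
                                             [&barrier]() { barrier.Notify(); }, &pool);
  barrier.Wait();

  return sum.load() == 2 * static_cast<uint64_t>(kNumTasks) ? 0 : 1;
}
```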