From 4a6ac97d135cef787e3418c1a91f1efe38af1b9c Mon Sep 17 00:00:00 2001
From: William Kong
Date: Fri, 24 Jan 2025 22:12:05 +0000
Subject: [PATCH] Add a ForkJoin-based ParallelFor algorithm to the ThreadPool module

---
 Eigen/ThreadPool                             |   1 +
 Eigen/src/ThreadPool/ForkJoin.h              | 151 ++++++++++++++++++
 Eigen/src/ThreadPool/NonBlockingThreadPool.h | 158 ++++++++------
 test/threads_fork_join.cpp                   |  79 ++++++++++
 4 files changed, 295 insertions(+), 94 deletions(-)
 create mode 100644 Eigen/src/ThreadPool/ForkJoin.h
 create mode 100644 test/threads_fork_join.cpp

diff --git a/Eigen/ThreadPool b/Eigen/ThreadPool
index c6e2cee5b..39e5d1eec 100644
--- a/Eigen/ThreadPool
+++ b/Eigen/ThreadPool
@@ -72,6 +72,7 @@
 #include "src/ThreadPool/Barrier.h"
 #include "src/ThreadPool/NonBlockingThreadPool.h"
 #include "src/ThreadPool/CoreThreadPoolDevice.h"
+#include "src/ThreadPool/ForkJoin.h"
 // IWYU pragma: end_exports
 
 #include "src/Core/util/ReenableStupidWarnings.h"
diff --git a/Eigen/src/ThreadPool/ForkJoin.h b/Eigen/src/ThreadPool/ForkJoin.h
new file mode 100644
index 000000000..a04c54a01
--- /dev/null
+++ b/Eigen/src/ThreadPool/ForkJoin.h
@@ -0,0 +1,151 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2025 Weiwei Kong
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_THREADPOOL_FORKJOIN_H
+#define EIGEN_THREADPOOL_FORKJOIN_H
+
+// IWYU pragma: private
+#include "./InternalHeaderCheck.h"
+
+namespace Eigen {
+
+// ForkJoinScheduler provides implementations of various non-blocking ParallelFor algorithms for unary
+// and binary parallel tasks. More specifically, the implementations follow the binary tree-based
+// algorithm from the following paper:
+//
+//   Lea, D. (2000, June). A Java fork/join framework. In *Proceedings of the
+//   ACM 2000 Conference on Java Grande* (pp. 36-43).
+//
+// For a given binary task function `f(i,j)` and integers `num_threads`, `granularity`, `start`, and `end`,
+// the implemented parallel for algorithm schedules and executes at most `num_threads` of the functions
+// from the following set in parallel (either synchronously or asynchronously):
+//
+//   f(start,start+s_1), f(start+s_1,start+s_2), ..., f(start+s_n,end)
+//
+// where `s_{j+1} - s_{j}` and `end - s_n` are roughly within a factor of two of `granularity`. For a unary
+// task function `g(k)`, the same operation is applied with
+//
+//   f(i,j) = [&](){ for(int k = i; k < j; ++k) g(k); }
+//
+class ForkJoinScheduler {
+ public:
+  // Runs `do_func` asynchronously for the range [start, end) with a specified granularity. `do_func` should
+  // be of type `std::function<void(int)>` or `std::function<void(int, int)>`. If `end > start`, the `done`
+  // callback will be called `end - start` times when all tasks have been
+  // executed. Otherwise, `done` is called only once.
+  template <typename DoFnType>
+  static void ParallelForAsync(int start, int end, int granularity, DoFnType do_func, std::function<void()> done,
+                               Eigen::ThreadPool* thread_pool) {
+    if (start >= end) {
+      done();
+      return;
+    }
+    ForkJoinScheduler::RunParallelForAsync(start, end, granularity, do_func, done, thread_pool);
+  }
+
+  // Synchronous variant of ParallelForAsync.
+  template <typename DoFnType>
+  static void ParallelFor(int start, int end, int granularity, DoFnType do_func, Eigen::ThreadPool* thread_pool) {
+    if (start >= end) return;
+    auto dummy_done = []() {};
+    Barrier barrier(1);
+    thread_pool->Schedule([start, end, granularity, thread_pool, &do_func, &dummy_done, &barrier]() {
+      ForkJoinScheduler::ParallelForAsync(start, end, granularity, do_func, dummy_done, thread_pool);
+      barrier.Notify();
+    });
+    barrier.Wait();
+  }
+
+ private:
+  // Schedules `right_thunk`, runs `left_thunk`, and keeps running queued tasks until
+  // `right_thunk` has finished.
+  template <typename LeftType, typename RightType>
+  static void ForkJoin(LeftType&& left_thunk, RightType&& right_thunk, Eigen::ThreadPool* thread_pool) {
+    std::atomic<bool> right_done(false);
+    auto execute_right = [&right_thunk, &right_done]() {
+      std::forward<RightType>(right_thunk)();
+      right_done.store(true, std::memory_order_release);
+    };
+    thread_pool->Schedule(execute_right);
+    std::forward<LeftType>(left_thunk)();
+    Eigen::ThreadPool::Task task;
+    while (!right_done.load(std::memory_order_acquire)) {
+      thread_pool->MaybeGetTask(&task);
+      if (task.f) task.f();
+    }
+  }
+
+  // Runs `do_func` in parallel for the range [start, end). This is the main recursive asynchronous runner;
+  // it delegates the actual scheduling to `ForkJoin`.
+  static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int)>& do_func,
+                                  std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
+    std::function<void(int, int)> wrapped_do_func = [&do_func](int start, int end) {
+      for (int i = start; i < end; ++i) do_func(i);
+    };
+    ForkJoinScheduler::RunParallelForAsync(start, end, granularity, wrapped_do_func, done, thread_pool);
+  }
+
+  // Variant of `RunParallelForAsync` whose do function operates on an index range.
+  // Specifically, `do_func` takes two arguments: the start and end of the range.
+  static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int, int)>& do_func,
+                                  std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
+    if ((end - start) <= granularity) {
+      do_func(start, end);
+      for (int j = 0; j < end - start; ++j) done();
+    } else {
+      // Typical workloads choose initial values of `{start, end, granularity}` such that `end - start` and
+      // `granularity` are powers of two. Since modern processors usually implement (2^x)-way
+      // set-associative caches, we minimize the number of cache misses by choosing midpoints that are not
+      // powers of two (to avoid having two addresses in the main memory pointing to the same point in the
+      // cache). More specifically, we restrict the set of candidate midpoints to:
+      //
+      //   P := {start, start + granularity, start + 2*granularity, ..., end},
+      //
+      // and choose the entry in `P` at (roughly) the 9/16 mark, clamped so that `start < mid < end`.
+      const int size = end - start;
+      int offset = Eigen::numext::div_ceil(9 * (size + 1) / 16, granularity) * granularity;
+      // Clamp the offset so that `start < mid < end` always holds; without the clamp, a range of
+      // exactly twice the granularity could pick `mid == end` and recurse on itself forever.
+      const int max_offset = (size - 1) / granularity * granularity;
+      if (offset > max_offset) offset = max_offset;
+      const int mid = start + offset;
+      ForkJoinScheduler::ForkJoin(
+          [start, mid, granularity, &do_func, &done, thread_pool]() {
+            RunParallelForAsync(start, mid, granularity, do_func, done, thread_pool);
+          },
+          [mid, end, granularity, &do_func, &done, thread_pool]() {
+            RunParallelForAsync(mid, end, granularity, do_func, done, thread_pool);
+          },
+          thread_pool);
+    }
+  }
+};
+
+}  // namespace Eigen
+
+#endif  // EIGEN_THREADPOOL_FORKJOIN_H
diff --git a/Eigen/src/ThreadPool/NonBlockingThreadPool.h b/Eigen/src/ThreadPool/NonBlockingThreadPool.h
index b6abc3cba..9c0fb7b11 100644
--- a/Eigen/src/ThreadPool/NonBlockingThreadPool.h
+++ b/Eigen/src/ThreadPool/NonBlockingThreadPool.h
@@ -18,9 +18,24 @@ namespace Eigen {
 
 template <typename Environment>
 class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
  public:
+  typedef typename Environment::EnvThread Thread;
   typedef typename Environment::Task Task;
   typedef RunQueue<Task, 1024> Queue;
 
+  struct PerThread {
+    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
+    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
+    uint64_t rand;          // Random generator state.
+    int thread_id;          // Worker thread index in pool.
+  };
+
+  struct ThreadData {
+    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
+    std::unique_ptr<Thread> thread;
+    std::atomic<uint64_t> steal_partition;
+    Queue queue;
+  };
+
   ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {}
 
   ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment())
@@ -31,6 +46,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
         all_coprimes_(num_threads),
         waiters_(num_threads),
         global_steal_partition_(EncodePartition(0, num_threads_)),
+        spin_count_(0),
         spinning_state_(0),
         blocked_(0),
         done_(false),
@@ -133,6 +149,39 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     }
   }
 
+  // Tries to pop or steal a task and assigns it to `*t`.
+  void MaybeGetTask(Task* t) {
+    PerThread* pt = GetPerThread();
+    Queue& q = thread_data_[pt->thread_id].queue;
+    *t = q.PopFront();
+    if (t->f) return;
+    if (num_threads_ == 1) {
+      // For num_threads_ == 1 there is no point in going through the expensive
+      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
+      // victim queues it might reverse the order in which ops are executed
+      // compared to the order in which they are scheduled, which tends to be
+      // counter-productive for the types of I/O workloads single thread pools
+      // tend to be used for.
+      for (int i = 0; i < spin_count_ && !t->f; ++i) *t = q.PopFront();
+    } else {
+      if (EIGEN_PREDICT_FALSE(!t->f)) *t = LocalSteal();
+      if (EIGEN_PREDICT_FALSE(!t->f)) *t = GlobalSteal();
+      if (EIGEN_PREDICT_FALSE(!t->f)) {
+        if (allow_spinning_ && StartSpinning()) {
+          for (int i = 0; i < spin_count_ && !t->f; ++i) *t = GlobalSteal();
+          // Notify `spinning_state_` that we are no longer spinning.
+          bool has_no_notify_task = StopSpinning();
+          // If a task was submitted to the queue without a call to
+          // `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
+          // false), and we didn't steal anything above, we must try to
+          // steal one more time, to make sure that this task will be
+          // executed. We will not necessarily find it, because it might
+          // have been already stolen by some other thread.
+          if (has_no_notify_task && !t->f) *t = GlobalSteal();
+        }
+      }
+    }
+  }
+
   void Cancel() EIGEN_OVERRIDE {
     cancelled_ = true;
     done_ = true;
@@ -206,22 +255,6 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     }
   }
 
-  typedef typename Environment::EnvThread Thread;
-
-  struct PerThread {
-    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
-    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
-    uint64_t rand;          // Random generator state.
-    int thread_id;          // Worker thread index in pool.
-  };
-
-  struct ThreadData {
-    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
-    std::unique_ptr<Thread> thread;
-    std::atomic<uint64_t> steal_partition;
-    Queue queue;
-  };
-
   // Maximum number of threads that can spin in steal loop.
   static constexpr int kMaxSpinningThreads = 1;
 
@@ -277,6 +310,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
   Environment env_;
   const int num_threads_;
   const bool allow_spinning_;
+  int spin_count_;
   MaxSizeVector<ThreadData> thread_data_;
   MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
   MaxSizeVector<EventCount::Waiter> waiters_;
@@ -314,85 +348,21 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
     Queue& q = thread_data_[thread_id].queue;
     EventCount::Waiter* waiter = &waiters_[thread_id];
     // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
-    // proportional to num_threads_ and we assume that new work is scheduled at
-    // a constant rate, so we divide `kSpintCount` by number of threads and
-    // number of spinning threads. The constant was picked based on a fair dice
-    // roll, tune it.
-    const int spin_count = allow_spinning_ && num_threads_ > 0 ? kSpinCount / kMaxSpinningThreads / num_threads_ : 0;
-    if (num_threads_ == 1) {
-      // For num_threads_ == 1 there is no point in going through the expensive
-      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
-      // victim queues it might reverse the order in which ops are executed
-      // compared to the order in which they are scheduled, which tends to be
-      // counter-productive for the types of I/O workloads the single thread
-      // pools tend to be used for.
-      while (!cancelled_.load(std::memory_order_relaxed)) {
-        Task t = q.PopFront();
-
-        for (int i = 0; i < spin_count && !t.f; ++i) {
-          t = q.PopFront();
-        }
-
-        if (EIGEN_PREDICT_FALSE(!t.f)) {
-          if (!WaitForWork(waiter, &t)) {
-            return;
-          }
-        }
-
-        if (EIGEN_PREDICT_TRUE(t.f)) {
-          env_.ExecuteTask(t);
-        }
-      }
-
-    } else {
-      while (!cancelled_.load(std::memory_order_relaxed)) {
-        Task t = q.PopFront();
-
-        // Do one round of steal loop from local thread partition.
-        if (EIGEN_PREDICT_FALSE(!t.f)) {
-          t = LocalSteal();
-        }
-
-        // Do one round of steal loop from global thread partition.
-        if (EIGEN_PREDICT_FALSE(!t.f)) {
-          t = GlobalSteal();
-        }
-
-        // Maybe leave a thread spinning. This improves latency.
-        if (EIGEN_PREDICT_FALSE(!t.f)) {
-          if (allow_spinning_ && StartSpinning()) {
-            for (int i = 0; i < spin_count && !t.f; ++i) {
-              t = GlobalSteal();
-            }
-
-            // Notify `spinning_state_` that we are no longer spinning.
-            bool has_no_notify_task = StopSpinning();
-
-            // If a task was submitted to the queue without a call to
-            // `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
-            // false), and we didn't steal anything above, we must try to
-            // steal one more time, to make sure that this task will be
-            // executed. We will not necessarily find it, because it might
-            // have been already stolen by some other thread.
-            if (has_no_notify_task && !t.f) {
-              t = GlobalSteal();
-            }
-          }
-        }
-
-        // If we still don't have a task, wait for one. Return if thread pool is
-        // in cancelled state.
-        if (EIGEN_PREDICT_FALSE(!t.f)) {
-          if (!WaitForWork(waiter, &t)) {
-            return;
-          }
-        }
-
-        // Execute task if we found one.
-        if (EIGEN_PREDICT_TRUE(t.f)) {
-          env_.ExecuteTask(t);
-        }
+    // proportional to num_threads_ and we assume that new work is scheduled
+    // at a constant rate, so we divide `kSpinCount` by number of threads
+    // and number of spinning threads. The constant was picked based on a
+    // fair dice roll, tune it.
+    spin_count_ = allow_spinning_ && num_threads_ > 0 ? kSpinCount / kMaxSpinningThreads / num_threads_ : 0;
+    Task t;
+    while (!cancelled_.load(std::memory_order_relaxed)) {
+      MaybeGetTask(&t);
+      // If we still don't have a task, wait for one. Return if thread pool is
+      // in cancelled state.
+      if (EIGEN_PREDICT_FALSE(!t.f)) {
+        EventCount::Waiter* waiter = &waiters_[pt->thread_id];
+        if (!WaitForWork(waiter, &t)) return;
       }
+      if (EIGEN_PREDICT_TRUE(t.f)) env_.ExecuteTask(t);
     }
   }
diff --git a/test/threads_fork_join.cpp b/test/threads_fork_join.cpp
new file mode 100644
index 000000000..941c317ed
--- /dev/null
+++ b/test/threads_fork_join.cpp
@@ -0,0 +1,79 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2025 Weiwei Kong
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#define EIGEN_USE_THREADS
+#include "main.h"
+#include "Eigen/ThreadPool"
+
+struct TestData {
+  ThreadPool tp;
+  std::vector<double> data;
+};
+
+TestData make_test_data(int num_threads, int num_shards) {
+  return {ThreadPool(num_threads), std::vector<double>(num_shards, 1.0)};
+}
+
+static void test_unary_parallel_for(int granularity) {
+  // Test correctness.
+  const int kNumTasks = 100000;
+  TestData test_data = make_test_data(/*num_threads=*/4, kNumTasks);
+  std::atomic<double> sum(0.0);
+  std::function<void(int)> unary_do_fn = [&](int i) {
+    for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[i]);) {
+    };
+  };
+  ForkJoinScheduler::ParallelFor(0, kNumTasks, granularity, std::move(unary_do_fn), &test_data.tp);
+  VERIFY_IS_EQUAL(sum, kNumTasks);
+}
+
+static void test_binary_parallel_for(int granularity) {
+  // Test correctness.
+  const int kNumTasks = 100000;
+  TestData test_data = make_test_data(/*num_threads=*/4, kNumTasks);
+  std::atomic<double> sum(0.0);
+  std::function<void(int, int)> binary_do_fn = [&](int i, int j) {
+    for (int k = i; k < j; ++k)
+      for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[k]);) {
+      };
+  };
+  ForkJoinScheduler::ParallelFor(0, kNumTasks, granularity, std::move(binary_do_fn), &test_data.tp);
+  VERIFY_IS_EQUAL(sum, kNumTasks);
+}
+
+static void test_async_parallel_for() {
+  // Test correctness.
+  // NOTE: Granularity and the type of `do_func` are checked in the synchronous tests above.
+  const int kNumThreads = 4;
+  const int kNumTasks = 100;
+  const int kNumAsyncCalls = kNumThreads * 4;
+  TestData test_data = make_test_data(kNumThreads, kNumTasks);
+  std::atomic<double> sum(0.0);
+  std::function<void(int)> unary_do_fn = [&](int i) {
+    for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[i]);) {
+    };
+  };
+  Barrier barrier(kNumTasks * kNumAsyncCalls);
+  std::function<void()> done = [&]() { barrier.Notify(); };
+  for (int k = 0; k < kNumAsyncCalls; ++k) {
+    test_data.tp.Schedule([&]() {
+      ForkJoinScheduler::ParallelForAsync(0, kNumTasks, /*granularity=*/1, unary_do_fn, done, &test_data.tp);
+    });
+  }
+  barrier.Wait();
+  VERIFY_IS_EQUAL(sum, kNumTasks * kNumAsyncCalls);
+}
+
+EIGEN_DECLARE_TEST(fork_join) {
+  CALL_SUBTEST(test_unary_parallel_for(1));
+  CALL_SUBTEST(test_unary_parallel_for(2));
+  CALL_SUBTEST(test_binary_parallel_for(1));
+  CALL_SUBTEST(test_binary_parallel_for(2));
+  CALL_SUBTEST(test_async_parallel_for());
+}
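
Usage sketch (reviewer note, not part of the patch): the snippet below mirrors the call pattern
exercised by test/threads_fork_join.cpp above: a binary (range-based) do function driven through
the blocking ParallelFor, and a unary do function driven through ParallelForAsync with a
Barrier-counted `done` callback. The pool size, array length, granularity, and helper names
(`square_range`, `halve`) are illustrative assumptions, not values taken from the patch. As in
the unit test, the asynchronous call is scheduled onto the pool itself so that the fork/join
helper runs on a worker thread.

// A self-contained sketch, assuming only the ForkJoinScheduler API added by this patch.
#define EIGEN_USE_THREADS
#include <cstdio>
#include <functional>
#include <vector>

#include <Eigen/ThreadPool>

int main() {
  Eigen::ThreadPool pool(/*num_threads=*/4);  // arbitrary pool size for illustration
  std::vector<double> data(1000, 2.0);
  std::vector<double> squares(data.size(), 0.0);

  // Binary (range-based) do function: the leaves of the fork/join tree receive disjoint
  // [i, j) sub-ranges, so writing to `squares` needs no extra synchronization.
  std::function<void(int, int)> square_range = [&](int i, int j) {
    for (int k = i; k < j; ++k) squares[k] = data[k] * data[k];
  };

  // Synchronous variant: returns only once every sub-range has been processed.
  Eigen::ForkJoinScheduler::ParallelFor(0, static_cast<int>(data.size()),
                                        /*granularity=*/2, square_range, &pool);

  // Asynchronous variant: `done` fires once per index in [start, end), so a Barrier sized
  // to the range length can be used to wait for completion. As in the unit test, the call
  // itself is scheduled onto the pool.
  Eigen::Barrier barrier(static_cast<unsigned int>(data.size()));
  std::function<void(int)> halve = [&](int i) { squares[i] *= 0.5; };
  std::function<void()> done = [&]() { barrier.Notify(); };
  pool.Schedule([&]() {
    Eigen::ForkJoinScheduler::ParallelForAsync(0, static_cast<int>(data.size()),
                                               /*granularity=*/2, halve, done, &pool);
  });
  barrier.Wait();

  std::printf("squares[0] = %g, squares[999] = %g\n", squares[0], squares[999]);
  return 0;  // Eigen::ThreadPool's destructor joins the worker threads.
}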