Add a ForkJoin-based ParallelFor algorithm to the ThreadPool module

William Kong 2025-01-24 22:12:05 +00:00 committed by Rasmus Munk Larsen
parent e986838464
commit 4a6ac97d13
4 changed files with 295 additions and 94 deletions


@@ -72,6 +72,7 @@
#include "src/ThreadPool/Barrier.h"
#include "src/ThreadPool/NonBlockingThreadPool.h"
#include "src/ThreadPool/CoreThreadPoolDevice.h"
#include "src/ThreadPool/ForkJoin.h"
// IWYU pragma: end_exports
#include "src/Core/util/ReenableStupidWarnings.h"


@@ -0,0 +1,151 @@
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2025 Weiwei Kong <weiweikong@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef EIGEN_THREADPOOL_FORKJOIN_H
#define EIGEN_THREADPOOL_FORKJOIN_H
// IWYU pragma: private
#include "./InternalHeaderCheck.h"
namespace Eigen {
// ForkJoinScheduler provides implementations of various non-blocking ParallelFor algorithms for unary
// and binary parallel tasks. More specifically, the implementations follow the binary tree-based
// algorithm from the following paper:
//
// Lea, D. (2000, June). A java fork/join framework. *In Proceedings of the
// ACM 2000 conference on Java Grande* (pp. 36-43).
//
// For a given binary task function `f(i,j)` and integers `num_threads`, `granularity`, `start`, and `end`,
// the implemented parallel for algorithm schedules and executes at most `num_threads` of the functions
// from the following set in parallel (either synchronously or asynchronously):
//
// f(start,start+s_1), f(start+s_1,start+s_2), ..., f(start+s_n,end)
//
// where `s_{j+1} - s_{j}` and `end - s_n` are roughly within a factor of two of `granularity`. For a unary
// task function `g(k)`, the same operation is applied with
//
// f(i,j) = [&](){ for(int k=i; k<j; ++k) g(k); };
//
// Note that the parameter `granularity` should be tuned by the user based on the trade-off between running the
// given task function sequentially and the overhead of scheduling individual tasks in parallel. An example of a partially
// tuned `granularity` is in `Eigen::CoreThreadPoolDevice::parallelFor(...)` where the template
// parameter `PacketSize` and float input `cost` are used to indirectly compute a granularity level for a
// given task function.
//
// Example usage #1 (synchronous):
// ```
// ThreadPool thread_pool(num_threads);
// ForkJoinScheduler::ParallelFor(0, num_tasks, granularity, std::move(parallel_task), &thread_pool);
// ```
//
// Example usage #2 (asynchronous):
// ```
// ThreadPool thread_pool(num_threads);
// Barrier barrier(num_completions);
// auto done = [&](){barrier.Notify();};
// for (int k=0; k<num_async_calls; ++k) {
// thread_pool.Schedule([&](){
// ForkJoinScheduler::ParallelFor(0, num_tasks, granularity, parallel_task, done, &thread_pool);
// });
// }
// barrier.Wait();
// ```
class ForkJoinScheduler {
public:
// Runs `do_func` asynchronously for the range [start, end) with a specified granularity. `do_func` should
// either be of type `std::function<void(int)>` or `std::function<void(int, int)>`.
// If `end > start`, the `done` callback is invoked once per completed index, i.e. `end - start` times in
// total once all tasks have executed. Otherwise (an empty range), `done` is called exactly once.
template <typename DoFnType>
static void ParallelForAsync(int start, int end, int granularity, DoFnType do_func, std::function<void()> done,
Eigen::ThreadPool* thread_pool) {
if (start >= end) {
done();
return;
}
ForkJoinScheduler::RunParallelForAsync(start, end, granularity, do_func, done, thread_pool);
}
// Synchronous variant of ParallelForAsync.
template <typename DoFnType>
static void ParallelFor(int start, int end, int granularity, DoFnType do_func, Eigen::ThreadPool* thread_pool) {
if (start >= end) return;
auto dummy_done = []() {};
Barrier barrier(1);
thread_pool->Schedule([start, end, granularity, thread_pool, &do_func, &dummy_done, &barrier]() {
ForkJoinScheduler::ParallelForAsync(start, end, granularity, do_func, dummy_done, thread_pool);
barrier.Notify();
});
barrier.Wait();
}
private:
// Schedules `right_thunk` on the thread pool, runs `left_thunk` on the current thread, and then
// executes other pending tasks until `right_thunk` has finished.
template <typename LeftType, typename RightType>
static void ForkJoin(LeftType&& left_thunk, RightType&& right_thunk, Eigen::ThreadPool* thread_pool) {
std::atomic<bool> right_done(false);
auto execute_right = [&right_thunk, &right_done]() {
std::forward<RightType>(right_thunk)();
right_done.store(true, std::memory_order_release);
};
thread_pool->Schedule(execute_right);
std::forward<LeftType>(left_thunk)();
Eigen::ThreadPool::Task task;
while (!right_done.load(std::memory_order_acquire)) {
thread_pool->MaybeGetTask(&task);
if (task.f) task.f();
}
}
// Runs `do_func` in parallel for the range [start, end). The main recursive asynchronous runner that
// calls `ForkJoin`.
static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int)>& do_func,
std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
std::function<void(int, int)> wrapped_do_func = [&do_func](int start, int end) {
for (int i = start; i < end; ++i) do_func(i);
};
ForkJoinScheduler::RunParallelForAsync(start, end, granularity, wrapped_do_func, done, thread_pool);
}
// Variant of `RunParallelForAsync` that uses a do function that operates on an index range.
// Specifically, `do_func` takes two arguments: the start and end of the range.
static void RunParallelForAsync(int start, int end, int granularity, std::function<void(int, int)>& do_func,
std::function<void()>& done, Eigen::ThreadPool* thread_pool) {
if ((end - start) <= granularity) {
do_func(start, end);
for (int j = 0; j < end - start; ++j) done();
} else {
// Typical workloads choose initial values of `{start, end, granularity}` such that `end - start` and
// `granularity` are powers of two. Since modern processors usually implement (2^x)-way
// set-associative caches, we minimize the number of cache misses by choosing midpoints that are not
// powers of two (to reduce the chance that two addresses in main memory map to the same cache
// set). More specifically, we restrict the set of candidate midpoints to:
//
// P := {start, start + granularity, start + 2*granularity, ..., end},
//
// and choose the entry in `P` at (roughly) the 9/16 mark.
const int size = end - start;
const int mid = start + Eigen::numext::div_ceil(9 * (size + 1) / 16, granularity) * granularity;
ForkJoinScheduler::ForkJoin(
[start, mid, granularity, &do_func, &done, thread_pool]() {
RunParallelForAsync(start, mid, granularity, do_func, done, thread_pool);
},
[mid, end, granularity, &do_func, &done, thread_pool]() {
RunParallelForAsync(mid, end, granularity, do_func, done, thread_pool);
},
thread_pool);
}
}
};
} // namespace Eigen
#endif // EIGEN_THREADPOOL_FORKJOIN_H
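
To make the midpoint heuristic in `RunParallelForAsync` concrete, here is a small standalone sketch (not part of the commit; `PickMid`, the local `div_ceil`, and the numbers are illustrative) that reproduces the 9/16 split on a power-of-two range:

```
#include <cstdio>

// Mirrors Eigen::numext::div_ceil for non-negative integers.
static int div_ceil(int a, int b) { return (a + b - 1) / b; }

// Midpoint selection as in ForkJoinScheduler::RunParallelForAsync: pick the
// multiple of `granularity` at roughly the 9/16 mark of [start, end) so the
// split point is usually not a power of two.
static int PickMid(int start, int end, int granularity) {
  const int size = end - start;
  return start + div_ceil(9 * (size + 1) / 16, granularity) * granularity;
}

int main() {
  // 9 * 1025 / 16 = 576; div_ceil(576, 64) = 9; mid = 0 + 9 * 64 = 576.
  std::printf("mid = %d\n", PickMid(/*start=*/0, /*end=*/1024, /*granularity=*/64));
  return 0;
}
```

With `granularity = 64` on the range `[0, 1024)`, the chosen midpoint is 576, a multiple of the granularity that is not a power of two, so the two halves are less likely to contend for the same cache sets.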


@@ -18,9 +18,24 @@ namespace Eigen {
template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
public:
typedef typename Environment::EnvThread Thread;
typedef typename Environment::Task Task;
typedef RunQueue<Task, 1024> Queue;
struct PerThread {
constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
};
struct ThreadData {
constexpr ThreadData() : thread(), steal_partition(0), queue() {}
std::unique_ptr<Thread> thread;
std::atomic<unsigned> steal_partition;
Queue queue;
};
ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {}
ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment())
@@ -31,6 +46,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
all_coprimes_(num_threads),
waiters_(num_threads),
global_steal_partition_(EncodePartition(0, num_threads_)),
spin_count_(0),
spinning_state_(0),
blocked_(0),
done_(false),
@@ -133,6 +149,39 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
}
}
// Tries to fetch a task for the calling worker thread, storing it in `*t`; `t->f` is left empty if no
// work was found.
void MaybeGetTask(Task* t) {
PerThread* pt = GetPerThread();
Queue& q = thread_data_[pt->thread_id].queue;
if (*t = q.PopFront(); t->f) return;
if (num_threads_ == 1) {
// For num_threads_ == 1 there is no point in going through the expensive
// steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
// victim queues it might reverse the order in which ops are executed
// compared to the order in which they are scheduled, which tends to be
// counter-productive for the types of I/O workloads single thread pools
// tend to be used for.
for (int i = 0; i < spin_count_ && !t->f; ++i) *t = q.PopFront();
} else {
if (EIGEN_PREDICT_FALSE(!t->f)) *t = LocalSteal();
if (EIGEN_PREDICT_FALSE(!t->f)) *t = GlobalSteal();
if (EIGEN_PREDICT_FALSE(!t->f)) {
if (allow_spinning_ && StartSpinning()) {
for (int i = 0; i < spin_count_ && !t->f; ++i) *t = q.PopFront();
// Notify `spinning_state_` that we are no longer spinning.
bool has_no_notify_task = StopSpinning();
// If a task was submitted to the queue without a call to
// `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
// false), and we didn't steal anything above, we must try to
// steal one more time, to make sure that this task will be
// executed. We will not necessarily find it, because it might
// have been already stolen by some other thread.
if (has_no_notify_task && !t->f) *t = q.PopFront();
}
}
}
}
void Cancel() EIGEN_OVERRIDE {
cancelled_ = true;
done_ = true;
@@ -206,22 +255,6 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
}
}
typedef typename Environment::EnvThread Thread;
struct PerThread {
constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
};
struct ThreadData {
constexpr ThreadData() : thread(), steal_partition(0), queue() {}
std::unique_ptr<Thread> thread;
std::atomic<unsigned> steal_partition;
Queue queue;
};
// Maximum number of threads that can spin in steal loop.
static constexpr int kMaxSpinningThreads = 1;
@@ -277,6 +310,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
Environment env_;
const int num_threads_;
const bool allow_spinning_;
int spin_count_;
MaxSizeVector<ThreadData> thread_data_;
MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
MaxSizeVector<EventCount::Waiter> waiters_;
@@ -314,85 +348,21 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
Queue& q = thread_data_[thread_id].queue;
EventCount::Waiter* waiter = &waiters_[thread_id];
// TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
// proportional to num_threads_ and we assume that new work is scheduled at
// a constant rate, so we divide `kSpinCount` by number of threads and
// number of spinning threads. The constant was picked based on a fair dice
// roll, tune it.
const int spin_count = allow_spinning_ && num_threads_ > 0 ? kSpinCount / kMaxSpinningThreads / num_threads_ : 0;
if (num_threads_ == 1) {
// For num_threads_ == 1 there is no point in going through the expensive
// steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
// victim queues it might reverse the order in which ops are executed
// compared to the order in which they are scheduled, which tends to be
// counter-productive for the types of I/O workloads the single thread
// pools tend to be used for.
while (!cancelled_.load(std::memory_order_relaxed)) {
Task t = q.PopFront();
for (int i = 0; i < spin_count && !t.f; ++i) {
t = q.PopFront();
}
if (EIGEN_PREDICT_FALSE(!t.f)) {
if (!WaitForWork(waiter, &t)) {
return;
}
}
if (EIGEN_PREDICT_TRUE(t.f)) {
env_.ExecuteTask(t);
}
}
} else {
while (!cancelled_.load(std::memory_order_relaxed)) {
Task t = q.PopFront();
// Do one round of steal loop from local thread partition.
if (EIGEN_PREDICT_FALSE(!t.f)) {
t = LocalSteal();
}
// Do one round of steal loop from global thread partition.
if (EIGEN_PREDICT_FALSE(!t.f)) {
t = GlobalSteal();
}
// Maybe leave a thread spinning. This improves latency.
if (EIGEN_PREDICT_FALSE(!t.f)) {
if (allow_spinning_ && StartSpinning()) {
for (int i = 0; i < spin_count && !t.f; ++i) {
t = GlobalSteal();
}
// Notify `spinning_state_` that we are no longer spinning.
bool has_no_notify_task = StopSpinning();
// If a task was submitted to the queue without a call to
// `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
// false), and we didn't steal anything above, we must try to
// steal one more time, to make sure that this task will be
// executed. We will not necessarily find it, because it might
// have been already stolen by some other thread.
if (has_no_notify_task && !t.f) {
t = GlobalSteal();
}
}
}
// If we still don't have a task, wait for one. Return if thread pool is
// in cancelled state.
if (EIGEN_PREDICT_FALSE(!t.f)) {
if (!WaitForWork(waiter, &t)) {
return;
}
}
// Execute task if we found one.
if (EIGEN_PREDICT_TRUE(t.f)) {
env_.ExecuteTask(t);
}
// proportional to num_threads_ and we assume that new work is scheduled
// at a constant rate, so we divide `kSpinCount` by number of threads
// and number of spinning threads. The constant was picked based on a
// fair dice roll, tune it.
spin_count_ = allow_spinning_ && num_threads_ > 0 ? kSpinCount / kMaxSpinningThreads / num_threads_ : 0;
Task t;
while (!cancelled_.load(std::memory_order_relaxed)) {
MaybeGetTask(&t);
// If we still don't have a task, wait for one. Return if thread pool is
// in cancelled state.
if (EIGEN_PREDICT_FALSE(!t.f)) {
EventCount::Waiter* waiter = &waiters_[pt->thread_id];
if (!WaitForWork(waiter, &t)) return;
}
if (EIGEN_PREDICT_TRUE(t.f)) env_.ExecuteTask(t);
}
}
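
The refactoring above moves the pop/steal/spin logic of the worker loop into the new public `MaybeGetTask()` method, which is what allows `ForkJoinScheduler::ForkJoin` to keep a worker thread productive while it waits for its right branch. The sketch below illustrates that help-while-waiting pattern; `HelpWhileWaiting` and its usage are illustrative assumptions rather than part of the commit, and the loop is only valid on a thread owned by the pool (ForkJoinScheduler guarantees this by scheduling its root task onto the pool).

```
#define EIGEN_USE_THREADS
#include <atomic>

#include "Eigen/ThreadPool"

// Waits for `done_flag` to become true, executing other pool tasks in the
// meantime instead of blocking. Must run on a thread owned by `pool`, since
// MaybeGetTask() pops from the calling worker's own queue before stealing.
inline void HelpWhileWaiting(Eigen::ThreadPool* pool, std::atomic<bool>& done_flag) {
  Eigen::ThreadPool::Task task;
  while (!done_flag.load(std::memory_order_acquire)) {
    pool->MaybeGetTask(&task);
    if (task.f) task.f();  // Run whatever local or stolen work was found.
  }
}

// Usage sketch, mirroring ForkJoinScheduler::ForkJoin: schedule the right
// branch, run the left branch inline, then help until the right branch is done.
//   std::atomic<bool> right_done(false);
//   pool->Schedule([&] { right_branch(); right_done.store(true, std::memory_order_release); });
//   left_branch();
//   HelpWhileWaiting(pool, right_done);
```

Compared to parking the thread on a condition variable, this keeps the worker available for the subtasks spawned by the very branch it is waiting on, which is what makes the recursive splitting in `ForkJoinScheduler` non-blocking.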


@@ -0,0 +1,79 @@
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2025 Weiwei Kong <weiweikong@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
#define EIGEN_USE_THREADS
#include "main.h"
#include "Eigen/ThreadPool"
struct TestData {
ThreadPool tp;
std::vector<double> data;
};
TestData make_test_data(int num_threads, int num_shards) {
return {ThreadPool(num_threads), std::vector<double>(num_shards, 1.0)};
}
static void test_unary_parallel_for(int granularity) {
// Test correctness.
const int kNumTasks = 100000;
TestData test_data = make_test_data(/*num_threads=*/4, kNumTasks);
std::atomic<double> sum = 0.0;
std::function<void(int)> unary_do_fn = [&](int i) {
for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[i]);) {
};
};
ForkJoinScheduler::ParallelFor(0, kNumTasks, granularity, std::move(unary_do_fn), &test_data.tp);
VERIFY_IS_EQUAL(sum, kNumTasks);
}
static void test_binary_parallel_for(int granularity) {
// Test correctness.
const int kNumTasks = 100000;
TestData test_data = make_test_data(/*num_threads=*/4, kNumTasks);
std::atomic<double> sum = 0.0;
std::function<void(int, int)> binary_do_fn = [&](int i, int j) {
for (int k = i; k < j; ++k)
for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[k]);) {
};
};
ForkJoinScheduler::ParallelFor(0, kNumTasks, granularity, std::move(binary_do_fn), &test_data.tp);
VERIFY_IS_EQUAL(sum, kNumTasks);
}
static void test_async_parallel_for() {
// Test correctness.
// NOTE: Granularity and type of `do_func` are checked in the synchronous tests.
const int kNumThreads = 4;
const int kNumTasks = 100;
const int kNumAsyncCalls = kNumThreads * 4;
TestData test_data = make_test_data(kNumThreads, kNumTasks);
std::atomic<double> sum = 0.0;
std::function<void(int)> unary_do_fn = [&](int i) {
for (double new_sum = sum; !sum.compare_exchange_weak(new_sum, new_sum + test_data.data[i]);) {
};
};
Barrier barrier(kNumTasks * kNumAsyncCalls);
std::function<void()> done = [&]() { barrier.Notify(); };
for (int k = 0; k < kNumAsyncCalls; ++k) {
test_data.tp.Schedule([&]() {
ForkJoinScheduler::ParallelForAsync(0, kNumTasks, /*granularity=*/1, unary_do_fn, done, &test_data.tp);
});
}
barrier.Wait();
VERIFY_IS_EQUAL(sum, kNumTasks * kNumAsyncCalls);
}
EIGEN_DECLARE_TEST(fork_join) {
CALL_SUBTEST(test_unary_parallel_for(1));
CALL_SUBTEST(test_unary_parallel_for(2));
CALL_SUBTEST(test_binary_parallel_for(1));
CALL_SUBTEST(test_binary_parallel_for(2));
CALL_SUBTEST(test_async_parallel_for());
}
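
The compare-exchange loops in the tests above stand in for `std::atomic<double>::fetch_add`, whose floating-point overload only arrives in C++20. A minimal equivalent helper (hypothetical, not part of the test file) looks like this:

```
#include <atomic>

// Atomically adds `v` to `sum` via a CAS loop, equivalent to the C++20
// floating-point overload of fetch_add.
inline void AtomicAdd(std::atomic<double>& sum, double v) {
  double observed = sum.load();
  while (!sum.compare_exchange_weak(observed, observed + v)) {
    // On failure, `observed` is refreshed with the current value; retry.
  }
}
```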