Make the non-blocking threadpool more flexible and less wasteful of CPU cycles for high-latency use-cases.

* Adds a hint to ThreadPool that allows callers to turn off spin waiting. Currently each reader and record yielder op in a graph creates a thread pool whose thread spins for 1000 iterations through the work-stealing loop before yielding. That spinning is wasted for such I/O-bound ops; see the sketch below.

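  A minimal sketch of the new hint in use (the header path and the task body are assumptions; the two-argument constructor and Schedule() are from this change and the existing API):

    #include <unsupported/Eigen/CXX11/ThreadPool>

    int main() {
      // One worker with spinning disabled: the thread blocks as soon as its
      // queue is empty instead of burning steal-loop iterations first.
      Eigen::NonBlockingThreadPool pool(/*num_threads=*/1,
                                        /*allow_spinning=*/false);
      pool.Schedule([] { /* issue a blocking read, hand off the result */ });
      return 0;  // The destructor drains the queue and joins the worker.
    }
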
* This also changes the number of iterations through the steal loop to be inversely proportional to the number of threads. Since the time of each iteration is proportional to the number of threads, this yields a roughly constant spin time; the arithmetic is sketched below.

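  Concretely, the new WorkerLoop (in the diff below) uses spin_count = 5000 / num_threads_. If one pass through the steal loop costs roughly c * num_threads_ (one probe per potential victim queue, with c an assumed per-probe cost), then the busy-wait per idle episode is about

    (5000 / num_threads_) * (c * num_threads_) = 5000 * c,

  i.e. roughly constant regardless of pool size, whereas the old fixed 1000-iteration loop spent time that grew linearly with the number of threads.
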
* Implement a separate worker loop for the num_threads == 1 case, since there is no point in going through the expensive steal loop. Moreover, since Steal() calls PopBack() on the victim queues, it can reverse the order in which ops are executed relative to the order in which they were scheduled, which is usually counter-productive for the I/O workloads that single-threaded pools tend to serve (illustrated below).

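  A hypothetical illustration of the ordering point (the shard-reading tasks are invented for the example; only the constructor and Schedule() come from the actual code):

    #include <unsupported/Eigen/CXX11/ThreadPool>
    #include <cstdio>

    int main() {
      Eigen::NonBlockingThreadPool io_pool(/*num_threads=*/1,
                                           /*allow_spinning=*/false);
      // Three I/O-style tasks, queued front-to-back on the single worker.
      for (int shard = 0; shard < 3; ++shard) {
        io_pool.Schedule([shard] { std::printf("reading shard %d\n", shard); });
      }
      // The dedicated single-thread loop drains the queue with PopFront(), so
      // the shards are processed as 0, 1, 2 -- the order they were scheduled.
      // A Steal()-based path takes work with PopBack() and could have started
      // with shard 2.
      return 0;
    }
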
* Store num_threads in a member variable for simplicity and to avoid a data race between the thread creation loop and worker threads calling threads_.size().
commit 344c2694a6 (parent 970ff78294)
Author: Rasmus Munk Larsen, 2017-03-09 15:41:03 -08:00
2 changed files with 111 additions and 59 deletions


@@ -20,7 +20,9 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
   typedef RunQueue<Task, 1024> Queue;
 
   NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
-      : env_(env),
+      : num_threads_(num_threads),
+        allow_spinning_(true),
+        env_(env),
         threads_(num_threads),
         queues_(num_threads),
         coprimes_(num_threads),
@@ -30,34 +32,24 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
         done_(false),
         cancelled_(false),
         ec_(waiters_) {
-    waiters_.resize(num_threads);
-
-    // Calculate coprimes of num_threads.
-    // Coprimes are used for a random walk over all threads in Steal
-    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
-    // a walk starting thread index t and calculate num_threads - 1 subsequent
-    // indices as (t + coprime) % num_threads, we will cover all threads without
-    // repetitions (effectively getting a presudo-random permutation of thread
-    // indices).
-    for (int i = 1; i <= num_threads; i++) {
-      unsigned a = i;
-      unsigned b = num_threads;
-      // If GCD(a, b) == 1, then a and b are coprimes.
-      while (b != 0) {
-        unsigned tmp = a;
-        a = b;
-        b = tmp % b;
-      }
-      if (a == 1) {
-        coprimes_.push_back(i);
-      }
-    }
-    for (int i = 0; i < num_threads; i++) {
-      queues_.push_back(new Queue());
-    }
-    for (int i = 0; i < num_threads; i++) {
-      threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
-    }
+    Init();
+  }
+
+  NonBlockingThreadPoolTempl(int num_threads, bool allow_spinning,
+                             Environment env = Environment())
+      : num_threads_(num_threads),
+        allow_spinning_(allow_spinning),
+        env_(env),
+        threads_(num_threads),
+        queues_(num_threads),
+        coprimes_(num_threads),
+        waiters_(num_threads),
+        blocked_(0),
+        spinning_(0),
+        done_(false),
+        cancelled_(false),
+        ec_(waiters_) {
+    Init();
   }
 
   ~NonBlockingThreadPoolTempl() {
@@ -77,8 +69,8 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     }
 
     // Join threads explicitly to avoid destruction order issues.
-    for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
-    for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
+    for (size_t i = 0; i < num_threads_; i++) delete threads_[i];
+    for (size_t i = 0; i < num_threads_; i++) delete queues_[i];
   }
 
   void Schedule(std::function<void()> fn) {
@@ -125,7 +117,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
   }
 
   int NumThreads() const final {
-    return static_cast<int>(threads_.size());
+    return num_threads_;
   }
 
   int CurrentThreadId() const final {
@@ -149,6 +141,8 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
   };
 
   Environment env_;
+  const int num_threads_;
+  const bool allow_spinning_;
   MaxSizeVector<Thread*> threads_;
   MaxSizeVector<Queue*> queues_;
   MaxSizeVector<unsigned> coprimes_;
@@ -159,6 +153,37 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
   std::atomic<bool> cancelled_;
   EventCount ec_;
 
+  void Init() {
+    waiters_.resize(num_threads_);
+    // Calculate coprimes of num_threads_.
+    // Coprimes are used for a random walk over all threads in Steal
+    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
+    // a walk starting thread index t and calculate num_threads - 1 subsequent
+    // indices as (t + coprime) % num_threads, we will cover all threads without
+    // repetitions (effectively getting a presudo-random permutation of thread
+    // indices).
+    for (int i = 1; i <= num_threads_; i++) {
+      unsigned a = i;
+      unsigned b = num_threads_;
+      // If GCD(a, b) == 1, then a and b are coprimes.
+      while (b != 0) {
+        unsigned tmp = a;
+        a = b;
+        b = tmp % b;
+      }
+      if (a == 1) {
+        coprimes_.push_back(i);
+      }
+    }
+    for (int i = 0; i < num_threads_; i++) {
+      queues_.push_back(new Queue());
+    }
+    for (int i = 0; i < num_threads_; i++) {
+      threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+    }
+  }
+
   // Main worker thread loop.
   void WorkerLoop(int thread_id) {
     PerThread* pt = GetPerThread();
@@ -167,19 +192,44 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     pt->thread_id = thread_id;
     Queue* q = queues_[thread_id];
     EventCount::Waiter* waiter = &waiters_[thread_id];
+    // TODO(dvyukov,rmlarsen): The time spent in Steal() is proportional
+    // to num_threads_ and we assume that new work is scheduled at a
+    // constant rate, so we set spin_count to 5000 / num_threads_. The
+    // constant was picked based on a fair dice roll, tune it.
+    const int spin_count =
+        allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
+    if (num_threads_ == 1) {
+      // For num_threads_ == 1 there is no point in going through the expensive
+      // steal loop. Moreover, since Steal() calls PopBack() on the victim
+      // queues it might reverse the order in which ops are executed compared to
+      // the order in which they are scheduled, which tends to be
+      // counter-productive for the types of I/O workloads the single thread
+      // pools tend to be used for.
+      while (!cancelled_) {
+        Task t = q->PopFront();
+        for (int i = 0; i < spin_count && !t.f; i++) {
+          if (!cancelled_.load(std::memory_order_relaxed)) {
+            t = q->PopFront();
+          }
+        }
+        if (!t.f) {
+          if (!WaitForWork(waiter, &t)) {
+            return;
+          }
+        }
+        if (t.f) {
+          env_.ExecuteTask(t);
+        }
+      }
+    } else {
     while (!cancelled_) {
       Task t = q->PopFront();
       if (!t.f) {
         t = Steal();
         if (!t.f) {
           // Leave one thread spinning. This reduces latency.
-          // TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
-          // Also, the time it takes to attempt to steal work 1000 times depends
-          // on the size of the thread pool. However the speed at which the user
-          // of the thread pool submit tasks is independent of the size of the
-          // pool. Consider a time based limit instead.
-          if (!spinning_ && !spinning_.exchange(true)) {
-            for (int i = 0; i < 1000 && !t.f; i++) {
+          if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
+            for (int i = 0; i < spin_count && !t.f; i++) {
               if (!cancelled_.load(std::memory_order_relaxed)) {
                 t = Steal();
               } else {
@@ -200,6 +250,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
       }
     }
   }
+  }
 
   // Steal tries to steal work from other worker threads in best-effort manner.
   Task Steal() {
@@ -244,7 +295,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     // If we are shutting down and all worker threads blocked without work,
     // that's we are done.
     blocked_++;
-    if (done_ && blocked_ == threads_.size()) {
+    if (done_ && blocked_ == num_threads_) {
       ec_.CancelWait(waiter);
       // Almost done, but need to re-check queues.
       // Consider that all queues are empty and all worker threads are preempted

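A concrete instance of the coprime walk that Init() sets up above (the numbers are only an example, not from the patch): for num_threads_ = 6 the collected coprimes are 1 and 5, and a walk of stride 5 starting at thread 2 probes 2, 1, 0, 5, 4, 3 -- every thread exactly once.

    #include <cstdio>

    int main() {
      const unsigned num_threads = 6, coprime = 5, start = 2;
      unsigned index = start;
      for (unsigned i = 0; i < num_threads; i++) {
        std::printf("%u ", index);  // prints: 2 1 0 5 4 3
        // The (t + coprime) % num_threads step described in Init()'s comment.
        index = (index + coprime) % num_threads;
      }
      std::printf("\n");
      return 0;
    }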

@@ -23,11 +23,11 @@ static void test_create_destroy_empty_pool()
 }
 
-static void test_parallelism()
+static void test_parallelism(bool allow_spinning)
 {
   // Test we never-ever fail to match available tasks with idle threads.
   const int kThreads = 16;  // code below expects that this is a multiple of 4
-  NonBlockingThreadPool tp(kThreads);
+  NonBlockingThreadPool tp(kThreads, allow_spinning);
   VERIFY_IS_EQUAL(tp.NumThreads(), kThreads);
   VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1);
   for (int iter = 0; iter < 100; ++iter) {
@@ -119,6 +119,7 @@ static void test_cancel()
 void test_cxx11_non_blocking_thread_pool()
 {
   CALL_SUBTEST(test_create_destroy_empty_pool());
-  CALL_SUBTEST(test_parallelism());
+  CALL_SUBTEST(test_parallelism(true));
+  CALL_SUBTEST(test_parallelism(false));
   CALL_SUBTEST(test_cancel());
 }