Mirror of https://gitlab.com/libeigen/eigen.git
Optimized the non-blocking thread pool:

* Use a pseudo-random permutation of queue indices during random stealing. This ensures that all queues are considered.
* Directly pop from a non-empty queue when we are waiting for work, instead of first noticing that there is a non-empty queue and then doing another round of random stealing to re-discover it.
* Steal only one task from a remote queue instead of half of its tasks.
parent 05c365fb16
commit dc7dbc2df7
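Editor's note: the first bullet relies on a number-theoretic fact: if inc is coprime with size, then starting from any index t, the sequence (t + inc) % size visits all size indices exactly once before repeating. Below is a minimal standalone C++ sketch (not part of the commit; the pool size of 10 is an arbitrary example) that computes the coprimes with the same GCD loop the commit adds and verifies the full-coverage property:

#include <cassert>
#include <cstdio>
#include <vector>

int main() {
  const unsigned size = 10;  // hypothetical pool size, any value works
  // Collect all coprimes of `size` using Euclid's algorithm, as the new
  // thread pool constructor does.
  std::vector<unsigned> coprimes;
  for (unsigned i = 1; i <= size; i++) {
    unsigned a = i, b = size;
    while (b != 0) {
      unsigned tmp = a;
      a = b;
      b = tmp % b;
    }
    if (a == 1) coprimes.push_back(i);  // GCD(i, size) == 1
  }
  // From every start index, step by every coprime increment and check that
  // all `size` indices are visited exactly once.
  for (unsigned inc : coprimes) {
    for (unsigned t = 0; t < size; t++) {
      std::vector<bool> seen(size, false);
      unsigned victim = t;
      for (unsigned i = 0; i < size; i++) {
        assert(!seen[victim]);
        seen[victim] = true;
        victim += inc;
        if (victim >= size) victim -= size;  // cheap modulo, as in Steal()
      }
    }
  }
  std::printf("verified %zu coprime increments for size %u\n",
              coprimes.size(), size);
  return 0;
}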
@@ -23,14 +23,38 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
       : env_(env),
         threads_(num_threads),
         queues_(num_threads),
+        coprimes_(num_threads),
         waiters_(num_threads),
         blocked_(),
         spinning_(),
         done_(),
         ec_(waiters_) {
-    for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue());
-    for (int i = 0; i < num_threads; i++)
+    // Calculate coprimes of num_threads.
+    // Coprimes are used for a random walk over all threads in Steal
+    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
+    // a walk starting at thread index t and calculate num_threads - 1 subsequent
+    // indices as (t + coprime) % num_threads, we will cover all threads without
+    // repetitions (effectively getting a pseudo-random permutation of thread
+    // indices).
+    for (unsigned i = 1; i <= num_threads; i++) {
+      unsigned a = i;
+      unsigned b = num_threads;
+      // If GCD(a, b) == 1, then a and b are coprimes.
+      while (b != 0) {
+        unsigned tmp = a;
+        a = b;
+        b = tmp % b;
+      }
+      if (a == 1) {
+        coprimes_.push_back(i);
+      }
+    }
+    for (int i = 0; i < num_threads; i++) {
+      queues_.push_back(new Queue());
+    }
+    for (int i = 0; i < num_threads; i++) {
       threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+    }
   }

   ~NonBlockingThreadPoolTempl() {
@@ -84,6 +108,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
   Environment env_;
   MaxSizeVector<Thread*> threads_;
   MaxSizeVector<Queue*> queues_;
+  MaxSizeVector<unsigned> coprimes_;
   std::vector<EventCount::Waiter> waiters_;
   std::atomic<unsigned> blocked_;
   std::atomic<bool> spinning_;
@@ -97,82 +122,69 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     pt->index = index;
     Queue* q = queues_[index];
     EventCount::Waiter* waiter = &waiters_[index];
-    std::vector<Task> stolen;
     for (;;) {
-      Task t;
-      if (!stolen.empty()) {
-        t = std::move(stolen.back());
-        stolen.pop_back();
-      }
-      if (!t.f) t = q->PopFront();
+      Task t = q->PopFront();
       if (!t.f) {
-        if (Steal(&stolen)) {
-          t = std::move(stolen.back());
-          stolen.pop_back();
-          while (stolen.size()) {
-            Task t1 = q->PushFront(std::move(stolen.back()));
-            stolen.pop_back();
-            if (t1.f) {
-              // There is not much we can do in this case. Just execute the
-              // remaining directly.
-              stolen.push_back(std::move(t1));
-              break;
+        t = Steal();
+        if (!t.f) {
+          // Leave one thread spinning. This reduces latency.
+          // TODO(dvyukov): 1000 iterations is based on a fair dice roll; tune it.
+          // Also, the time it takes to attempt to steal work 1000 times depends
+          // on the size of the thread pool. However, the speed at which the user
+          // of the thread pool submits tasks is independent of the size of the
+          // pool. Consider a time-based limit instead.
+          if (!spinning_ && !spinning_.exchange(true)) {
+            for (int i = 0; i < 1000 && !t.f; i++) {
+              t = Steal();
+            }
+            spinning_ = false;
+          }
+          if (!t.f) {
+            if (!WaitForWork(waiter, &t)) {
+              return;
             }
           }
         }
       }
       if (t.f) {
         env_.ExecuteTask(t);
-        continue;
       }
-      // Leave one thread spinning. This reduces latency.
-      if (!spinning_ && !spinning_.exchange(true)) {
-        bool nowork = true;
-        for (int i = 0; i < 1000; i++) {
-          if (!OutOfWork()) {
-            nowork = false;
-            break;
-          }
-        }
-        spinning_ = false;
-        if (!nowork) continue;
-      }
-      if (!WaitForWork(waiter)) return;
     }
   }

   // Steal tries to steal work from other worker threads in best-effort manner.
-  bool Steal(std::vector<Task>* stolen) {
-    if (queues_.size() == 1) return false;
+  Task Steal() {
     PerThread* pt = GetPerThread();
-    unsigned lastq = pt->index;
-    for (unsigned i = queues_.size(); i > 0; i--) {
-      unsigned victim = Rand(&pt->rand) % queues_.size();
-      if (victim == lastq && queues_.size() > 2) {
-        i++;
-        continue;
+    unsigned size = queues_.size();
+    unsigned r = Rand(&pt->rand);
+    unsigned inc = coprimes_[r % coprimes_.size()];
+    unsigned victim = r % size;
+    for (unsigned i = 0; i < size; i++) {
+      Task t = queues_[victim]->PopBack();
+      if (t.f) {
+        return t;
+      }
+      victim += inc;
+      if (victim >= size) {
+        victim -= size;
       }
-      // Steal half of elements from a victim queue.
-      // It is typical to steal just one element, but that assumes that work is
-      // recursively subdivided in halves so that the stolen element is exactly
-      // half of work. If work elements are equally-sized, then is makes sense
-      // to steal half of elements at once and then work locally for a while.
-      if (queues_[victim]->PopBackHalf(stolen)) return true;
-      lastq = victim;
     }
-    // Just to make sure that we did not miss anything.
-    for (unsigned i = queues_.size(); i > 0; i--)
-      if (queues_[i - 1]->PopBackHalf(stolen)) return true;
-    return false;
+    return Task();
   }

-  // WaitForWork blocks until new work is available, or if it is time to exit.
-  bool WaitForWork(EventCount::Waiter* waiter) {
-    // We already did best-effort emptiness check in Steal, so prepare blocking.
+  // WaitForWork blocks until new work is available (returns true), or if it is
+  // time to exit (returns false). Can optionally return a task to execute in t
+  // (in such case t.f != nullptr on return).
+  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
+    eigen_assert(!t->f);
+    // We already did a best-effort emptiness check in Steal, so prepare for
+    // blocking.
     ec_.Prewait(waiter);
-    // Now do reliable emptiness check.
-    if (!OutOfWork()) {
+    // Now do a reliable emptiness check.
+    int victim = NonEmptyQueueIndex();
+    if (victim != -1) {
       ec_.CancelWait(waiter);
+      *t = queues_[victim]->PopBack();
       return true;
     }
     // Number of blocked threads is used as termination condition.
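Editor's note on two design choices above. Stealing now returns a single task (Task Steal()) instead of filling a vector via PopBackHalf, which removes the local `stolen` buffer and the PushFront re-distribution loop from WorkerLoop entirely. And WaitForWork takes a Task* out-parameter: when the reliable emptiness check (NonEmptyQueueIndex) finds a non-empty queue, the waiter pops from that queue directly rather than returning to another round of random stealing that would have to re-discover the same queue, exactly as the commit message describes.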
@@ -186,7 +198,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     // right after incrementing blocked_ above. Now a free-standing thread
     // submits work and calls destructor (which sets done_). If we don't
     // re-check queues, we will exit leaving the work unexecuted.
-    if (!OutOfWork()) {
+    if (NonEmptyQueueIndex() != -1) {
       // Note: we must not pop from queues before we decrement blocked_,
       // otherwise the following scenario is possible. Consider that instead
       // of checking for emptiness we popped the only element from queues.
@@ -205,10 +217,22 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
     return true;
   }

-  bool OutOfWork() {
-    for (unsigned i = 0; i < queues_.size(); i++)
-      if (!queues_[i]->Empty()) return false;
-    return true;
+  int NonEmptyQueueIndex() {
+    PerThread* pt = GetPerThread();
+    unsigned size = queues_.size();
+    unsigned r = Rand(&pt->rand);
+    unsigned inc = coprimes_[r % coprimes_.size()];
+    unsigned victim = r % size;
+    for (unsigned i = 0; i < size; i++) {
+      if (!queues_[victim]->Empty()) {
+        return victim;
+      }
+      victim += inc;
+      if (victim >= size) {
+        victim -= size;
+      }
+    }
+    return -1;
   }

   PerThread* GetPerThread() {
@@ -100,7 +100,7 @@ class RunQueue {
   // PopBack removes and returns the last elements in the queue.
   // Can fail spuriously.
   Work PopBack() {
-    if (Empty()) return 0;
+    if (Empty()) return Work();
     std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
     if (!lock) return Work();
     unsigned back = back_.load(std::memory_order_relaxed);
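Editor's note: the RunQueue change replaces the literal 0 with a value-initialized Work() on the empty fast path, matching the existing `if (!lock) return Work();` early return. Presumably this also keeps PopBack compiling when Work is a struct such as the thread pool's Task, which has no conversion from an integer; the 0 worked only for integral work types like the one used in the unit test.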
@@ -100,6 +100,14 @@ void test_basic_runqueue()
   // Empty again.
   VERIFY(q.Empty());
   VERIFY_IS_EQUAL(0u, q.Size());
+  VERIFY_IS_EQUAL(0, q.PushFront(1));
+  VERIFY_IS_EQUAL(0, q.PushFront(2));
+  VERIFY_IS_EQUAL(0, q.PushFront(3));
+  VERIFY_IS_EQUAL(1, q.PopBack());
+  VERIFY_IS_EQUAL(2, q.PopBack());
+  VERIFY_IS_EQUAL(3, q.PopBack());
+  VERIFY(q.Empty());
+  VERIFY_IS_EQUAL(0, q.Size());
 }

 // Empty tests that the queue is not claimed to be empty when is is in fact not.
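Editor's note: the new assertions exercise the mixed-end path that PopBack now shares with the stealing code: PushFront returns 0 (an empty Work) on success, and elements pushed to the front come out of the back in insertion order (1, 2, 3), after which the queue reports empty again.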