Merged in rmlarsen/eigen_threadpool (pull request PR-640)

Fix deadlocks in thread pool.

Approved-by: Eugene Zhulenev <ezhulenev@google.com>
This commit is contained in:
Rasmus Larsen 2019-05-13 20:04:35 +00:00
commit c8d8d5c0fc
4 changed files with 18 additions and 23 deletions

View File

@ -20,8 +20,7 @@ namespace Eigen {
// if (predicate) // if (predicate)
// return act(); // return act();
// EventCount::Waiter& w = waiters[my_index]; // EventCount::Waiter& w = waiters[my_index];
// if (!ec.Prewait(&w)) // ec.Prewait(&w);
// return act();
// if (predicate) { // if (predicate) {
// ec.CancelWait(&w); // ec.CancelWait(&w);
// return act(); // return act();
@ -62,23 +61,17 @@ class EventCount {
} }
// Prewait prepares for waiting. // Prewait prepares for waiting.
// If Prewait returns true, the thread must re-check the wait predicate // After calling Prewait, the thread must re-check the wait predicate
// and then call either CancelWait or CommitWait. // and then call either CancelWait or CommitWait.
// Otherwise, the thread should assume the predicate may be true void Prewait() {
// and don't call CancelWait/CommitWait (there was a concurrent Notify call).
bool Prewait() {
uint64_t state = state_.load(std::memory_order_relaxed); uint64_t state = state_.load(std::memory_order_relaxed);
for (;;) { for (;;) {
CheckState(state); CheckState(state);
uint64_t newstate = state + kWaiterInc; uint64_t newstate = state + kWaiterInc;
if ((state & kSignalMask) != 0) {
// Consume the signal and cancel waiting.
newstate -= kSignalInc + kWaiterInc;
}
CheckState(newstate); CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate, if (state_.compare_exchange_weak(state, newstate,
std::memory_order_seq_cst)) std::memory_order_seq_cst))
return (state & kSignalMask) == 0; return;
} }
} }
@ -118,8 +111,13 @@ class EventCount {
for (;;) { for (;;) {
CheckState(state, true); CheckState(state, true);
uint64_t newstate = state - kWaiterInc; uint64_t newstate = state - kWaiterInc;
// Also take away a signal if any. // We don't know if the thread was also notified or not,
if ((state & kSignalMask) != 0) newstate -= kSignalInc; // so we should not consume a signal unconditionally.
// Only if the number of waiters is equal to the number of signals,
// we know that the thread was notified and we must take away the signal.
if (((state & kWaiterMask) >> kWaiterShift) ==
((state & kSignalMask) >> kSignalShift))
newstate -= kSignalInc;
CheckState(newstate); CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate, if (state_.compare_exchange_weak(state, newstate,
std::memory_order_acq_rel)) std::memory_order_acq_rel))

View File

@ -379,7 +379,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
eigen_plain_assert(!t->f); eigen_plain_assert(!t->f);
// We already did best-effort emptiness check in Steal, so prepare for // We already did best-effort emptiness check in Steal, so prepare for
// blocking. // blocking.
if (!ec_.Prewait()) return true; ec_.Prewait();
// Now do a reliable emptiness check. // Now do a reliable emptiness check.
int victim = NonEmptyQueueIndex(); int victim = NonEmptyQueueIndex();
if (victim != -1) { if (victim != -1) {

View File

@ -97,11 +97,9 @@ class RunQueue {
} }
// PopBack removes and returns the last elements in the queue. // PopBack removes and returns the last elements in the queue.
// Can fail spuriously.
Work PopBack() { Work PopBack() {
if (Empty()) return Work(); if (Empty()) return Work();
std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); std::unique_lock<std::mutex> lock(mutex_);
if (!lock) return Work();
unsigned back = back_.load(std::memory_order_relaxed); unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[back & kMask]; Elem* e = &array_[back & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed); uint8_t s = e->state.load(std::memory_order_relaxed);
@ -115,11 +113,10 @@ class RunQueue {
} }
// PopBackHalf removes and returns half last elements in the queue. // PopBackHalf removes and returns half last elements in the queue.
// Returns number of elements removed. But can also fail spuriously. // Returns number of elements removed.
unsigned PopBackHalf(std::vector<Work>* result) { unsigned PopBackHalf(std::vector<Work>* result) {
if (Empty()) return 0; if (Empty()) return 0;
std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock); std::unique_lock<std::mutex> lock(mutex_);
if (!lock) return 0;
unsigned back = back_.load(std::memory_order_relaxed); unsigned back = back_.load(std::memory_order_relaxed);
unsigned size = Size(); unsigned size = Size();
unsigned mid = back; unsigned mid = back;

View File

@ -30,10 +30,10 @@ static void test_basic_eventcount()
EventCount ec(waiters); EventCount ec(waiters);
EventCount::Waiter& w = waiters[0]; EventCount::Waiter& w = waiters[0];
ec.Notify(false); ec.Notify(false);
VERIFY(ec.Prewait()); ec.Prewait();
ec.Notify(true); ec.Notify(true);
ec.CommitWait(&w); ec.CommitWait(&w);
VERIFY(ec.Prewait()); ec.Prewait();
ec.CancelWait(); ec.CancelWait();
} }
@ -112,7 +112,7 @@ static void test_stress_eventcount()
unsigned idx = rand_reentrant(&rnd) % kQueues; unsigned idx = rand_reentrant(&rnd) % kQueues;
if (queues[idx].Pop()) continue; if (queues[idx].Pop()) continue;
j--; j--;
if (!ec.Prewait()) continue; ec.Prewait();
bool empty = true; bool empty = true;
for (int q = 0; q < kQueues; q++) { for (int q = 0; q < kQueues; q++) {
if (!queues[q].Empty()) { if (!queues[q].Empty()) {