Reworked the threadpool cancellation mechanism to not depend on pthread_cancel since it turns out that pthread_cancel doesn't work properly on numerous platforms.

This commit is contained in:
Benoit Steiner 2016-12-09 13:05:14 -08:00
parent 3d59a47720
commit 2f5b7a199b
5 changed files with 48 additions and 26 deletions

View File

@ -28,6 +28,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
blocked_(0), blocked_(0),
spinning_(0), spinning_(0),
done_(false), done_(false),
cancelled_(false),
ec_(waiters_) { ec_(waiters_) {
waiters_.resize(num_threads); waiters_.resize(num_threads);
@ -61,10 +62,19 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
~NonBlockingThreadPoolTempl() { ~NonBlockingThreadPoolTempl() {
done_ = true; done_ = true;
// Now if all threads block without work, they will start exiting. // Now if all threads block without work, they will start exiting.
// But note that threads can continue to work arbitrary long, // But note that threads can continue to work arbitrary long,
// block, submit new work, unblock and otherwise live full life. // block, submit new work, unblock and otherwise live full life.
ec_.Notify(true); if (!cancelled_) {
ec_.Notify(true);
} else {
// Since we were cancelled, there might be entries in the queues.
// Empty them to prevent their destructor from asserting.
for (size_t i = 0; i < queues_.size(); i++) {
queues_[i]->Flush();
}
}
// Join threads explicitly to avoid destruction order issues. // Join threads explicitly to avoid destruction order issues.
for (size_t i = 0; i < threads_.size(); i++) delete threads_[i]; for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
@ -91,16 +101,25 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// completes overall computations, which in turn leads to destruction of // completes overall computations, which in turn leads to destruction of
// this. We expect that such scenario is prevented by program, that is, // this. We expect that such scenario is prevented by program, that is,
// this is kept alive while any threads can potentially be in Schedule. // this is kept alive while any threads can potentially be in Schedule.
if (!t.f) if (!t.f) {
ec_.Notify(false); ec_.Notify(false);
else }
else {
env_.ExecuteTask(t); // Push failed, execute directly. env_.ExecuteTask(t); // Push failed, execute directly.
}
} }
void Cancel() { void Cancel() {
cancelled_ = true;
done_ = true;
// Let each thread know it's been cancelled.
for (size_t i = 0; i < threads_.size(); i++) { for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->Cancel(); threads_[i]->OnCancel();
} }
// Wake up the threads without work to let them exit on their own.
ec_.Notify(true);
} }
int NumThreads() const final { int NumThreads() const final {
@ -135,6 +154,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
std::atomic<unsigned> blocked_; std::atomic<unsigned> blocked_;
std::atomic<bool> spinning_; std::atomic<bool> spinning_;
std::atomic<bool> done_; std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_; EventCount ec_;
// Main worker thread loop. // Main worker thread loop.
@ -145,7 +165,7 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
pt->thread_id = thread_id; pt->thread_id = thread_id;
Queue* q = queues_[thread_id]; Queue* q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id];
for (;;) { while (!cancelled_) {
Task t = q->PopFront(); Task t = q->PopFront();
if (!t.f) { if (!t.f) {
t = Steal(); t = Steal();
@ -158,7 +178,11 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
// pool. Consider a time based limit instead. // pool. Consider a time based limit instead.
if (!spinning_ && !spinning_.exchange(true)) { if (!spinning_ && !spinning_.exchange(true)) {
for (int i = 0; i < 1000 && !t.f; i++) { for (int i = 0; i < 1000 && !t.f; i++) {
t = Steal(); if (!cancelled_.load(std::memory_order_relaxed)) {
t = Steal();
} else {
return;
}
} }
spinning_ = false; spinning_ = false;
} }
@ -207,8 +231,12 @@ class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
int victim = NonEmptyQueueIndex(); int victim = NonEmptyQueueIndex();
if (victim != -1) { if (victim != -1) {
ec_.CancelWait(waiter); ec_.CancelWait(waiter);
*t = queues_[victim]->PopBack(); if (cancelled_) {
return true; return false;
} else {
*t = queues_[victim]->PopBack();
return true;
}
} }
// Number of blocked threads is used as termination condition. // Number of blocked threads is used as termination condition.
// If we are shutting down and all worker threads blocked without work, // If we are shutting down and all worker threads blocked without work,

View File

@ -71,7 +71,7 @@ class SimpleThreadPoolTempl : public ThreadPoolInterface {
void Cancel() { void Cancel() {
for (size_t i = 0; i < threads_.size(); i++) { for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->Cancel(); threads_[i]->OnCancel();
} }
} }

View File

@ -23,7 +23,8 @@ struct StlThreadEnvironment {
public: public:
EnvThread(std::function<void()> f) : thr_(std::move(f)) {} EnvThread(std::function<void()> f) : thr_(std::move(f)) {}
~EnvThread() { thr_.join(); } ~EnvThread() { thr_.join(); }
void Cancel() { EIGEN_THREAD_CANCEL(thr_); } // This function is called when the threadpool is cancelled.
void OnCancel() { }
private: private:
std::thread thr_; std::thread thr_;

View File

@ -19,7 +19,8 @@ class ThreadPoolInterface {
// Submits a closure to be run by a thread in the pool. // Submits a closure to be run by a thread in the pool.
virtual void Schedule(std::function<void()> fn) = 0; virtual void Schedule(std::function<void()> fn) = 0;
// Cancel all the threads in the pool. // Stop processing the closures that have been enqueued.
// Currently running closures may still be processed.
virtual void Cancel() = 0; virtual void Cancel() = 0;
// Returns the number of threads in the pool. // Returns the number of threads in the pool.

View File

@ -104,23 +104,15 @@ static void test_parallelism()
static void test_cancel() static void test_cancel()
{ {
NonBlockingThreadPool tp(4); NonBlockingThreadPool tp(2);
#ifdef EIGEN_SUPPORTS_THREAD_CANCELLATION // Schedule a large number of closure that each sleeps for one second. This
std::cout << "Thread cancellation is supported on this platform" << std::endl; // will keep the thread pool busy for much longer than the default test timeout.
for (int i = 0; i < 1000; ++i) {
tp.Schedule([]() { sleep(2); });
}
// Put 2 threads to sleep for much longer than the default test timeout. // Cancel the processing of all the closures that are still pending.
tp.Schedule([]() { sleep(3600); } );
tp.Schedule([]() { sleep(3600 * 24); } );
#else
std::cout << "Thread cancellation is a no-op on this platform" << std::endl;
// Make 2 threads sleep for a short period of time
tp.Schedule([]() { sleep(1); } );
tp.Schedule([]() { sleep(2); } );
#endif
// Call cancel:
tp.Cancel(); tp.Cancel();
} }