From 6b800744ce914cf243ac3169e136c5000253f52e Mon Sep 17 00:00:00 2001
From: Benoit Steiner
Date: Wed, 20 May 2015 13:52:07 -0700
Subject: [PATCH] Moved away from std::async and std::future as the underlying
 mechanism for the thread pool device. On several platforms, the functions
 passed to std::async are not scheduled in the order in which they are given
 to std::async, which leads to massive performance issues in the contraction
 code. Instead we now have a custom thread pool that ensures that the
 functions are picked up by the threads in the pool in the order in which
 they are enqueued in the pool.
---
 unsupported/Eigen/CXX11/Tensor                     |   5 +-
 .../src/Tensor/TensorContractionThreadPool.h       |  67 +++++---
 .../Eigen/CXX11/src/Tensor/TensorDeviceType.h      | 162 ++++++++++++++++--
 .../Eigen/CXX11/src/Tensor/TensorExecutor.h        |   5 +-
 unsupported/test/cxx11_tensor_thread_pool.cpp      |  21 ++-
 5 files changed, 212 insertions(+), 48 deletions(-)

diff --git a/unsupported/Eigen/CXX11/Tensor b/unsupported/Eigen/CXX11/Tensor
index 520da66bb..05c5127a1 100644
--- a/unsupported/Eigen/CXX11/Tensor
+++ b/unsupported/Eigen/CXX11/Tensor
@@ -35,7 +35,10 @@
 #endif
 
 #ifdef EIGEN_USE_THREADS
-#include <future>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
 #endif
 
 #ifdef EIGEN_USE_GPU
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index cb2fd53fe..ed87d3100 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -46,8 +46,8 @@ struct packRhsAndKernelArg {
   const Index n_block_idx;
   const Index m_blocks;
   const Index n_blocks;
-  std::vector<Promise>* kernel_promises;
-  const std::vector<Future>* lhs_futures;
+  std::vector<Notification*>* kernel_notifications;
+  const std::vector<Notification*>* lhs_notifications;
   const bool need_to_pack;
 };
 
@@ -219,17 +219,13 @@ struct TensorEvaluator(this->m_device.allocate(sizeB * sizeof(RhsScalar))));
     }
 
-    // lhs_futures starts with all null futures
-    std::vector<Future> lhs_futures(num_threads);
+    // lhs_notifications starts with all null Notifications
+    std::vector<Notification*> lhs_notifications(num_threads, nullptr);
 
     // this should really be numBlockAs * n_blocks;
-    const Index num_kernel_promises = num_threads * n_blocks;
-    std::vector<Promise> kernel_promises(num_kernel_promises);
-    std::vector<Future> kernel_futures(num_kernel_promises);
-    for (std::size_t i = 0; i < kernel_promises.size(); ++i) {
-      kernel_promises[i].set_value();
-      kernel_futures[i] = kernel_promises[i].get_future();
-    }
+    const Index num_kernel_notifications = num_threads * n_blocks;
+    std::vector<Notification*> kernel_notifications(num_kernel_notifications,
+                                                    nullptr);
 
     for (Index k_block_idx = 0; k_block_idx < k_blocks; k_block_idx++) {
       const Index k_start = k_block_idx * kc;
@@ -245,11 +241,16 @@ struct TensorEvaluator 0);
         Index blockAId = (k_block_idx * m_blocks + mt_block_idx) % num_threads;
+
         for (int i = 0; i < n_blocks; ++i) {
-          Index future_id = (blockAId * n_blocks + i);
-          wait_until_ready(&kernel_futures[future_id]);
-          kernel_promises[future_id] = Promise();
-          kernel_futures[future_id] = kernel_promises[future_id].get_future();
+          Index notification_id = (blockAId * n_blocks + i);
+          // Wait for any current kernels using this slot to complete
+          // before using it.
+          if (kernel_notifications[notification_id]) {
+            wait_until_ready(kernel_notifications[notification_id]);
+            delete kernel_notifications[notification_id];
+          }
+          kernel_notifications[notification_id] = new Notification();
         }
 
         const packLArg arg = {
           blockAs[blockAId], // blockA
@@ -260,8 +261,12 @@ struct TensorEvaluatorm_device.enqueue(&Self::packLhs, arg);
+        // Delete any existing notification since we may be
+        // replacing it. The algorithm should ensure that there are
+        // no existing waiters on this notification.
+        delete lhs_notifications[blockAId];
+        lhs_notifications[blockAId] =
+          this->m_device.enqueue(&Self::packLhs, arg);
       }
 
       // now start kernels.
@@ -278,7 +283,7 @@ struct TensorEvaluatorm_device.enqueueNoFuture(&Self::packRhsAndKernel, arg);
+          // We asynchronously kick off this function, which ends up
+          // notifying the appropriate kernel_notifications objects,
+          // which this thread waits on before exiting.
+          this->m_device.enqueueNoNotification(&Self::packRhsAndKernel, arg);
         }
       }
     }
 
     // Make sure all the kernels are done.
-    for (size_t i = 0; i < kernel_futures.size(); ++i) {
-      wait_until_ready(&kernel_futures[i]);
+    for (size_t i = 0; i < kernel_notifications.size(); ++i) {
+      wait_until_ready(kernel_notifications[i]);
+      delete kernel_notifications[i];
+    }
+
+    // No need to wait for lhs notifications since they should have
+    // already been waited on. Just clean them up.
+    for (size_t i = 0; i < lhs_notifications.size(); ++i) {
+      delete lhs_notifications[i];
     }
 
     // deallocate all of the memory for both A and B's
@@ -360,15 +375,15 @@ struct TensorEvaluatorNotify(); } } }
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
index efd207507..1018395a1 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceType.h
@@ -38,19 +38,151 @@ struct DefaultDevice {
 
 // We should really use a thread pool here but first we need to find a portable thread pool library.
 #ifdef EIGEN_USE_THREADS
-typedef std::future<void> Future;
-typedef std::promise<void> Promise;
+// The implementation of the ThreadPool type ensures that the Schedule method
+// runs the functions it is provided in FIFO order when the scheduling is done
+// by a single thread.
+class ThreadPool {
+ public:
+  // Construct a pool that contains "num_threads" threads.
+  explicit ThreadPool(int num_threads) {
+    for (int i = 0; i < num_threads; i++) {
+      threads_.push_back(new std::thread([this]() { WorkerLoop(); }));
+    }
+  }
 
-static EIGEN_STRONG_INLINE void wait_until_ready(const Future* f) {
-  f->wait();
-}
-static EIGEN_STRONG_INLINE void get_when_ready(Future* f) {
-  f->get();
+  // Wait until all scheduled work has finished and then destroy the
+  // set of threads.
+  ~ThreadPool()
+  {
+    {
+      // Wait for all work to get done.
+      std::unique_lock<std::mutex> l(mu_);
+      empty_.wait(l, [this]() { return pending_.empty(); });
+      exiting_ = true;
+
+      // Wakeup all waiters.
+      for (auto w : waiters_) {
+        w->ready = true;
+        w->work = nullptr;
+        w->cv.notify_one();
+      }
+    }
+
+    // Wait for threads to finish.
+    for (auto t : threads_) {
+      t->join();
+      delete t;
+    }
+  }
+
+  // Schedule fn() for execution in the pool of threads. The functions are
+  // executed in the order in which they are scheduled.
+  void Schedule(std::function<void()> fn) {
+    std::unique_lock<std::mutex> l(mu_);
+    if (waiters_.empty()) {
+      pending_.push_back(fn);
+    } else {
+      Waiter* w = waiters_.back();
+      waiters_.pop_back();
+      w->ready = true;
+      w->work = fn;
+      w->cv.notify_one();
+    }
+  }
+
+ protected:
+  void WorkerLoop() {
+    std::unique_lock<std::mutex> l(mu_);
+    Waiter w;
+    while (!exiting_) {
+      std::function<void()> fn;
+      if (pending_.empty()) {
+        // Wait for work to be assigned to me
+        w.ready = false;
+        waiters_.push_back(&w);
+        w.cv.wait(l, [&w]() { return w.ready; });
+        fn = w.work;
+        w.work = nullptr;
+      } else {
+        // Pick up pending work
+        fn = pending_.front();
+        pending_.pop_front();
+        if (pending_.empty()) {
+          empty_.notify_all();
+        }
+      }
+      if (fn) {
+        mu_.unlock();
+        fn();
+        mu_.lock();
+      }
+    }
+  }
+
+ private:
+  struct Waiter {
+    std::condition_variable cv;
+    std::function<void()> work;
+    bool ready;
+  };
+
+  std::mutex mu_;
+  std::vector<std::thread*> threads_;          // All threads
+  std::vector<Waiter*> waiters_;               // Stack of waiting threads.
+  std::deque<std::function<void()>> pending_;  // Queue of pending work
+  std::condition_variable empty_;              // Signaled on pending_.empty()
+  bool exiting_ = false;
+};
+
+
+// Notification is an object that allows a user to wait for another
+// thread to signal a notification that an event has occurred.
+//
+// Multiple threads can wait on the same Notification object,
+// but only one caller may call Notify() on the object.
+class Notification {
+ public:
+  Notification() : notified_(false) {}
+  ~Notification() {}
+
+  void Notify() {
+    std::unique_lock<std::mutex> l(mu_);
+    eigen_assert(!notified_);
+    notified_ = true;
+    cv_.notify_all();
+  }
+
+  void WaitForNotification() {
+    std::unique_lock<std::mutex> l(mu_);
+    cv_.wait(l, [this]() { return notified_; } );
+  }
+
+ private:
+  std::mutex mu_;
+  std::condition_variable cv_;
+  bool notified_;
+};
+
+// Runs an arbitrary function and then calls Notify() on the passed-in
+// Notification.
+template <typename Function, typename... Args> struct FunctionWrapper
+{
+  static void run(Notification* n, Function f, Args... args) {
+    f(args...);
+    n->Notify();
+  }
+};
+
+static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) {
+  if (n) {
+    n->WaitForNotification();
+  }
 }
 
+// Build a thread pool device on top of an existing pool of threads.
 struct ThreadPoolDevice {
-  ThreadPoolDevice(size_t num_cores) : num_threads_(num_cores) { }
+  ThreadPoolDevice(ThreadPool* pool, size_t num_cores) : pool_(pool), num_threads_(num_cores) { }
 
   EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
     return internal::aligned_malloc(num_bytes);
@@ -73,15 +205,21 @@ struct ThreadPoolDevice {
   }
 
   template <typename Function, typename... Args>
-  EIGEN_STRONG_INLINE Future enqueue(Function&& f, Args&&... args) const {
-    return std::async(std::launch::async, f, args...);
+  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
+    Notification* n = new Notification();
+    std::function<void()> func =
+      std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...);
+    pool_->Schedule(func);
+    return n;
   }
 
   template <typename Function, typename... Args>
-  EIGEN_STRONG_INLINE void enqueueNoFuture(Function&& f, Args&&... args) const {
-    std::async(std::launch::async, f, args...);
+  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
+    std::function<void()> func = std::bind(f, args...);
+    pool_->Schedule(func);
   }
 
  private:
+  ThreadPool* pool_;
   size_t num_threads_;
 };
 
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index 02e1667b9..6ea588e4b 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -131,7 +131,7 @@ class TensorExecutor
     const Index blocksize = std::max(PacketSize, (blocksz - (blocksz % PacketSize)));
     const Index numblocks = size / blocksize;
 
-    std::vector<Future> results;
+    std::vector<Notification*> results;
     results.reserve(numblocks);
     for (int i = 0; i < numblocks; ++i) {
       results.push_back(device.enqueue(&EvalRange::run, evaluator, i*blocksize, (i+1)*blocksize));
     }
@@ -142,7 +142,8 @@ class TensorExecutor
     }
 
     for (int i = 0; i < numblocks; ++i) {
-      get_when_ready(&results[i]);
+      wait_until_ready(results[i]);
+      delete results[i];
     }
   }
 
diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp
index 6fe65c7f9..05b55f706 100644
--- a/unsupported/test/cxx11_tensor_thread_pool.cpp
+++ b/unsupported/test/cxx11_tensor_thread_pool.cpp
@@ -26,7 +26,8 @@ static void test_multithread_elementwise()
   in1.setRandom();
   in2.setRandom();
 
-  Eigen::ThreadPoolDevice thread_pool_device(internal::random(3, 11));
+  Eigen::ThreadPool tp(internal::random(3, 11));
+  Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random(3, 11));
   out.device(thread_pool_device) = in1 + in2 * 3.14f;
 
   for (int i = 0; i < 2; ++i) {
@@ -48,7 +49,8 @@ static void test_multithread_compound_assignment()
   in1.setRandom();
   in2.setRandom();
 
-  Eigen::ThreadPoolDevice thread_pool_device(internal::random(3, 11));
+  Eigen::ThreadPool tp(internal::random(3, 11));
+  Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random(3, 11));
   out.device(thread_pool_device) = in1;
   out.device(thread_pool_device) += in2 * 3.14f;
 
@@ -80,7 +82,8 @@ static void test_multithread_contraction()
   MapXf m_right(t_right.data(), 1147, 1400);
   Matrix m_result(1500, 1400);
 
-  Eigen::ThreadPoolDevice thread_pool_device(4);
+  Eigen::ThreadPool tp(4);
+  Eigen::ThreadPoolDevice thread_pool_device(&tp, 4);
 
   // compute results by separate methods
   t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@@ -115,7 +118,8 @@ static void test_contraction_corner_cases()
   MapXf m_right(t_right.data(), 32, 28*28);
   Matrix m_result(500, 28*28);
 
-  Eigen::ThreadPoolDevice thread_pool_device(12);
+  Eigen::ThreadPool tp(12);
+  Eigen::ThreadPoolDevice thread_pool_device(&tp, 12);
 
   // compute results by separate methods
   t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@@ -204,7 +208,8 @@ static void test_multithread_contraction_agrees_with_singlethread() {
   typedef Tensor::DimensionPair DimPair;
   Eigen::array dims({{DimPair(1, 2)}});
 
-  Eigen::ThreadPoolDevice thread_pool_device(internal::random(2, 11));
+  Eigen::ThreadPool tp(internal::random(2, 11));
+  Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random(2, 11));
 
   Tensor st_result;
   st_result = left.contract(right, dims);
@@ -227,7 +232,8 @@ static void test_memcpy() {
 
   for (int i = 0; i < 5; ++i) {
     const int num_threads = internal::random(3, 11);
-    Eigen::ThreadPoolDevice thread_pool_device(num_threads);
+    Eigen::ThreadPool tp(num_threads);
+    Eigen::ThreadPoolDevice thread_pool_device(&tp, num_threads);
 
     const int size = internal::random(13, 7632);
     Tensor t1(size);
@@ -243,7 +249,8 @@ static void test_memcpy() {
 
 static void test_multithread_random()
 {
-  Eigen::ThreadPoolDevice device(2);
+  Eigen::ThreadPool tp(2);
+  Eigen::ThreadPoolDevice device(&tp, 2);
   Tensor t(1 << 20);
   t.device(device) = t.random>();
 }
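
The FIFO behavior the commit message relies on is easiest to see with a single worker thread and a single scheduling thread, which is exactly the case the ThreadPool comment above covers. The following standalone program is a minimal sketch, not part of the patch; it assumes Eigen's unsupported headers are on the include path and that ThreadPool and Notification are visible in the Eigen namespace, as the updated test file uses them. The task bodies are made up for illustration.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

#include <iostream>
#include <vector>

int main() {
  Eigen::ThreadPool pool(1);   // one worker: tasks can only run strictly in enqueue order
  std::vector<int> order;
  Eigen::Notification done;

  // All scheduling happens from this single thread, which is the case the
  // FIFO guarantee in the ThreadPool comment applies to.
  for (int i = 0; i < 8; ++i) {
    pool.Schedule([i, &order]() { order.push_back(i); });
  }
  pool.Schedule([&done]() { done.Notify(); });

  done.WaitForNotification();  // runs only after the eight tasks queued before it
  for (int i : order) std::cout << i << ' ';   // expected output: 0 1 2 3 4 5 6 7
  std::cout << '\n';
  return 0;
}

With std::async, nothing forces the eight tasks to start in this order, which is the scheduling problem the contraction code was hitting.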
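
All of the test updates above follow the same two-step construction: build an Eigen::ThreadPool first, then hand its address plus a core count to ThreadPoolDevice, which borrows the threads rather than owning them. A small usage sketch along the lines of test_multithread_elementwise; the tensor sizes and thread count are illustrative only.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  Eigen::Tensor<float, 3> in1(2, 3, 7);
  Eigen::Tensor<float, 3> in2(2, 3, 7);
  Eigen::Tensor<float, 3> out(2, 3, 7);
  in1.setRandom();
  in2.setRandom();

  // The pool owns the worker threads; the device is a thin, non-owning view,
  // so the pool must outlive every device built on top of it.
  Eigen::ThreadPool tp(4);
  Eigen::ThreadPoolDevice thread_pool_device(&tp, 4);

  out.device(thread_pool_device) = in1 + in2 * 3.14f;  // same expression as the elementwise test
  return 0;
}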
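
The caller-side protocol that TensorExecutor and the contraction code adopt is: every enqueue() hands back a heap-allocated Notification* that the caller owns, waits on with wait_until_ready(), and then deletes. Below is a hedged sketch of that pattern under the same assumptions as above; FillRange is a made-up stand-in for the real packing and kernel routines.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

#include <vector>

// Hypothetical work item; any free function works, since enqueue() binds the
// arguments through FunctionWrapper and signals the Notification when done.
static void FillRange(float* data, int begin, int end) {
  for (int i = begin; i < end; ++i) data[i] = 0.5f * static_cast<float>(i);
}

int main() {
  Eigen::ThreadPool pool(4);
  Eigen::ThreadPoolDevice device(&pool, 4);

  const int size = 1 << 16;
  const int numblocks = 8;
  const int blocksize = size / numblocks;
  std::vector<float> data(size);

  // enqueue() returns a heap-allocated Notification* owned by the caller.
  std::vector<Eigen::Notification*> results;
  results.reserve(numblocks);
  for (int i = 0; i < numblocks; ++i) {
    results.push_back(
        device.enqueue(&FillRange, data.data(), i * blocksize, (i + 1) * blocksize));
  }

  // Same shutdown protocol as the executor: wait on each notification,
  // then release it.
  for (int i = 0; i < numblocks; ++i) {
    Eigen::wait_until_ready(results[i]);
    delete results[i];
  }
  return 0;
}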