From 8278ae63137fb2ce2cdf6fc8117df3080e5cb2fe Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Mon, 13 Aug 2018 15:31:23 -0700 Subject: [PATCH 01/10] Add support for thread local support on platforms that do not support it through emulation using a hash map. --- test/main.h | 6 +- unsupported/Eigen/CXX11/ThreadPool | 10 ++- .../CXX11/src/Tensor/TensorDeviceThreadPool.h | 50 -------------- .../src/ThreadPool/NonBlockingThreadPool.h | 65 ++++++++++++++----- .../Eigen/CXX11/src/ThreadPool/ThreadLocal.h | 48 ++++++++++++-- 5 files changed, 103 insertions(+), 76 deletions(-) diff --git a/test/main.h b/test/main.h index de8a4865f..36784b1f4 100644 --- a/test/main.h +++ b/test/main.h @@ -125,7 +125,7 @@ inline void on_temporary_creation(long int size) { if(nb_temporaries!=(N)) { std::cerr << "nb_temporaries == " << nb_temporaries << "\n"; }\ VERIFY( (#XPR) && nb_temporaries==(N) ); \ } - + #endif #include "split_test_helper.h" @@ -328,7 +328,7 @@ namespace Eigen #define VERIFY_RAISES_STATIC_ASSERT(a) \ std::cout << "Can't VERIFY_RAISES_STATIC_ASSERT( " #a " ) with exceptions disabled\n"; #endif - + #if !defined(__CUDACC__) && !defined(__HIPCC__) && !defined(__SYCL_DEVICE_ONLY__) #define EIGEN_USE_CUSTOM_ASSERT #endif @@ -845,4 +845,4 @@ int main(int argc, char *argv[]) #ifdef _MSC_VER // 4503 - decorated name length exceeded, name was truncated #pragma warning( disable : 4503) -#endif \ No newline at end of file +#endif diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index cbb3bbf2c..12aa07c7f 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -44,6 +44,14 @@ #include #include #include +#ifndef EIGEN_THREAD_LOCAL +// There are non-parenthesized calls to "max" in the header, +// which trigger a check in test/main.h causing compilation to fail. +// We work around the check here by removing the check for max in +// the case where we have to emulate thread_local. +#undef max +#include +#endif #include "src/util/CXX11Meta.h" #include "src/util/MaxSizeVector.h" @@ -55,6 +63,7 @@ #include "src/ThreadPool/RunQueue.h" #include "src/ThreadPool/ThreadPoolInterface.h" #include "src/ThreadPool/ThreadEnvironment.h" +#include "src/ThreadPool/Barrier.h" #include "src/ThreadPool/NonBlockingThreadPool.h" #endif @@ -62,4 +71,3 @@ #include #endif // EIGEN_CXX11_THREADPOOL_MODULE - diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 3e3665efb..6fc6688d3 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -12,56 +12,6 @@ namespace Eigen { -// Barrier is an object that allows one or more threads to wait until -// Notify has been called a specified number of times. -class Barrier { - public: - Barrier(unsigned int count) : state_(count << 1), notified_(false) { - eigen_assert(((count << 1) >> 1) == count); - } - ~Barrier() { - eigen_assert((state_>>1) == 0); - } - - void Notify() { - unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; - if (v != 1) { - eigen_assert(((v + 2) & ~1) != 0); - return; // either count has not dropped to 0, or waiter is not waiting - } - std::unique_lock l(mu_); - eigen_assert(!notified_); - notified_ = true; - cv_.notify_all(); - } - - void Wait() { - unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); - if ((v >> 1) == 0) return; - std::unique_lock l(mu_); - while (!notified_) { - cv_.wait(l); - } - } - - private: - std::mutex mu_; - std::condition_variable cv_; - std::atomic state_; // low bit is waiter flag - bool notified_; -}; - - -// Notification is an object that allows a user to to wait for another -// thread to signal a notification that an event has occurred. -// -// Multiple threads can wait on the same Notification object, -// but only one caller must call Notify() on the object. -struct Notification : Barrier { - Notification() : Barrier(1) {}; -}; - - // Runs an arbitrary function and then calls Notify() on the passed in // Notification. template struct FunctionWrapperWithNotification diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index ecd49f382..ede70da8d 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H - namespace Eigen { template @@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { : ThreadPoolTempl(num_threads, true, env) {} ThreadPoolTempl(int num_threads, bool allow_spinning, - Environment env = Environment()) + Environment env = Environment()) : env_(env), num_threads_(num_threads), allow_spinning_(allow_spinning), @@ -61,9 +60,17 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { for (int i = 0; i < num_threads_; i++) { queues_.push_back(new Queue()); } +#ifndef EIGEN_THREAD_LOCAL + init_barrier_.reset(new Barrier(num_threads_)); +#endif for (int i = 0; i < num_threads_; i++) { threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); } +#ifndef EIGEN_THREAD_LOCAL + // Wait for workers to initialize per_thread_map_. Otherwise we might race + // with them in Schedule or CurrentThreadId. + init_barrier_->Wait(); +#endif } ~ThreadPoolTempl() { @@ -85,6 +92,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Join threads explicitly to avoid destruction order issues. for (size_t i = 0; i < num_threads_; i++) delete threads_[i]; for (size_t i = 0; i < num_threads_; i++) delete queues_[i]; +#ifndef EIGEN_THREAD_LOCAL + for (auto it : per_thread_map_) delete it.second; +#endif } void Schedule(std::function fn) { @@ -109,8 +119,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // this is kept alive while any threads can potentially be in Schedule. if (!t.f) { ec_.Notify(false); - } - else { + } else { env_.ExecuteTask(t); // Push failed, execute directly. } } @@ -130,13 +139,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { ec_.Notify(true); } - int NumThreads() const final { - return num_threads_; - } + int NumThreads() const final { return num_threads_; } int CurrentThreadId() const final { - const PerThread* pt = - const_cast(this)->GetPerThread(); + const PerThread* pt = const_cast(this)->GetPerThread(); if (pt->pool == this) { return pt->thread_id; } else { @@ -148,10 +154,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { typedef typename Environment::EnvThread Thread; struct PerThread { - constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } + constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {} ThreadPoolTempl* pool; // Parent pool, or null for normal threads. - uint64_t rand; // Random generator state. - int thread_id; // Worker thread index in pool. + uint64_t rand; // Random generator state. + int thread_id; // Worker thread index in pool. }; Environment env_; @@ -166,12 +172,26 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic done_; std::atomic cancelled_; EventCount ec_; +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr init_barrier_; + std::mutex mu; // Protects per_thread_map_. + std::unordered_map per_thread_map_; +#endif // Main worker thread loop. void WorkerLoop(int thread_id) { +#ifndef EIGEN_THREAD_LOCAL + PerThread* pt = new PerThread(); + mu.lock(); + per_thread_map_[GlobalThreadIdHash()] = pt; + mu.unlock(); + init_barrier_->Notify(); + init_barrier_->Wait(); +#else PerThread* pt = GetPerThread(); +#endif pt->pool = this; - pt->rand = std::hash()(std::this_thread::get_id()); + pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; Queue* q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; @@ -322,10 +342,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { return -1; } - static EIGEN_STRONG_INLINE PerThread* GetPerThread() { + static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() { + return std::hash()(std::this_thread::get_id()); + } + + EIGEN_STRONG_INLINE PerThread* GetPerThread() { +#ifndef EIGEN_THREAD_LOCAL + static PerThread dummy; + auto it = per_thread_map_.find(GlobalThreadIdHash()); + if (it == per_thread_map_.end()) { + return &dummy; + } else { + return it->second; + } +#else EIGEN_THREAD_LOCAL PerThread per_thread_; PerThread* pt = &per_thread_; return pt; +#endif } static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) { @@ -333,7 +367,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Update the internal state *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL; // Generate the random output (using the PCG-XSH-RS scheme) - return static_cast((current ^ (current >> 22)) >> (22 + (current >> 61))); + return static_cast((current ^ (current >> 22)) >> + (22 + (current >> 61))); } }; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index cfa221732..f33759ba9 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -10,13 +10,47 @@ #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H -// Try to come up with a portable implementation of thread local variables -#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7) -#define EIGEN_THREAD_LOCAL static __thread -#elif EIGEN_COMP_CLANG -#define EIGEN_THREAD_LOCAL static __thread -#else -#define EIGEN_THREAD_LOCAL static thread_local +#undef EIGEN_THREAD_LOCAL + +#if EIGEN_MAX_CPP_VER>=11 && (__has_feature(cxx_thread_local)) + #define EIGEN_THREAD_LOCAL static thread_local +#elif (EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)) || EIGEN_COMP_CLANG + #define EIGEN_THREAD_LOCAL static __thread #endif +// Disable TLS for Apple and Android builds with older toolchains. +#if defined(__APPLE__) +// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED, +// __IPHONE_8_0. +#include +#include +#endif +// Checks whether C++11's `thread_local` storage duration specifier is +// supported. +#if defined(__apple_build_version__) && \ + ((__apple_build_version__ < 8000042) || \ + (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)) +// Notes: Xcode's clang did not support `thread_local` until version +// 8, and even then not for all iOS < 9.0. +#undef EIGEN_THREAD_LOCAL + +#elif defined(__ANDROID__) && EIGEN_COMP_CLANG +// There are platforms for which TLS should not be used even though the compiler +// makes it seem like it's supported (Android NDK < r12b for example). +// This is primarily because of linker problems and toolchain misconfiguration: +// TLS isn't supported until NDK r12b per +// https://developer.android.com/ndk/downloads/revision_history.html +// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in +// . For NDK < r16, users should define these macros, +// e.g. `-D__NDK_MAJOR__=11 -D__NKD_MINOR__=0` for NDK r11. +#if __has_include() +#include +#endif // __has_include() +#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \ + defined(__NDK_MINOR__) && \ + ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1))) +#undef EIGEN_THREAD_LOCAL +#endif +#endif // defined(__ANDROID__) && defined(__clang__) + #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H From 9bb75d8d31571f5513107080c8d3c85e27ff8430 Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Mon, 13 Aug 2018 15:34:03 -0700 Subject: [PATCH 02/10] Add Barrier.h. --- .../Eigen/CXX11/src/ThreadPool/Barrier.h | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h new file mode 100644 index 000000000..c37fc1e65 --- /dev/null +++ b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h @@ -0,0 +1,67 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2018 Rasmus Munk Larsen +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Barrier is an object that allows one or more threads to wait until +// Notify has been called a specified number of times. + +#ifndef EIGEN_CXX11_THREADPOOL_BARRIER_H +#define EIGEN_CXX11_THREADPOOL_BARRIER_H + +namespace Eigen { + +class Barrier { + public: + Barrier(unsigned int count) : state_(count << 1), notified_(false) { + eigen_assert(((count << 1) >> 1) == count); + } + ~Barrier() { + eigen_assert((state_>>1) == 0); + } + + void Notify() { + unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; + if (v != 1) { + eigen_assert(((v + 2) & ~1) != 0); + return; // either count has not dropped to 0, or waiter is not waiting + } + std::unique_lock l(mu_); + eigen_assert(!notified_); + notified_ = true; + cv_.notify_all(); + } + + void Wait() { + unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); + if ((v >> 1) == 0) return; + std::unique_lock l(mu_); + while (!notified_) { + cv_.wait(l); + } + } + + private: + std::mutex mu_; + std::condition_variable cv_; + std::atomic state_; // low bit is waiter flag + bool notified_; +}; + + +// Notification is an object that allows a user to to wait for another +// thread to signal a notification that an event has occurred. +// +// Multiple threads can wait on the same Notification object, +// but only one caller must call Notify() on the object. +struct Notification : Barrier { + Notification() : Barrier(1) {}; +}; + +} // namespace Eigen + +#endif // EIGEN_CXX11_THREADPOOL_BARRIER_H From 15d4f515e2d4982bd16f0a85a7bbb5343270deec Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Tue, 14 Aug 2018 12:17:46 -0700 Subject: [PATCH 03/10] Use plain_assert in destructors to avoid throwing in CXX11 tests where main.h owerwrites eigen_assert with a throwing version. --- unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h | 7 ++----- unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h | 5 +++-- .../Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 6 +++--- unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h | 8 +++----- 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h index c37fc1e65..ef5e9ff18 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h @@ -20,9 +20,7 @@ class Barrier { Barrier(unsigned int count) : state_(count << 1), notified_(false) { eigen_assert(((count << 1) >> 1) == count); } - ~Barrier() { - eigen_assert((state_>>1) == 0); - } + ~Barrier() { eigen_plain_assert((state_ >> 1) == 0); } void Notify() { unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; @@ -52,14 +50,13 @@ class Barrier { bool notified_; }; - // Notification is an object that allows a user to to wait for another // thread to signal a notification that an event has occurred. // // Multiple threads can wait on the same Notification object, // but only one caller must call Notify() on the object. struct Notification : Barrier { - Notification() : Barrier(1) {}; + Notification() : Barrier(1){}; }; } // namespace Eigen diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h index 0a7181102..22c952ae1 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h @@ -58,7 +58,7 @@ class EventCount { ~EventCount() { // Ensure there are no waiters. - eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); + eigen_plain_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); } // Prewait prepares for waiting. @@ -169,7 +169,8 @@ class EventCount { class Waiter { friend class EventCount; - // Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector. + // Align to 128 byte boundary to prevent false sharing with other Waiter + // objects in the same vector. EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic next; std::mutex mu; std::condition_variable cv; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index ede70da8d..354995be8 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -90,8 +90,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { } // Join threads explicitly to avoid destruction order issues. - for (size_t i = 0; i < num_threads_; i++) delete threads_[i]; - for (size_t i = 0; i < num_threads_; i++) delete queues_[i]; + for (int i = 0; i < num_threads_; i++) delete threads_[i]; + for (int i = 0; i < num_threads_; i++) delete queues_[i]; #ifndef EIGEN_THREAD_LOCAL for (auto it : per_thread_map_) delete it.second; #endif @@ -298,7 +298,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // If we are shutting down and all worker threads blocked without work, // that's we are done. blocked_++; - if (done_ && blocked_ == num_threads_) { + if (done_ && blocked_ == static_cast(num_threads_)) { ec_.CancelWait(waiter); // Almost done, but need to re-check queues. // Consider that all queues are empty and all worker threads are preempted diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h index cb3690a2e..05c739aa1 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ - namespace Eigen { // RunQueue is a fixed-size, partially non-blocking deque or Work items. @@ -47,7 +46,7 @@ class RunQueue { array_[i].state.store(kEmpty, std::memory_order_relaxed); } - ~RunQueue() { eigen_assert(Size() == 0); } + ~RunQueue() { eigen_plain_assert(Size() == 0); } // PushFront inserts w at the beginning of the queue. // If queue is full returns w, otherwise returns default-constructed Work. @@ -131,9 +130,8 @@ class RunQueue { Elem* e = &array_[mid & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); if (n == 0) { - if (s != kReady || - !e->state.compare_exchange_strong(s, kBusy, - std::memory_order_acquire)) + if (s != kReady || !e->state.compare_exchange_strong( + s, kBusy, std::memory_order_acquire)) continue; start = mid; } else { From e51d9e473aa1f882d3b3106ec2427a44d2a76ceb Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Thu, 23 Aug 2018 11:42:05 -0700 Subject: [PATCH 04/10] Protect #undef max with #ifdef max. --- unsupported/Eigen/CXX11/ThreadPool | 2 ++ 1 file changed, 2 insertions(+) diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index 12aa07c7f..64ea83b7e 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -49,7 +49,9 @@ // which trigger a check in test/main.h causing compilation to fail. // We work around the check here by removing the check for max in // the case where we have to emulate thread_local. +#ifdef max #undef max +#endif #include #endif From 6e0464004a7bcd666d3b5962c3c999ff78f416f1 Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Thu, 23 Aug 2018 12:10:08 -0700 Subject: [PATCH 05/10] Store std::unique_ptr instead of raw pointers in per_thread_map_. --- .../CXX11/src/ThreadPool/NonBlockingThreadPool.h | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index a800e827f..1ac4de3b5 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -58,9 +58,6 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { } } queues_.resize(num_threads_); - for (int i = 0; i < num_threads_; i++) { - queues_.push_back(new Queue()); - } #ifndef EIGEN_THREAD_LOCAL init_barrier_.reset(new Barrier(num_threads_)); #endif @@ -93,9 +90,6 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Join threads explicitly to avoid destruction order issues. threads_.resize(0); queues_.resize(0); -#ifndef EIGEN_THREAD_LOCAL - for (auto it : per_thread_map_) delete it.second; -#endif } void Schedule(std::function fn) { @@ -176,21 +170,19 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { #ifndef EIGEN_THREAD_LOCAL std::unique_ptr init_barrier_; std::mutex mu; // Protects per_thread_map_. - std::unordered_map per_thread_map_; + std::unordered_map> per_thread_map_; #endif // Main worker thread loop. void WorkerLoop(int thread_id) { #ifndef EIGEN_THREAD_LOCAL - PerThread* pt = new PerThread(); mu.lock(); - per_thread_map_[GlobalThreadIdHash()] = pt; + eigen_assert(per_thread_map_.emplace(GlobalThreadIdHash(), new PerThread()).second); mu.unlock(); init_barrier_->Notify(); init_barrier_->Wait(); -#else - PerThread* pt = GetPerThread(); #endif + PerThread* pt = GetPerThread(); pt->pool = this; pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; @@ -355,7 +347,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { if (it == per_thread_map_.end()) { return &dummy; } else { - return it->second; + return it->second.get(); } #else EIGEN_THREAD_LOCAL PerThread per_thread_; From 6cedc5a9b38d6ddda69d532b28dff9ee5c2d1c04 Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Thu, 23 Aug 2018 12:11:58 -0700 Subject: [PATCH 06/10] rename mu. --- .../Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 1ac4de3b5..d710faa94 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -169,16 +169,16 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { EventCount ec_; #ifndef EIGEN_THREAD_LOCAL std::unique_ptr init_barrier_; - std::mutex mu; // Protects per_thread_map_. + std::mutex per_thread_map_mutex_; // Protects per_thread_map_. std::unordered_map> per_thread_map_; #endif // Main worker thread loop. void WorkerLoop(int thread_id) { #ifndef EIGEN_THREAD_LOCAL - mu.lock(); + per_thread_map_mutex_.lock(); eigen_assert(per_thread_map_.emplace(GlobalThreadIdHash(), new PerThread()).second); - mu.unlock(); + per_thread_map_mutex_.unlock(); init_barrier_->Notify(); init_barrier_->Wait(); #endif From 668690978ff66151b2a495767c7daf33d06be4a5 Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Thu, 23 Aug 2018 12:54:33 -0700 Subject: [PATCH 07/10] Pad PerThread when we emulate thread_local to prevent false sharing. --- .../Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index d710faa94..1cb63bcfa 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -153,6 +153,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { ThreadPoolTempl* pool; // Parent pool, or null for normal threads. uint64_t rand; // Random generator state. int thread_id; // Worker thread index in pool. +#ifndef EIGEN_THREAD_LOCAL + // Prevent false sharing. + char pad_[128]; +#endif }; Environment env_; From e9f9d70611d0a66751c34b1430ed7649aff6e2bf Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Thu, 23 Aug 2018 12:59:46 -0700 Subject: [PATCH 08/10] Don't rely on __had_feature for g++. Don't use __thread. Only use thread_local for gcc 4.8 or newer. --- unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index f33759ba9..89ed6e5e5 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -12,10 +12,10 @@ #undef EIGEN_THREAD_LOCAL -#if EIGEN_MAX_CPP_VER>=11 && (__has_feature(cxx_thread_local)) - #define EIGEN_THREAD_LOCAL static thread_local -#elif (EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)) || EIGEN_COMP_CLANG - #define EIGEN_THREAD_LOCAL static __thread +#if EIGEN_MAX_CPP_VER >= 11 && \ + ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \ + __has_feature(cxx_thread_local)) +#define EIGEN_THREAD_LOCAL static thread_local #endif // Disable TLS for Apple and Android builds with older toolchains. From 8d9bc5cc022bee4a06201c7c5a1dec2b73697f5f Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Thu, 23 Aug 2018 13:06:39 -0700 Subject: [PATCH 09/10] Fix g++ compilation. --- unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index 1cb63bcfa..60a0c9fb6 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -180,8 +180,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Main worker thread loop. void WorkerLoop(int thread_id) { #ifndef EIGEN_THREAD_LOCAL + std::unique_ptr new_pt(new PerThread()); per_thread_map_mutex_.lock(); - eigen_assert(per_thread_map_.emplace(GlobalThreadIdHash(), new PerThread()).second); + eigen_assert(per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second); per_thread_map_mutex_.unlock(); init_barrier_->Notify(); init_barrier_->Wait(); From 744e2fe0dedb697a8802a3e633e37a4f844da372 Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Fri, 24 Aug 2018 10:24:54 -0700 Subject: [PATCH 10/10] Address comments about EIGEN_THREAD_LOCAL. --- unsupported/Eigen/CXX11/ThreadPool | 9 ++++----- unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h | 2 -- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index 64ea83b7e..1dcc4eb6c 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -44,6 +44,10 @@ #include #include #include +#include "src/util/CXX11Meta.h" +#include "src/util/MaxSizeVector.h" + +#include "src/ThreadPool/ThreadLocal.h" #ifndef EIGEN_THREAD_LOCAL // There are non-parenthesized calls to "max" in the header, // which trigger a check in test/main.h causing compilation to fail. @@ -54,11 +58,6 @@ #endif #include #endif - -#include "src/util/CXX11Meta.h" -#include "src/util/MaxSizeVector.h" - -#include "src/ThreadPool/ThreadLocal.h" #include "src/ThreadPool/ThreadYield.h" #include "src/ThreadPool/ThreadCancel.h" #include "src/ThreadPool/EventCount.h" diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index 89ed6e5e5..a41731c34 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -10,8 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H -#undef EIGEN_THREAD_LOCAL - #if EIGEN_MAX_CPP_VER >= 11 && \ ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \ __has_feature(cxx_thread_local))