diff --git a/test/main.h b/test/main.h index 5d550969e..36784b1f4 100644 --- a/test/main.h +++ b/test/main.h @@ -125,7 +125,7 @@ inline void on_temporary_creation(long int size) { if(nb_temporaries!=(N)) { std::cerr << "nb_temporaries == " << nb_temporaries << "\n"; }\ VERIFY( (#XPR) && nb_temporaries==(N) ); \ } - + #endif #include "split_test_helper.h" @@ -328,7 +328,7 @@ namespace Eigen #define VERIFY_RAISES_STATIC_ASSERT(a) \ std::cout << "Can't VERIFY_RAISES_STATIC_ASSERT( " #a " ) with exceptions disabled\n"; #endif - + #if !defined(__CUDACC__) && !defined(__HIPCC__) && !defined(__SYCL_DEVICE_ONLY__) #define EIGEN_USE_CUSTOM_ASSERT #endif diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index cbb3bbf2c..1dcc4eb6c 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -44,17 +44,27 @@ #include #include #include - #include "src/util/CXX11Meta.h" #include "src/util/MaxSizeVector.h" #include "src/ThreadPool/ThreadLocal.h" +#ifndef EIGEN_THREAD_LOCAL +// There are non-parenthesized calls to "max" in the header, +// which trigger a check in test/main.h causing compilation to fail. +// We work around the check here by removing the check for max in +// the case where we have to emulate thread_local. 
+#ifdef max +#undef max +#endif +#include +#endif #include "src/ThreadPool/ThreadYield.h" #include "src/ThreadPool/ThreadCancel.h" #include "src/ThreadPool/EventCount.h" #include "src/ThreadPool/RunQueue.h" #include "src/ThreadPool/ThreadPoolInterface.h" #include "src/ThreadPool/ThreadEnvironment.h" +#include "src/ThreadPool/Barrier.h" #include "src/ThreadPool/NonBlockingThreadPool.h" #endif @@ -62,4 +72,3 @@ #include #endif // EIGEN_CXX11_THREADPOOL_MODULE - diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h index 3e3665efb..6fc6688d3 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h @@ -12,56 +12,6 @@ namespace Eigen { -// Barrier is an object that allows one or more threads to wait until -// Notify has been called a specified number of times. -class Barrier { - public: - Barrier(unsigned int count) : state_(count << 1), notified_(false) { - eigen_assert(((count << 1) >> 1) == count); - } - ~Barrier() { - eigen_assert((state_>>1) == 0); - } - - void Notify() { - unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; - if (v != 1) { - eigen_assert(((v + 2) & ~1) != 0); - return; // either count has not dropped to 0, or waiter is not waiting - } - std::unique_lock l(mu_); - eigen_assert(!notified_); - notified_ = true; - cv_.notify_all(); - } - - void Wait() { - unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); - if ((v >> 1) == 0) return; - std::unique_lock l(mu_); - while (!notified_) { - cv_.wait(l); - } - } - - private: - std::mutex mu_; - std::condition_variable cv_; - std::atomic state_; // low bit is waiter flag - bool notified_; -}; - - -// Notification is an object that allows a user to to wait for another -// thread to signal a notification that an event has occurred. 
-// -// Multiple threads can wait on the same Notification object, -// but only one caller must call Notify() on the object. -struct Notification : Barrier { - Notification() : Barrier(1) {}; -}; - - // Runs an arbitrary function and then calls Notify() on the passed in // Notification. template struct FunctionWrapperWithNotification diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h new file mode 100644 index 000000000..ef5e9ff18 --- /dev/null +++ b/unsupported/Eigen/CXX11/src/ThreadPool/Barrier.h @@ -0,0 +1,64 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2018 Rasmus Munk Larsen +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Barrier is an object that allows one or more threads to wait until +// Notify has been called a specified number of times. 
+ +#ifndef EIGEN_CXX11_THREADPOOL_BARRIER_H +#define EIGEN_CXX11_THREADPOOL_BARRIER_H + +namespace Eigen { + +class Barrier { + public: + Barrier(unsigned int count) : state_(count << 1), notified_(false) { + eigen_assert(((count << 1) >> 1) == count); + } + ~Barrier() { eigen_plain_assert((state_ >> 1) == 0); } + + void Notify() { + unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2; + if (v != 1) { + eigen_assert(((v + 2) & ~1) != 0); + return; // either count has not dropped to 0, or waiter is not waiting + } + std::unique_lock l(mu_); + eigen_assert(!notified_); + notified_ = true; + cv_.notify_all(); + } + + void Wait() { + unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel); + if ((v >> 1) == 0) return; + std::unique_lock l(mu_); + while (!notified_) { + cv_.wait(l); + } + } + + private: + std::mutex mu_; + std::condition_variable cv_; + std::atomic state_; // low bit is waiter flag + bool notified_; +}; + +// Notification is an object that allows a user to wait for another +// thread to signal a notification that an event has occurred. +// +// Multiple threads can wait on the same Notification object, +// but only one caller must call Notify() on the object. +struct Notification : Barrier { + Notification() : Barrier(1){}; +}; + +} // namespace Eigen + +#endif // EIGEN_CXX11_THREADPOOL_BARRIER_H diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h index 0a7181102..22c952ae1 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h @@ -58,7 +58,7 @@ class EventCount { ~EventCount() { // Ensure there are no waiters. - eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); + eigen_plain_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask); } // Prewait prepares for waiting. 
@@ -169,7 +169,8 @@ class EventCount { class Waiter { friend class EventCount; - // Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector. + // Align to 128 byte boundary to prevent false sharing with other Waiter + // objects in the same vector. EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic next; std::mutex mu; std::condition_variable cv; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h index a93e22a76..60a0c9fb6 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H - namespace Eigen { template @@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { : ThreadPoolTempl(num_threads, true, env) {} ThreadPoolTempl(int num_threads, bool allow_spinning, - Environment env = Environment()) + Environment env = Environment()) : env_(env), num_threads_(num_threads), allow_spinning_(allow_spinning), @@ -59,9 +58,17 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { } } queues_.resize(num_threads_); +#ifndef EIGEN_THREAD_LOCAL + init_barrier_.reset(new Barrier(num_threads_)); +#endif for (int i = 0; i < num_threads_; i++) { threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); })); } +#ifndef EIGEN_THREAD_LOCAL + // Wait for workers to initialize per_thread_map_. Otherwise we might race + // with them in Schedule or CurrentThreadId. + init_barrier_->Wait(); +#endif } ~ThreadPoolTempl() { @@ -82,6 +89,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Join threads explicitly to avoid destruction order issues. 
threads_.resize(0); + queues_.resize(0); } void Schedule(std::function fn) { @@ -106,8 +114,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // this is kept alive while any threads can potentially be in Schedule. if (!t.f) { ec_.Notify(false); - } - else { + } else { env_.ExecuteTask(t); // Push failed, execute directly. } } @@ -127,13 +134,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { ec_.Notify(true); } - int NumThreads() const final { - return num_threads_; - } + int NumThreads() const final { return num_threads_; } int CurrentThreadId() const final { - const PerThread* pt = - const_cast(this)->GetPerThread(); + const PerThread* pt = const_cast(this)->GetPerThread(); if (pt->pool == this) { return pt->thread_id; } else { @@ -145,10 +149,14 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { typedef typename Environment::EnvThread Thread; struct PerThread { - constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { } + constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {} ThreadPoolTempl* pool; // Parent pool, or null for normal threads. - uint64_t rand; // Random generator state. - int thread_id; // Worker thread index in pool. + uint64_t rand; // Random generator state. + int thread_id; // Worker thread index in pool. +#ifndef EIGEN_THREAD_LOCAL + // Prevent false sharing. + char pad_[128]; +#endif }; Environment env_; @@ -163,12 +171,25 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { std::atomic done_; std::atomic cancelled_; EventCount ec_; +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr init_barrier_; + std::mutex per_thread_map_mutex_; // Protects per_thread_map_. + std::unordered_map> per_thread_map_; +#endif // Main worker thread loop. 
void WorkerLoop(int thread_id) { +#ifndef EIGEN_THREAD_LOCAL + std::unique_ptr new_pt(new PerThread()); + per_thread_map_mutex_.lock(); + if (!per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second) { eigen_assert(false && "thread id hash already registered"); } // emplace must run even when asserts are compiled out + per_thread_map_mutex_.unlock(); + init_barrier_->Notify(); + init_barrier_->Wait(); +#endif PerThread* pt = GetPerThread(); pt->pool = this; - pt->rand = std::hash()(std::this_thread::get_id()); + pt->rand = GlobalThreadIdHash(); pt->thread_id = thread_id; Queue& q = queues_[thread_id]; EventCount::Waiter* waiter = &waiters_[thread_id]; @@ -320,10 +341,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { return -1; } - static EIGEN_STRONG_INLINE PerThread* GetPerThread() { + static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() { + return std::hash()(std::this_thread::get_id()); + } + + EIGEN_STRONG_INLINE PerThread* GetPerThread() { +#ifndef EIGEN_THREAD_LOCAL + static PerThread dummy; + auto it = per_thread_map_.find(GlobalThreadIdHash()); + if (it == per_thread_map_.end()) { + return &dummy; + } else { + return it->second.get(); + } +#else EIGEN_THREAD_LOCAL PerThread per_thread_; PerThread* pt = &per_thread_; return pt; +#endif } static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) { @@ -331,7 +366,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface { // Update the internal state *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL; // Generate the random output (using the PCG-XSH-RS scheme) - return static_cast((current ^ (current >> 22)) >> (22 + (current >> 61))); + return static_cast((current ^ (current >> 22)) >> + (22 + (current >> 61))); } }; diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h index cb3690a2e..05c739aa1 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h @@ -10,7 +10,6 @@ #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ #define 
EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_ - namespace Eigen { // RunQueue is a fixed-size, partially non-blocking deque or Work items. @@ -47,7 +46,7 @@ class RunQueue { array_[i].state.store(kEmpty, std::memory_order_relaxed); } - ~RunQueue() { eigen_assert(Size() == 0); } + ~RunQueue() { eigen_plain_assert(Size() == 0); } // PushFront inserts w at the beginning of the queue. // If queue is full returns w, otherwise returns default-constructed Work. @@ -131,9 +130,8 @@ class RunQueue { Elem* e = &array_[mid & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); if (n == 0) { - if (s != kReady || - !e->state.compare_exchange_strong(s, kBusy, - std::memory_order_acquire)) + if (s != kReady || !e->state.compare_exchange_strong( + s, kBusy, std::memory_order_acquire)) continue; start = mid; } else { diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h index cfa221732..a41731c34 100644 --- a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h +++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadLocal.h @@ -10,13 +10,45 @@ #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H -// Try to come up with a portable implementation of thread local variables -#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7) -#define EIGEN_THREAD_LOCAL static __thread -#elif EIGEN_COMP_CLANG -#define EIGEN_THREAD_LOCAL static __thread -#else +#if EIGEN_MAX_CPP_VER >= 11 && \ + ((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \ + __has_feature(cxx_thread_local)) #define EIGEN_THREAD_LOCAL static thread_local #endif +// Disable TLS for Apple and Android builds with older toolchains. +#if defined(__APPLE__) +// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED, +// __IPHONE_8_0. +#include +#include +#endif +// Checks whether C++11's `thread_local` storage duration specifier is +// supported. 
+#if defined(__apple_build_version__) && \ + ((__apple_build_version__ < 8000042) || \ + (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)) +// Notes: Xcode's clang did not support `thread_local` until version +// 8, and even then not for all iOS < 9.0. +#undef EIGEN_THREAD_LOCAL + +#elif defined(__ANDROID__) && EIGEN_COMP_CLANG +// There are platforms for which TLS should not be used even though the compiler +// makes it seem like it's supported (Android NDK < r12b for example). +// This is primarily because of linker problems and toolchain misconfiguration: +// TLS isn't supported until NDK r12b per +// https://developer.android.com/ndk/downloads/revision_history.html +// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in +// <android/ndk-version.h>. For NDK < r16, users should define these macros, +// e.g. `-D__NDK_MAJOR__=11 -D__NDK_MINOR__=0` for NDK r11. +#if __has_include() +#include +#endif // __has_include() +#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \ + defined(__NDK_MINOR__) && \ + ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1))) +#undef EIGEN_THREAD_LOCAL +#endif +#endif // defined(__ANDROID__) && EIGEN_COMP_CLANG + #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H