Make it possible to override the synchronization primitives used by the threadpool using macros.

Rasmus Munk Larsen 2023-05-09 19:36:17 +00:00
parent 1321821e86
commit 96c42771d6
6 changed files with 27 additions and 17 deletions
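The three macros introduced below default to the std primitives, so existing builds behave exactly as before; a client that wants different primitives defines the macros before including the ThreadPool header. A minimal sketch of that pattern, assuming a hypothetical InstrumentedMutex wrapper and an <Eigen/ThreadPool> include path (neither the wrapper nor the exact path is part of this commit; older trees use unsupported/Eigen/CXX11/ThreadPool):

// Hypothetical example: count every lock acquisition inside the thread pool.
#include <atomic>
#include <condition_variable>
#include <mutex>

struct InstrumentedMutex {
  void lock() {
    acquisitions.fetch_add(1, std::memory_order_relaxed);
    m.lock();
  }
  void unlock() { m.unlock(); }
  std::mutex m;
  std::atomic<long> acquisitions{0};
};

// The overrides must be visible before the ThreadPool header is included.
#define EIGEN_MUTEX InstrumentedMutex
#define EIGEN_MUTEX_LOCK std::unique_lock<InstrumentedMutex>
// std::condition_variable only accepts std::unique_lock<std::mutex>, so a
// custom mutex needs std::condition_variable_any (or an equivalent).
#define EIGEN_CONDVAR std::condition_variable_any
#include <Eigen/ThreadPool>

Judging from the call sites in the hunks below, the replacement types only need lock()/unlock() on the mutex, a lock type constructible from the mutex, and wait()/notify on the condition variable.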

@@ -51,6 +51,16 @@
#include "src/Core/util/Meta.h"
#include "src/Core/util/MaxSizeVector.h"
+#ifndef EIGEN_MUTEX
+#define EIGEN_MUTEX std::mutex
+#endif
+#ifndef EIGEN_MUTEX_LOCK
+#define EIGEN_MUTEX_LOCK std::unique_lock<std::mutex>
+#endif
+#ifndef EIGEN_CONDVAR
+#define EIGEN_CONDVAR std::condition_variable
+#endif
// IWYU pragma: begin_exports
#include "src/ThreadPool/ThreadLocal.h"
#include "src/ThreadPool/ThreadYield.h"

@@ -33,7 +33,7 @@ class Barrier {
eigen_plain_assert(((v + 2) & ~1) != 0);
return; // either count has not dropped to 0, or waiter is not waiting
}
-std::unique_lock<std::mutex> l(mu_);
+EIGEN_MUTEX_LOCK l(mu_);
eigen_plain_assert(!notified_);
notified_ = true;
cv_.notify_all();
@@ -42,15 +42,15 @@ class Barrier {
void Wait() {
unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
if ((v >> 1) == 0) return;
-std::unique_lock<std::mutex> l(mu_);
+EIGEN_MUTEX_LOCK l(mu_);
while (!notified_) {
cv_.wait(l);
}
}
private:
-std::mutex mu_;
-std::condition_variable cv_;
+EIGEN_MUTEX mu_;
+EIGEN_CONDVAR cv_;
std::atomic<unsigned int> state_; // low bit is waiter flag
bool notified_;
};
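Barrier's public surface (Notify/Wait) is unchanged; only the private mu_/cv_ members switch to the macro types. For context, a rough usage sketch, assuming the Eigen::Barrier and Eigen::ThreadPool names from this module and the include path assumed above:

#include <Eigen/ThreadPool>  // include path assumed, see above

// Schedule n tasks and block until every one of them has called Notify().
void RunAndWait(Eigen::ThreadPool& pool, int n) {
  Eigen::Barrier barrier(static_cast<unsigned int>(n));
  for (int i = 0; i < n; ++i) {
    pool.Schedule([&barrier] {
      // ... per-task work ...
      barrier.Notify();  // decrements the count; the last call signals cv_
    });
  }
  barrier.Wait();  // blocks on mu_/cv_, i.e. EIGEN_MUTEX/EIGEN_CONDVAR
}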

@@ -171,8 +171,8 @@ class EventCount {
// Align to 128 byte boundary to prevent false sharing with other Waiter
// objects in the same vector.
EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<uint64_t> next;
-std::mutex mu;
-std::condition_variable cv;
+EIGEN_MUTEX mu;
+EIGEN_CONDVAR cv;
uint64_t epoch = 0;
unsigned state = kNotSignaled;
enum {
@@ -220,7 +220,7 @@ class EventCount {
}
void Park(Waiter* w) {
-std::unique_lock<std::mutex> lock(w->mu);
+EIGEN_MUTEX_LOCK lock(w->mu);
while (w->state != Waiter::kSignaled) {
w->state = Waiter::kWaiting;
w->cv.wait(lock);
@@ -233,7 +233,7 @@ class EventCount {
next = wnext == kStackMask ? nullptr : &waiters_[wnext];
unsigned state;
{
-std::unique_lock<std::mutex> lock(w->mu);
+EIGEN_MUTEX_LOCK lock(w->mu);
state = w->state;
w->state = Waiter::kSignaled;
}
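Park() and Unpark() above are the only places EventCount blocks; callers reach them through the class's two-phase wait protocol. A sketch of that protocol follows; the signatures (Prewait(), CommitWait(Waiter*), CancelWait(), Notify(bool)) are read off the surrounding sources and should be treated as an assumption rather than a stable API:

#include <atomic>
#include <Eigen/ThreadPool>  // include path assumed, see above

// Consumer: only blocks (via Park(), i.e. w.mu/w.cv) when the predicate is
// still false after announcing the intent to wait.
void ConsumeOrSleep(Eigen::EventCount& ec, Eigen::EventCount::Waiter& w,
                    std::atomic<bool>& has_work) {
  if (has_work.load()) return;  // fast path, no blocking
  ec.Prewait();                 // announce the wait
  if (has_work.load()) {
    ec.CancelWait();            // work arrived in between, do not block
    return;
  }
  ec.CommitWait(&w);            // sleeps on w.mu/w.cv until notified
}

// Producer: publish the work first, then wake a waiter (true wakes all).
void Publish(Eigen::EventCount& ec, std::atomic<bool>& has_work) {
  has_work.store(true);
  ec.Notify(false);
}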

@@ -248,7 +248,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
std::unique_ptr<Barrier> init_barrier_;
-std::mutex per_thread_map_mutex_; // Protects per_thread_map_.
+EIGEN_MUTEX per_thread_map_mutex_; // Protects per_thread_map_.
std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif

@@ -84,7 +84,7 @@ class RunQueue {
// PushBack adds w at the end of the queue.
// If queue is full returns w, otherwise returns default-constructed Work.
Work PushBack(Work w) {
-std::unique_lock<std::mutex> lock(mutex_);
+EIGEN_MUTEX_LOCK lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[(back - 1) & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -101,7 +101,7 @@ class RunQueue {
// PopBack removes and returns the last elements in the queue.
Work PopBack() {
if (Empty()) return Work();
-std::unique_lock<std::mutex> lock(mutex_);
+EIGEN_MUTEX_LOCK lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem* e = &array_[back & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
@@ -118,7 +118,7 @@ class RunQueue {
// Returns number of elements removed.
unsigned PopBackHalf(std::vector<Work>* result) {
if (Empty()) return 0;
-std::unique_lock<std::mutex> lock(mutex_);
+EIGEN_MUTEX_LOCK lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
unsigned size = Size();
unsigned mid = back;
@@ -174,7 +174,7 @@ class RunQueue {
kBusy,
kReady,
};
-std::mutex mutex_;
+EIGEN_MUTEX mutex_;
// Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
// front/back, respectively. The remaining bits contain modification counters
// that are incremented on Push operations. This allows us to (1) distinguish
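Each of the locked paths above is a back-of-queue operation: the owning worker drives the front of its RunQueue lock-free, and only other threads take mutex_ to steal from the back. A rough sketch of that split, assuming the PushFront/PopFront/PopBack/PopBackHalf interface from the surrounding file; the Task alias and the queue size are made up for the example:

#include <functional>
#include <vector>
#include <Eigen/ThreadPool>  // include path assumed, see above

using Task = std::function<void()>;
using Queue = Eigen::RunQueue<Task, 64>;  // kSize must be a power of two

// Owner thread: front operations never touch mutex_.
void OwnerDrain(Queue& q) {
  Task rejected = q.PushFront([] { /* ... */ });  // hands the task back if full
  if (rejected) rejected();                       // run it inline instead of dropping it
  while (Task t = q.PopFront()) t();
}

// Thief thread: back operations lock mutex_, now an EIGEN_MUTEX.
void Steal(Queue& q) {
  if (Task t = q.PopBack()) t();
  std::vector<Task> batch;
  q.PopBackHalf(&batch);  // grabs up to half of the queue under one lock
  for (Task& t : batch) t();
}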

@@ -225,7 +225,7 @@ class ThreadLocal {
if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
// Adds a happens before edge from the last call to SpilledLocal().
-std::unique_lock<std::mutex> lock(mu_);
+EIGEN_MUTEX_LOCK lock(mu_);
for (auto& kv : per_thread_map_) {
f(kv.first, kv.second);
}
@@ -245,7 +245,7 @@ class ThreadLocal {
if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
// Adds a happens before edge from the last call to SpilledLocal().
-std::unique_lock<std::mutex> lock(mu_);
+EIGEN_MUTEX_LOCK lock(mu_);
for (auto& kv : per_thread_map_) {
release_(kv.second);
}
@@ -259,7 +259,7 @@ class ThreadLocal {
// Use unordered map guarded by a mutex when lock free storage is full.
T& SpilledLocal(std::thread::id this_thread) {
-std::unique_lock<std::mutex> lock(mu_);
+EIGEN_MUTEX_LOCK lock(mu_);
auto it = per_thread_map_.find(this_thread);
if (it == per_thread_map_.end()) {
@@ -290,7 +290,7 @@ class ThreadLocal {
// We fallback on per thread map if lock-free storage is full. In practice
// this should never happen, if `capacity_` is a reasonable estimate of the
// number of threads running in a system.
-std::mutex mu_; // Protects per_thread_map_.
+EIGEN_MUTEX mu_; // Protects per_thread_map_.
std::unordered_map<std::thread::id, T> per_thread_map_;
};
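ThreadLocal only touches mu_ on the spill path, i.e. once more distinct threads have called in than the lock-free storage was sized for. A rough usage sketch; the capacity-only constructor and the local()/ForEach() names are assumptions based on the surrounding file:

#include <thread>
#include <Eigen/ThreadPool>  // include path assumed, see above

struct Counter {
  int value = 0;
};

// The first `capacity` distinct threads land in the lock-free records; any
// extra thread falls through to the map guarded by mu_ (an EIGEN_MUTEX now).
int CountPerThread(int capacity) {
  Eigen::ThreadLocal<Counter> counters(capacity);  // capacity-only ctor assumed
  auto work = [&counters] { counters.local().value += 1; };

  std::thread t1(work), t2(work);
  t1.join();
  t2.join();

  int total = 0;
  counters.ForEach([&total](std::thread::id, Counter& c) { total += c.value; });
  return total;  // 2, one increment per thread
}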