Use a single Barrier instead of a collection of Notifications to reduce the thread synchronization overhead

Benoit Steiner 2016-03-22 15:24:23 -07:00
parent 65a7113a36
commit 002cf0d1c9
4 changed files with 73 additions and 38 deletions
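
The change replaces the per-task synchronization objects with one shared counter: previously every enqueued block got its own heap-allocated Notification, and the caller waited on and then deleted each of them; now a single Barrier is created with the number of pending blocks and waited on once. The standalone sketch below illustrates that pattern. CountingBarrier is a simplified stand-in written for this note (plain mutex/condvar, no atomics), not the Barrier class added by this commit.

// Sketch of the "one barrier for N tasks" pattern used throughout this commit.
// CountingBarrier is a simplified illustration, not the Eigen implementation.
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class CountingBarrier {
 public:
  explicit CountingBarrier(unsigned count) : count_(count) {}
  void Notify() {  // called exactly once by each finished task
    std::lock_guard<std::mutex> l(mu_);
    if (--count_ == 0) cv_.notify_all();
  }
  void Wait() {  // blocks until Notify() has run `count` times
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return count_ == 0; });
  }
 private:
  std::mutex mu_;
  std::condition_variable cv_;
  unsigned count_;
};

int main() {
  const int numblocks = 8;
  CountingBarrier barrier(numblocks);  // one object instead of numblocks Notifications
  std::vector<std::thread> workers;
  for (int i = 0; i < numblocks; ++i) {
    workers.emplace_back([&barrier, i] {
      (void)i;  // a real worker would evaluate block i here
      barrier.Notify();
    });
  }
  barrier.Wait();  // single wait, and nothing to delete afterwards
  for (auto& t : workers) t.join();
}
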


@@ -51,6 +51,7 @@ typedef unsigned __int64 uint64_t;
 #endif
 
 #ifdef EIGEN_USE_THREADS
+#include <atomic>
 #include <condition_variable>
 #include <deque>
 #include <mutex>


@@ -118,47 +118,82 @@ class ThreadPool : public ThreadPoolInterface {
 };
 
-// Notification is an object that allows a user to to wait for another
-// thread to signal a notification that an event has occurred.
-//
-// Multiple threads can wait on the same Notification object.
-// but only one caller must call Notify() on the object.
-class Notification {
+// Barrier is an object that allows one or more threads to wait until
+// Notify has been called a specified number of times.
+class Barrier {
  public:
-  Notification() : notified_(false) {}
-  ~Notification() {}
+  Barrier(unsigned int count) : state_(count << 1), notified_(false) {
+    eigen_assert(((count << 1) >> 1) == count);
+  }
+  ~Barrier() {
+    eigen_assert((state_>>1) == 0);
+  }
 
   void Notify() {
+    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
+    if (v != 1) {
+      eigen_assert(((v + 2) & ~1) != 0);
+      return;  // either count has not dropped to 0, or waiter is not waiting
+    }
     std::unique_lock<std::mutex> l(mu_);
     eigen_assert(!notified_);
     notified_ = true;
     cv_.notify_all();
   }
 
-  void WaitForNotification() {
+  void Wait() {
+    unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
+    if ((v >> 1) == 0) return;
     std::unique_lock<std::mutex> l(mu_);
-    cv_.wait(l, [this]() { return notified_; } );
+    while (!notified_) {
+      cv_.wait(l);
+    }
   }
 
  private:
   std::mutex mu_;
   std::condition_variable cv_;
+  std::atomic<unsigned int> state_;  // low bit is waiter flag
   bool notified_;
 };
 
+// Notification is an object that allows a user to to wait for another
+// thread to signal a notification that an event has occurred.
+//
+// Multiple threads can wait on the same Notification object,
+// but only one caller must call Notify() on the object.
+struct Notification : Barrier {
+  Notification() : Barrier(1) {};
+};
+
 // Runs an arbitrary function and then calls Notify() on the passed in
 // Notification.
-template <typename Function, typename... Args> struct FunctionWrapper
+template <typename Function, typename... Args> struct FunctionWrapperWithNotification
 {
   static void run(Notification* n, Function f, Args... args) {
     f(args...);
-    n->Notify();
+    if (n) {
+      n->Notify();
+    }
   }
 };
 
-static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) {
+template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
+{
+  static void run(Barrier* b, Function f, Args... args) {
+    f(args...);
+    if (b) {
+      b->Notify();
+    }
+  }
+};
+
+template <typename SyncType>
+static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
   if (n) {
-    n->WaitForNotification();
+    n->Wait();
   }
 }
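
The Barrier above packs its bookkeeping into one atomic word: the number of outstanding Notify() calls lives in the upper bits of state_ and the low bit records that a waiter has registered, which is why Notify() subtracts 2 and Wait() ORs in 1. The mutex and condition variable are only touched by the final Notify() when a waiter is present, and a Wait() that arrives after the count has already reached zero returns without locking anything. A short arithmetic trace of that encoding (plain integers standing in for the atomic member, values chosen for the example):

// Trace of the state_ encoding: state = (remaining_count << 1) | waiter_flag.
#include <cassert>

int main() {
  unsigned state = 2u << 1;    // Barrier(2): remaining = 2, waiter bit clear -> 4
  unsigned w = (state |= 1u);  // Wait(): fetch_or(1); the count bits are still 2,
  assert((w >> 1) != 0);       // so the caller goes on to block on the condvar
  unsigned v = (state -= 2);   // first Notify(): fetch_sub(2) - 2 -> 3
  assert(v != 1);              // count not yet zero: return without locking
  v = (state -= 2);            // second Notify(): -> 1, i.e. count is zero and
  assert(v == 1);              // the waiter bit is set, so only this call takes
                               // the mutex, sets notified_, and calls notify_all()
  return 0;
}
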
@@ -203,10 +238,20 @@ struct ThreadPoolDevice {
   EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
     Notification* n = new Notification();
     std::function<void()> func =
-        std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...);
+        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...);
     pool_->Schedule(func);
     return n;
   }
+
+  template <class Function, class... Args>
+  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
+                                                Function&& f,
+                                                Args&&... args) const {
+    std::function<void()> func = std::bind(
+        &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...);
+    pool_->Schedule(func);
+  }
+
   template <class Function, class... Args>
   EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
     std::function<void()> func = std::bind(f, args...);
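
ThreadPoolDevice keeps the existing enqueue(), which heap-allocates a Notification and hands ownership to the caller, and gains enqueue_with_barrier(), which takes a caller-owned Barrier and returns nothing, so launching N tasks no longer needs N allocations or N deletes. The rough caller-side shape, which the executor and reducer hunks below follow, looks like this (illustrative only; it assumes Eigen's unsupported Tensor module is on the include path and that Barrier and ThreadPoolDevice are reachable from the Eigen namespace):

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

// Enqueue `numblocks` tasks against one caller-owned Barrier, then wait once.
void run_blocks(const Eigen::ThreadPoolDevice& device, int numblocks) {
  Eigen::Barrier barrier(numblocks);
  for (int i = 0; i < numblocks; ++i) {
    device.enqueue_with_barrier(&barrier,
                                [](int block) { (void)block; /* evaluate block */ },
                                i);
  }
  barrier.Wait();  // returns once every task has called Notify() on the barrier
}
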


@@ -127,20 +127,16 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
       const Index blocksize = numext::maxi<Index>(PacketSize, (blocksz - (blocksz % PacketSize)));
       const Index numblocks = size / blocksize;
 
-      MaxSizeVector<Notification*> results(numblocks);
+      Barrier barrier(numblocks);
       for (int i = 0; i < numblocks; ++i) {
-        results.push_back(device.enqueue(&EvalRange<Evaluator, Index, Vectorizable>::run, evaluator, i*blocksize, (i+1)*blocksize));
+        device.enqueue_with_barrier(&barrier, &EvalRange<Evaluator, Index, Vectorizable>::run, evaluator, i*blocksize, (i+1)*blocksize);
       }
 
       if (numblocks * blocksize < size) {
         EvalRange<Evaluator, Index, Vectorizable>::run(evaluator, numblocks * blocksize, size);
       }
 
-      for (int i = 0; i < numblocks; ++i) {
-        wait_until_ready(results[i]);
-        delete results[i];
-      }
+      barrier.Wait();
     }
     evaluator.cleanup();
   }
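
Note that the executor only counts full blocks in the barrier: any tail that does not fill a whole block is evaluated inline on the calling thread before Wait(), so the barrier count always matches the number of enqueued closures. With made-up numbers (blocksize here is hypothetical; in the real code it is derived from PacketSize and a heuristic blocksz not shown in this hunk):

// Illustrative partition arithmetic only; the values are invented for the example.
#include <cassert>

int main() {
  const long size = 1000;       // total number of coefficients to evaluate
  const long blocksize = 128;   // hypothetical block size, a multiple of PacketSize
  const long numblocks = size / blocksize;   // 7 full blocks handed to the pool
  assert(numblocks == 7);
  assert(numblocks * blocksize == 896);      // worker threads cover [0, 896)
  assert(numblocks * blocksize < size);      // the tail [896, 1000) runs inline
  // Barrier barrier(numblocks) therefore waits for exactly 7 Notify() calls.
  return 0;
}
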


@@ -256,12 +256,11 @@ struct FullReducer<Self, Op, ThreadPoolDevice, false> {
     const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
     eigen_assert(num_coeffs >= numblocks * blocksize);
 
-    MaxSizeVector<Notification*> results(numblocks);
+    Barrier barrier(numblocks);
     MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
     for (Index i = 0; i < numblocks; ++i) {
-      results.push_back(
-          device.enqueue(&FullReducerShard<Self, Op, false>::run, self,
-                         i * blocksize, blocksize, reducer, &shards[i]));
+      device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, false>::run, self,
+                                  i * blocksize, blocksize, reducer, &shards[i]);
     }
 
     typename Self::CoeffReturnType finalShard;
@@ -271,10 +270,7 @@ struct FullReducer<Self, Op, ThreadPoolDevice, false> {
     } else {
       finalShard = reducer.initialize();
     }
-    for (Index i = 0; i < numblocks; ++i) {
-      wait_until_ready(results[i]);
-      delete results[i];
-    }
+    barrier.Wait();
     for (Index i = 0; i < numblocks; ++i) {
       reducer.reduce(shards[i], &finalShard);
     }
@@ -307,12 +303,12 @@ struct FullReducer<Self, Op, ThreadPoolDevice, true> {
     const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
     eigen_assert(num_coeffs >= numblocks * blocksize);
 
-    MaxSizeVector<Notification*> results(numblocks);
+    Barrier barrier(numblocks);
     MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
     for (Index i = 0; i < numblocks; ++i) {
-      results.push_back(device.enqueue(&FullReducerShard<Self, Op, true>::run,
-                                       self, i * blocksize, blocksize, reducer,
-                                       &shards[i]));
+      device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, true>::run,
+                                  self, i * blocksize, blocksize, reducer,
+                                  &shards[i]);
     }
     typename Self::CoeffReturnType finalShard;
     if (numblocks * blocksize < num_coeffs) {
@@ -322,10 +318,7 @@ struct FullReducer<Self, Op, ThreadPoolDevice, true> {
       finalShard = reducer.initialize();
     }
 
-    for (Index i = 0; i < numblocks; ++i) {
-      wait_until_ready(results[i]);
-      delete results[i];
-    }
+    barrier.Wait();
     for (Index i = 0; i < numblocks; ++i) {
       reducer.reduce(shards[i], &finalShard);
     }
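
In both reducer specializations each task writes its partial result into its own slot of shards, so the workers never need a lock around the reduction itself; the Barrier only guarantees that every shard has been filled before the sequential merge into finalShard. A condensed standalone version of that shard-and-merge shape, using plain std::thread and a sum in place of the device's pool and the reducer Op:

// Shard-and-merge sketch: per-task slots, one join point, sequential merge.
#include <numeric>
#include <thread>
#include <vector>

int main() {
  const int numblocks = 4;
  const std::vector<int> data(1024, 1);            // 1024 coefficients, all equal to 1
  const int blocksize = static_cast<int>(data.size()) / numblocks;
  std::vector<long> shards(numblocks, 0);          // one slot per task, so no locking
  std::vector<std::thread> workers;
  for (int i = 0; i < numblocks; ++i) {
    workers.emplace_back([&, i] {
      const int begin = i * blocksize;
      shards[i] = std::accumulate(data.begin() + begin,
                                  data.begin() + begin + blocksize, 0L);
    });
  }
  for (auto& t : workers) t.join();                // plays the role of barrier.Wait()
  const long total = std::accumulate(shards.begin(), shards.end(), 0L);
  return total == 1024 ? 0 : 1;                    // sequential merge of the shards
}
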