mirror of
https://gitlab.com/libeigen/eigen.git
synced 2025-04-29 23:34:12 +08:00
Optimize ThreadPool spinning
This commit is contained in:
parent
c593e9e948
commit
e44db21092
@ -31,8 +31,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
all_coprimes_(num_threads),
|
all_coprimes_(num_threads),
|
||||||
waiters_(num_threads),
|
waiters_(num_threads),
|
||||||
global_steal_partition_(EncodePartition(0, num_threads_)),
|
global_steal_partition_(EncodePartition(0, num_threads_)),
|
||||||
|
spinning_state_(0),
|
||||||
blocked_(0),
|
blocked_(0),
|
||||||
spinning_(0),
|
|
||||||
done_(false),
|
done_(false),
|
||||||
cancelled_(false),
|
cancelled_(false),
|
||||||
ec_(waiters_) {
|
ec_(waiters_) {
|
||||||
@ -125,7 +125,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
// this. We expect that such scenario is prevented by program, that is,
|
// this. We expect that such scenario is prevented by program, that is,
|
||||||
// this is kept alive while any threads can potentially be in Schedule.
|
// this is kept alive while any threads can potentially be in Schedule.
|
||||||
if (!t.f) {
|
if (!t.f) {
|
||||||
ec_.Notify(false);
|
if (IsNotifyParkedThreadRequired()) {
|
||||||
|
ec_.Notify(false);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
env_.ExecuteTask(t); // Push failed, execute directly.
|
env_.ExecuteTask(t); // Push failed, execute directly.
|
||||||
}
|
}
|
||||||
@ -165,8 +167,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
// Exposed publicly as static functions so that external callers can reuse
|
// Exposed publicly as static functions so that external callers can reuse
|
||||||
// this encode/decode logic for maintaining their own thread-safe copies of
|
// this encode/decode logic for maintaining their own thread-safe copies of
|
||||||
// scheduling and steal domain(s).
|
// scheduling and steal domain(s).
|
||||||
static const int kMaxPartitionBits = 16;
|
static constexpr int kMaxPartitionBits = 16;
|
||||||
static const int kMaxThreads = 1 << kMaxPartitionBits;
|
static constexpr int kMaxThreads = 1 << kMaxPartitionBits;
|
||||||
|
|
||||||
inline unsigned EncodePartition(unsigned start, unsigned limit) { return (start << kMaxPartitionBits) | limit; }
|
inline unsigned EncodePartition(unsigned start, unsigned limit) { return (start << kMaxPartitionBits) | limit; }
|
||||||
|
|
||||||
@ -211,10 +213,6 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
|
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
|
||||||
uint64_t rand; // Random generator state.
|
uint64_t rand; // Random generator state.
|
||||||
int thread_id; // Worker thread index in pool.
|
int thread_id; // Worker thread index in pool.
|
||||||
#ifndef EIGEN_THREAD_LOCAL
|
|
||||||
// Prevent false sharing.
|
|
||||||
char pad_[128];
|
|
||||||
#endif
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ThreadData {
|
struct ThreadData {
|
||||||
@ -224,6 +222,58 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
Queue queue;
|
Queue queue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Maximum number of threads that can spin in steal loop.
|
||||||
|
static constexpr int kMaxSpinningThreads = 1;
|
||||||
|
|
||||||
|
// The number of steal loop spin iterations before parking (this number is
|
||||||
|
// divided by the number of threads, to get spin count for each thread).
|
||||||
|
static constexpr int kSpinCount = 5000;
|
||||||
|
|
||||||
|
// If there are enough active threads with empty pending-task queues, a thread
|
||||||
|
// that runs out of work can just be parked without spinning, because these
|
||||||
|
// active threads will go into a steal loop after finishing their current
|
||||||
|
// tasks.
|
||||||
|
//
|
||||||
|
// In the worst case when all active threads are executing long/expensive
|
||||||
|
// tasks, the next Schedule() will have to wait until one of the parked
|
||||||
|
// threads will be unparked, however this should be very rare in practice.
|
||||||
|
static constexpr int kMinActiveThreadsToStartSpinning = 4;
|
||||||
|
|
||||||
|
struct SpinningState {
|
||||||
|
// Spinning state layout:
|
||||||
|
//
|
||||||
|
// - Low 32 bits encode the number of threads that are spinning in steal
|
||||||
|
// loop.
|
||||||
|
//
|
||||||
|
// - High 32 bits encode the number of tasks that were submitted to the pool
|
||||||
|
// without a call to `ec_.Notify()`. This number can't be larger than
|
||||||
|
// the number of spinning threads. Each spinning thread, when it exits the
|
||||||
|
// spin loop must check if this number is greater than zero, and maybe
|
||||||
|
// make another attempt to steal a task and decrement it by one.
|
||||||
|
static constexpr uint64_t kNumSpinningMask = 0x00000000FFFFFFFF;
|
||||||
|
static constexpr uint64_t kNumNoNotifyMask = 0xFFFFFFFF00000000;
|
||||||
|
static constexpr uint64_t kNumNoNotifyShift = 32;
|
||||||
|
|
||||||
|
uint64_t num_spinning; // number of spinning threads
|
||||||
|
uint64_t num_no_notification; // number of tasks submitted without
|
||||||
|
// notifying waiting threads
|
||||||
|
|
||||||
|
// Decodes `spinning_state_` value.
|
||||||
|
static SpinningState Decode(uint64_t state) {
|
||||||
|
uint64_t num_spinning = (state & kNumSpinningMask);
|
||||||
|
uint64_t num_no_notification = (state & kNumNoNotifyMask) >> kNumNoNotifyShift;
|
||||||
|
|
||||||
|
assert(num_no_notification <= num_spinning);
|
||||||
|
return {num_spinning, num_no_notification};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encodes as `spinning_state_` value.
|
||||||
|
uint64_t Encode() const {
|
||||||
|
assert(num_no_notification <= num_spinning);
|
||||||
|
return (num_no_notification << kNumNoNotifyShift) | num_spinning;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
Environment env_;
|
Environment env_;
|
||||||
const int num_threads_;
|
const int num_threads_;
|
||||||
const bool allow_spinning_;
|
const bool allow_spinning_;
|
||||||
@ -231,8 +281,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
|
MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
|
||||||
MaxSizeVector<EventCount::Waiter> waiters_;
|
MaxSizeVector<EventCount::Waiter> waiters_;
|
||||||
unsigned global_steal_partition_;
|
unsigned global_steal_partition_;
|
||||||
|
std::atomic<uint64_t> spinning_state_;
|
||||||
std::atomic<unsigned> blocked_;
|
std::atomic<unsigned> blocked_;
|
||||||
std::atomic<bool> spinning_;
|
|
||||||
std::atomic<bool> done_;
|
std::atomic<bool> done_;
|
||||||
std::atomic<bool> cancelled_;
|
std::atomic<bool> cancelled_;
|
||||||
EventCount ec_;
|
EventCount ec_;
|
||||||
@ -242,6 +292,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
|
std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
unsigned NumBlockedThreads() const { return blocked_.load(); }
|
||||||
|
unsigned NumActiveThreads() const { return num_threads_ - blocked_.load(); }
|
||||||
|
|
||||||
// Main worker thread loop.
|
// Main worker thread loop.
|
||||||
void WorkerLoop(int thread_id) {
|
void WorkerLoop(int thread_id) {
|
||||||
#ifndef EIGEN_THREAD_LOCAL
|
#ifndef EIGEN_THREAD_LOCAL
|
||||||
@ -262,9 +315,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
EventCount::Waiter* waiter = &waiters_[thread_id];
|
EventCount::Waiter* waiter = &waiters_[thread_id];
|
||||||
// TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
|
// TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
|
||||||
// proportional to num_threads_ and we assume that new work is scheduled at
|
// proportional to num_threads_ and we assume that new work is scheduled at
|
||||||
// a constant rate, so we set spin_count to 5000 / num_threads_. The
|
// a constant rate, so we divide `kSpintCount` by number of threads and
|
||||||
// constant was picked based on a fair dice roll, tune it.
|
// number of spinning threads. The constant was picked based on a fair dice
|
||||||
const int spin_count = allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
|
// roll, tune it.
|
||||||
|
const int spin_count = allow_spinning_ && num_threads_ > 0 ? kSpinCount / kMaxSpinningThreads / num_threads_ : 0;
|
||||||
if (num_threads_ == 1) {
|
if (num_threads_ == 1) {
|
||||||
// For num_threads_ == 1 there is no point in going through the expensive
|
// For num_threads_ == 1 there is no point in going through the expensive
|
||||||
// steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
|
// steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
|
||||||
@ -272,50 +326,70 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
// compared to the order in which they are scheduled, which tends to be
|
// compared to the order in which they are scheduled, which tends to be
|
||||||
// counter-productive for the types of I/O workloads the single thread
|
// counter-productive for the types of I/O workloads the single thread
|
||||||
// pools tend to be used for.
|
// pools tend to be used for.
|
||||||
while (!cancelled_) {
|
while (!cancelled_.load(std::memory_order_relaxed)) {
|
||||||
Task t = q.PopFront();
|
Task t = q.PopFront();
|
||||||
for (int i = 0; i < spin_count && !t.f; i++) {
|
|
||||||
if (!cancelled_.load(std::memory_order_relaxed)) {
|
for (int i = 0; i < spin_count && !t.f; ++i) {
|
||||||
t = q.PopFront();
|
t = q.PopFront();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (!t.f) {
|
|
||||||
|
if (EIGEN_PREDICT_FALSE(!t.f)) {
|
||||||
if (!WaitForWork(waiter, &t)) {
|
if (!WaitForWork(waiter, &t)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (t.f) {
|
|
||||||
|
if (EIGEN_PREDICT_TRUE(t.f)) {
|
||||||
env_.ExecuteTask(t);
|
env_.ExecuteTask(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
while (!cancelled_) {
|
while (!cancelled_.load(std::memory_order_relaxed)) {
|
||||||
Task t = q.PopFront();
|
Task t = q.PopFront();
|
||||||
if (!t.f) {
|
|
||||||
|
// Do one round of steal loop from local thread partition.
|
||||||
|
if (EIGEN_PREDICT_FALSE(!t.f)) {
|
||||||
t = LocalSteal();
|
t = LocalSteal();
|
||||||
if (!t.f) {
|
}
|
||||||
t = GlobalSteal();
|
|
||||||
if (!t.f) {
|
// Do one round of steal loop from global thread partition.
|
||||||
// Leave one thread spinning. This reduces latency.
|
if (EIGEN_PREDICT_FALSE(!t.f)) {
|
||||||
if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
|
t = GlobalSteal();
|
||||||
for (int i = 0; i < spin_count && !t.f; i++) {
|
}
|
||||||
if (!cancelled_.load(std::memory_order_relaxed)) {
|
|
||||||
t = GlobalSteal();
|
// Maybe leave a thread spinning. This improves latency.
|
||||||
} else {
|
if (EIGEN_PREDICT_FALSE(!t.f)) {
|
||||||
return;
|
if (allow_spinning_ && StartSpinning()) {
|
||||||
}
|
for (int i = 0; i < spin_count && !t.f; ++i) {
|
||||||
}
|
t = GlobalSteal();
|
||||||
spinning_ = false;
|
}
|
||||||
}
|
|
||||||
if (!t.f) {
|
// Notify `spinning_state_` that we are no longer spinning.
|
||||||
if (!WaitForWork(waiter, &t)) {
|
bool has_no_notify_task = StopSpinning();
|
||||||
return;
|
|
||||||
}
|
// If a task was submitted to the queue without a call to
|
||||||
}
|
// `ec_.Notify()` (if `IsNotifyParkedThreadRequired()` returned
|
||||||
|
// false), and we didn't steal anything above, we must try to
|
||||||
|
// steal one more time, to make sure that this task will be
|
||||||
|
// executed. We will not necessarily find it, because it might
|
||||||
|
// have been already stolen by some other thread.
|
||||||
|
if (has_no_notify_task && !t.f) {
|
||||||
|
t = GlobalSteal();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (t.f) {
|
|
||||||
|
// If we still don't have a task, wait for one. Return if thread pool is
|
||||||
|
// in cancelled state.
|
||||||
|
if (EIGEN_PREDICT_FALSE(!t.f)) {
|
||||||
|
if (!WaitForWork(waiter, &t)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute task if we found one.
|
||||||
|
if (EIGEN_PREDICT_TRUE(t.f)) {
|
||||||
env_.ExecuteTask(t);
|
env_.ExecuteTask(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -437,6 +511,76 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StartSpinning() checks if the number of threads in the spin loop is less
|
||||||
|
// than the allowed maximum. If so, increments the number of spinning threads
|
||||||
|
// by one and returns true (caller must enter the spin loop). Otherwise
|
||||||
|
// returns false, and the caller must not enter the spin loop.
|
||||||
|
bool StartSpinning() {
|
||||||
|
if (NumActiveThreads() > kMinActiveThreadsToStartSpinning) return false;
|
||||||
|
|
||||||
|
uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
|
||||||
|
for (;;) {
|
||||||
|
SpinningState state = SpinningState::Decode(spinning);
|
||||||
|
|
||||||
|
if ((state.num_spinning - state.num_no_notification) >= kMaxSpinningThreads) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increment the number of spinning threads.
|
||||||
|
++state.num_spinning;
|
||||||
|
|
||||||
|
if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopSpinning() decrements the number of spinning threads by one. It also
|
||||||
|
// checks if there were any tasks submitted into the pool without notifying
|
||||||
|
// parked threads, and decrements the count by one. Returns true if the number
|
||||||
|
// of tasks submitted without notification was decremented. In this case,
|
||||||
|
// caller thread might have to call Steal() one more time.
|
||||||
|
bool StopSpinning() {
|
||||||
|
uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
|
||||||
|
for (;;) {
|
||||||
|
SpinningState state = SpinningState::Decode(spinning);
|
||||||
|
|
||||||
|
// Decrement the number of spinning threads.
|
||||||
|
--state.num_spinning;
|
||||||
|
|
||||||
|
// Maybe decrement the number of tasks submitted without notification.
|
||||||
|
bool has_no_notify_task = state.num_no_notification > 0;
|
||||||
|
if (has_no_notify_task) --state.num_no_notification;
|
||||||
|
|
||||||
|
if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
|
||||||
|
return has_no_notify_task;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsNotifyParkedThreadRequired() returns true if parked thread must be
|
||||||
|
// notified about new added task. If there are threads spinning in the steal
|
||||||
|
// loop, there is no need to unpark any of the waiting threads, the task will
|
||||||
|
// be picked up by one of the spinning threads.
|
||||||
|
bool IsNotifyParkedThreadRequired() {
|
||||||
|
uint64_t spinning = spinning_state_.load(std::memory_order_relaxed);
|
||||||
|
for (;;) {
|
||||||
|
SpinningState state = SpinningState::Decode(spinning);
|
||||||
|
|
||||||
|
// If the number of tasks submitted without notifying parked threads is
|
||||||
|
// equal to the number of spinning threads, we must wake up one of the
|
||||||
|
// parked threads.
|
||||||
|
if (state.num_no_notification == state.num_spinning) return true;
|
||||||
|
|
||||||
|
// Increment the number of tasks submitted without notification.
|
||||||
|
++state.num_no_notification;
|
||||||
|
|
||||||
|
if (spinning_state_.compare_exchange_weak(spinning, state.Encode(), std::memory_order_relaxed)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
|
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
|
||||||
return std::hash<std::thread::id>()(std::this_thread::get_id());
|
return std::hash<std::thread::id>()(std::this_thread::get_id());
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user