Speedup Tensor ThreadPool RunQueu::Empty()

This commit is contained in:
Eugene Zhulenev 2019-02-13 10:20:53 -08:00
parent bdcb5f3304
commit 8c2f30c790

View File

@ -148,32 +148,46 @@ class RunQueue {
return n; return n;
} }
// Size returns current queue size. // Size returns current queue size; if NeedSizeEstimate is false, only whether
// the size is 0 is guaranteed to be correct.
// Can be called by any thread at any time. // Can be called by any thread at any time.
unsigned Size() const { template <bool NeedSizeEstimate>
unsigned SizeOrNotEmpty() const {
// Emptiness plays critical role in thread pool blocking. So we go to great // Emptiness plays critical role in thread pool blocking. So we go to great
// effort to not produce false positives (claim non-empty queue as empty). // effort to not produce false positives (claim non-empty queue as empty).
unsigned front = front_.load(std::memory_order_acquire);
for (;;) { for (;;) {
// Capture a consistent snapshot of front/tail. // Capture a consistent snapshot of front/tail.
unsigned front = front_.load(std::memory_order_acquire);
unsigned back = back_.load(std::memory_order_acquire); unsigned back = back_.load(std::memory_order_acquire);
unsigned front1 = front_.load(std::memory_order_relaxed); unsigned front1 = front_.load(std::memory_order_relaxed);
if (front != front1) continue; if (front != front1) {
int size = (front & kMask2) - (back & kMask2); front = front1;
// Fix overflow. std::atomic_thread_fence(std::memory_order_acquire);
if (size < 0) size += 2 * kSize; continue;
// Order of modification in push/pop is crafted to make the queue look }
// larger than it is during concurrent modifications. E.g. pop can if (NeedSizeEstimate) {
// decrement size before the corresponding push has incremented it. int size = (front & kMask2) - (back & kMask2);
// So the computed size can be up to kSize + 1, fix it. // Fix overflow.
if (size > static_cast<int>(kSize)) size = kSize; if (size < 0) size += 2 * kSize;
return size; // Order of modification in push/pop is crafted to make the queue look
// larger than it is during concurrent modifications. E.g. pop can
// decrement size before the corresponding push has incremented it.
// So the computed size can be up to kSize + 1, fix it.
if (size > kSize) size = kSize;
return size;
} else {
return ((front ^ back) & kMask2);
}
} }
} }
// Size returns current queue size.
// Can be called by any thread at any time.
unsigned Size() const { return SizeOrNotEmpty<true>(); }
// Empty tests whether container is empty. // Empty tests whether container is empty.
// Can be called by any thread at any time. // Can be called by any thread at any time.
bool Empty() const { return Size() == 0; } bool Empty() const { return SizeOrNotEmpty<false>() == 0; }
// Delete all the elements from the queue. // Delete all the elements from the queue.
void Flush() { void Flush() {