Mirror of https://gitlab.com/libeigen/eigen.git, synced 2025-08-12 19:59:05 +08:00
Merged in ezhulenev/eigen-01 (pull request PR-683)
Asynchronous parallelFor in Eigen ThreadPoolDevice
This commit is contained in: commit 84fefdf321
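For orientation, a minimal usage sketch of the two entry points touched by this patch follows. It is not part of the commit: the header path, thread count, and TensorOpCost values are illustrative assumptions, and completion of the asynchronous call is observed through an Eigen::Barrier, the same primitive the synchronous implementation uses internally.

// Hypothetical example, not from the patch: doubles a vector with the
// synchronous parallelFor, then adds one with parallelForAsync.
#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

#include <iostream>
#include <vector>

int main() {
  const int num_threads = 4;
  Eigen::ThreadPool pool(num_threads);
  Eigen::ThreadPoolDevice device(&pool, num_threads);

  const Eigen::Index n = 1 << 20;
  std::vector<float> data(n, 1.0f);

  // Rough per-iteration cost estimate: bytes loaded, bytes stored, cycles.
  const Eigen::TensorOpCost cost(sizeof(float), sizeof(float), 1.0);

  // Synchronous: blocks the caller until every [first, last) block has run.
  device.parallelFor(n, cost, [&](Eigen::Index first, Eigen::Index last) {
    for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
  });

  // Asynchronous: returns immediately; 'done' runs after the last block.
  Eigen::Barrier done(1);
  device.parallelForAsync(
      n, cost,
      [&](Eigen::Index first, Eigen::Index last) {
        for (Eigen::Index i = first; i < last; ++i) data[i] += 1.0f;
      },
      [&]() { done.Notify(); });
  done.Wait();

  std::cout << data[0] << std::endl;  // 3
  return 0;
}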
@@ -181,44 +181,173 @@ struct ThreadPoolDevice {
     return pool_->CurrentThreadId();
   }

-  // parallelFor executes f with [0, n) arguments in parallel and waits for
-  // completion. F accepts a half-open interval [first, last).
-  // Block size is chosen based on the iteration cost and resulting parallel
+  // WARNING: This function is synchronous and will block the calling thread.
+  //
+  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
+  // waits for completion. F accepts a half-open interval [first, last). Block
+  // size is chosen based on the iteration cost and resulting parallel
   // efficiency. If block_align is not nullptr, it is called to round up the
   // block size.
   void parallelFor(Index n, const TensorOpCost& cost,
                    std::function<Index(Index)> block_align,
                    std::function<void(Index, Index)> f) const {
-    typedef TensorCostModel<ThreadPoolDevice> CostModel;
+    // Compute small problems directly in the caller thread.
     if (n <= 1 || numThreads() == 1 ||
         CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
       f(0, n);
       return;
     }

-    // Calculate block size based on (1) the iteration cost and (2) parallel
-    // efficiency. We want blocks to be not too small to mitigate
-    // parallelization overheads; not too large to mitigate tail
-    // effect and potential load imbalance and we also want number
-    // of blocks to be evenly dividable across threads.
-
-    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
+    // Compute block size and total count of blocks.
+    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
+
+    // Recursively divide size into halves until we reach block_size.
+    // Division code rounds mid to block_size, so we are guaranteed to get
+    // block_count leaves that do actual computations.
+    Barrier barrier(static_cast<unsigned int>(block.count));
+    std::function<void(Index, Index)> handleRange;
+    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
+                                                  Index lastIdx) {
+      while (lastIdx - firstIdx > block.size) {
+        // Split into halves and schedule the second half on a different thread.
+        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
+        lastIdx = midIdx;
+      }
+      // Single block or less, execute directly.
+      f(firstIdx, lastIdx);
+      barrier.Notify();
+    };
+
+    if (block.count <= numThreads()) {
+      // Avoid a thread hop by running the root of the tree and one block on the
+      // main thread.
+      handleRange(0, n);
+    } else {
+      // Execute the root in the thread pool to avoid running work on more than
+      // numThreads() threads.
+      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
+    }
+
+    barrier.Wait();
+  }
+
+  // Convenience wrapper for parallelFor that does not align blocks.
+  void parallelFor(Index n, const TensorOpCost& cost,
+                   std::function<void(Index, Index)> f) const {
+    parallelFor(n, cost, NULL, std::move(f));
+  }
+
+  // WARNING: This function is asynchronous and will not block the calling thread.
+  //
+  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
+  // without waiting for completion. When the last block finished, it will call
+  // 'done' callback. F accepts a half-open interval [first, last). Block size
+  // is chosen based on the iteration cost and resulting parallel efficiency. If
+  // block_align is not nullptr, it is called to round up the block size.
+  void parallelForAsync(Index n, const TensorOpCost& cost,
+                        std::function<Index(Index)> block_align,
+                        std::function<void(Index, Index)> f,
+                        std::function<void()> done) const {
+    // Compute block size and total count of blocks.
+    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
+
+    ParallelForAsyncContext* const ctx =
+        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
+
+    // Recursively divide size into halves until we reach block_size.
+    // Division code rounds mid to block_size, so we are guaranteed to get
+    // block_count leaves that do actual computations.
+    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
+      while (lastIdx - firstIdx > block.size) {
+        // Split into halves and schedule the second half on a different thread.
+        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+        pool_->Schedule(
+            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
+        lastIdx = midIdx;
+      }
+
+      // Single block or less, execute directly.
+      ctx->f(firstIdx, lastIdx);
+
+      // Call 'done' callback if it was the last block.
+      if (ctx->count.fetch_sub(1) == 1) {
+        (ctx->done)();
+        // We can't delete ctx right now, because it will deallocate the closure
+        // we are currently in.
+        pool_->Schedule([ctx]() { delete ctx; });
+      }
+    };
+
+    // Execute the root in the thread pool.
+    pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
+  }
+
+  // Convenience wrapper for parallelForAsync that does not align blocks.
+  void parallelForAsync(Index n, const TensorOpCost& cost,
+                        std::function<void(Index, Index)> f,
+                        std::function<void()> done) const {
+    parallelForAsync(n, cost, NULL, std::move(f), std::move(done));
+  }
+
+  // Thread pool accessor.
+  ThreadPoolInterface* getPool() const { return pool_; }
+
+  // Allocator accessor.
+  Allocator* allocator() const { return allocator_; }
+
+ private:
+  typedef TensorCostModel<ThreadPoolDevice> CostModel;
+
+  // For parallelForAsync we must keep passed in closures on the heap, and
+  // delete them only after `done` callback finished.
+  struct ParallelForAsyncContext {
+    ParallelForAsyncContext(Index count, std::function<void(Index, Index)> f,
+                            std::function<void()> done)
+        : count(count), f(std::move(f)), done(std::move(done)) {}
+
+    std::atomic<Index> count;
+    std::function<void(Index, Index)> f;
+    std::function<void()> done;
+
+    std::function<void(Index, Index)> handle_range;
+  };
+
+  struct ParallelForBlock {
+    Index size;   // block size
+    Index count;  // number of blocks
+  };
+
+  // Calculates block size based on (1) the iteration cost and (2) parallel
+  // efficiency. We want blocks to be not too small to mitigate parallelization
+  // overheads; not too large to mitigate tail effect and potential load
+  // imbalance and we also want number of blocks to be evenly dividable across
+  // threads.
+  ParallelForBlock CalculateParallelForBlock(
+      const Index n, const TensorOpCost& cost,
+      std::function<Index(Index)> block_align) const {
+    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
     const Index max_oversharding_factor = 4;
     Index block_size = numext::mini(
-        n, numext::maxi<Index>(divup<Index>(n, max_oversharding_factor * numThreads()),
+        n, numext::maxi<Index>(
+               divup<Index>(n, max_oversharding_factor * numThreads()),
                block_size_f));
     const Index max_block_size = numext::mini(n, 2 * block_size);

     if (block_align) {
       Index new_block_size = block_align(block_size);
       eigen_assert(new_block_size >= block_size);
       block_size = numext::mini(n, new_block_size);
     }

     Index block_count = divup(n, block_size);

     // Calculate parallel efficiency as fraction of total CPU time used for
     // computations:
     double max_efficiency =
         static_cast<double>(block_count) /
         (divup<int>(block_count, numThreads()) * numThreads());

     // Now try to increase block size up to max_block_size as long as it
     // doesn't decrease parallel efficiency.
     for (Index prev_block_count = block_count;
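The block-size heuristic in CalculateParallelForBlock is easiest to see with concrete numbers. The short standalone program below is not Eigen code and its block and thread counts are made up; it simply reproduces the max_efficiency formula and shows why merging blocks can raise efficiency.

// Standalone illustration of the parallel-efficiency formula used by
// CalculateParallelForBlock; the numbers are hypothetical.
#include <iostream>

int main() {
  const long num_threads = 4;
  const long block_count = 10;

  // divup(a, b) == (a + b - 1) / b, as in the patch.
  const long waves = (block_count + num_threads - 1) / num_threads;  // 3 waves
  const double efficiency =
      static_cast<double>(block_count) / (waves * num_threads);      // 10 / 12

  std::cout << "efficiency with 10 blocks on 4 threads: " << efficiency
            << std::endl;  // ~0.83: the last wave runs only 2 of 4 threads

  // Growing the block size until only 8 blocks remain gives 2 full waves and
  // efficiency 8 / 8 = 1.0, which is what the loop over prev_block_count in
  // CalculateParallelForBlock searches for (capped at max_block_size).
  return 0;
}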
@@ -251,47 +380,9 @@ struct ThreadPoolDevice {
       }
     }

-    // Recursively divide size into halves until we reach block_size.
-    // Division code rounds mid to block_size, so we are guaranteed to get
-    // block_count leaves that do actual computations.
-    Barrier barrier(static_cast<unsigned int>(block_count));
-    std::function<void(Index, Index)> handleRange;
-    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
-      while (lastIdx - firstIdx > block_size) {
-        // Split into halves and schedule the second half on a different thread.
-        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block_size) * block_size;
-        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
-        lastIdx = midIdx;
-      }
-      // Single block or less, execute directly.
-      f(firstIdx, lastIdx);
-      barrier.Notify();
-    };
-    if (block_count <= numThreads()) {
-      // Avoid a thread hop by running the root of the tree and one block on the
-      // main thread.
-      handleRange(0, n);
-    } else {
-      // Execute the root in the thread pool to avoid running work on more than
-      // numThreads() threads.
-      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
-    }
-    barrier.Wait();
+    return {block_size, block_count};
   }

-  // Convenience wrapper for parallelFor that does not align blocks.
-  void parallelFor(Index n, const TensorOpCost& cost,
-                   std::function<void(Index, Index)> f) const {
-    parallelFor(n, cost, NULL, std::move(f));
-  }
-
-  // Thread pool accessor.
-  ThreadPoolInterface* getPool() const { return pool_; }
-
-  // Allocator accessor.
-  Allocator* allocator() const { return allocator_; }
-
- private:
   ThreadPoolInterface* pool_;
   int num_threads_;
   Allocator* allocator_;
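ParallelForAsyncContext above encodes a lifetime rule: the last block to decrement count fires done, and only then is the heap-allocated context released. The sketch below shows that pattern in isolation. It is not Eigen code: it uses plain std::thread instead of the thread pool, and because the worker closure is not stored inside the context (unlike handle_range in the patch), it can delete the context directly instead of rescheduling the deletion.

// Hypothetical, self-contained version of the countdown-and-cleanup pattern.
#include <atomic>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

struct AsyncContext {
  AsyncContext(long count, std::function<void()> done)
      : count(count), done(std::move(done)) {}
  std::atomic<long> count;
  std::function<void()> done;
};

int main() {
  const long blocks = 8;
  auto* ctx = new AsyncContext(blocks, [] { std::cout << "done" << std::endl; });

  std::vector<std::thread> workers;
  for (long b = 0; b < blocks; ++b) {
    workers.emplace_back([ctx] {
      // ... per-block work would run here ...
      if (ctx->count.fetch_sub(1) == 1) {
        ctx->done();  // last block: fire the completion callback,
        delete ctx;   // then release the shared state (safe here because the
                      // running closure does not live inside ctx).
      }
    });
  }
  for (std::thread& t : workers) t.join();
  return 0;
}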