Added a new parallelFor API to the thread pool device.

Benoit Steiner 2016-05-09 10:45:12 -07:00
parent dc7dbc2df7
commit ba95e43ea2


@@ -172,7 +172,105 @@ struct ThreadPoolDevice {
pool_->Schedule(func);
}
// parallelFor executes f with [0, size) arguments in parallel and waits for
// completion. Block size is chosen between min_block_size and
// 2 * min_block_size to achieve the best parallel efficiency.
// If min_block_size == -1, parallelFor uses a block size of 1.
// If hard_align > 0, block size is aligned to hard_align.
// If soft_align > hard_align, block size is aligned to soft_align provided
// that it does not increase block size too much.
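// For example (illustrative), parallelFor(n, 1024, 0, 16, f) runs f over
// [0, n) in blocks of roughly 1024 to 2048 elements, preferring block
// sizes that are multiples of 16.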
void parallelFor(Index size, Index min_block_size, Index hard_align,
Index soft_align,
std::function<void(Index, Index)> f) const {
if (size <= 1 || (min_block_size != -1 && size < min_block_size) ||
numThreads() == 1) {
f(0, size);
return;
}
Index block_size = 1;
Index block_count = size;
if (min_block_size != -1) {
// Calculate block size based on (1) estimated cost and (2) parallel
// efficiency. We want blocks that are not too small (to mitigate
// parallelization overheads) and not too large (to mitigate the tail
// effect and potential load imbalance), and we also want the number of
// blocks to be evenly divisible across threads.
min_block_size = numext::maxi<Index>(min_block_size, 1);
block_size = numext::mini(min_block_size, size);
// Upper bound on block size:
const Index max_block_size = numext::mini(min_block_size * 2, size);
block_size = numext::mini(
alignBlockSize(block_size, hard_align, soft_align), size);
block_count = divup(size, block_size);
// Calculate parallel efficiency as fraction of total CPU time used for
// computations:
double max_efficiency =
static_cast<double>(block_count) /
(divup<int>(block_count, numThreads()) * numThreads());
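// For example (illustrative), 10 blocks on 4 threads need
// divup(10, 4) = 3 rounds and occupy 3 * 4 = 12 thread-slots, so the
// efficiency is 10 / 12 ~= 0.83.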
// Now try to increase block size up to max_block_size as long as it
// doesn't decrease parallel efficiency.
for (Index prev_block_count = block_count; prev_block_count > 1;) {
// This is the next block size that divides size into a smaller number
// of blocks than prev_block_count.
Index coarser_block_size = divup(size, prev_block_count - 1);
coarser_block_size =
alignBlockSize(coarser_block_size, hard_align, soft_align);
if (coarser_block_size > max_block_size) {
break; // Reached max block size. Stop.
}
// Recalculate parallel efficiency.
const Index coarser_block_count = divup(size, coarser_block_size);
eigen_assert(coarser_block_count < prev_block_count);
prev_block_count = coarser_block_count;
const double coarser_efficiency =
static_cast<double>(coarser_block_count) /
(divup<int>(coarser_block_count, numThreads()) * numThreads());
if (coarser_efficiency + 0.01 >= max_efficiency) {
// Taking it.
block_size = coarser_block_size;
block_count = coarser_block_count;
if (max_efficiency < coarser_efficiency) {
max_efficiency = coarser_efficiency;
}
}
}
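// Example (illustrative): with size = 1000, min_block_size = 100, no
// alignment and 4 threads, the initial guess of block_size = 100 yields
// 10 blocks (efficiency 10 / 12 ~= 0.83); the loop coarsens this to
// block_size = 125, giving 8 blocks that map to exactly 2 blocks per
// thread (efficiency 1.0).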
}
// Recursively divide size into halves until we reach block_size.
// Division code rounds mid up to a multiple of block_size, so we are
// guaranteed to get block_count leaves that do the actual computations.
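// For example (illustrative), with size = 1000 and block_size = 125 the
// range [0, 1000) is split at 500, then at 250 and 750, and so on,
// yielding 8 leaves of 125 elements each.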
Barrier barrier(block_count);
std::function<void(Index, Index)> handleRange;
handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
if (last - first <= block_size) {
// Single block or less, execute directly.
f(first, last);
barrier.Notify();
return;
}
// Split into halves and submit to the pool.
Index mid = first + divup((last - first) / 2, block_size) * block_size;
pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
};
handleRange(0, size);
barrier.Wait();
}
private:
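// alignBlockSize rounds size up to a multiple of the requested alignment
// (the alignments are assumed to be powers of two, since masking is used).
// For example (illustrative), with hard_align = 8 and soft_align = 32 a
// size of 100 is rounded to 104 (hard alignment only, since 100 < 4 * 32),
// while 200 is rounded to 224, a 12% increase that stays within the 25%
// soft-alignment budget.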
static Index alignBlockSize(Index size, Index hard_align, Index soft_align) {
if (soft_align > hard_align && size >= 4 * soft_align) {
// Align to soft_align, if it won't increase size by more than 25%.
return (size + soft_align - 1) & ~(soft_align - 1);
}
if (hard_align > 0) {
return (size + hard_align - 1) & ~(hard_align - 1);
}
return size;
}
ThreadPoolInterface* pool_;
size_t num_threads_;
};
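
A minimal usage sketch (not part of this commit), assuming an Eigen build where
ThreadPoolDevice is constructed from a ThreadPoolInterface implementation plus a
thread count, and where Eigen::ThreadPool and the unsupported Tensor header are
available; exact names, headers, and constructor signatures may differ between
Eigen versions:

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <vector>

int main() {
  const int num_threads = 4;
  // Assumed ThreadPoolInterface implementation and device constructor.
  Eigen::ThreadPool pool(num_threads);
  Eigen::ThreadPoolDevice device(&pool, num_threads);

  std::vector<float> data(1 << 20);
  device.parallelFor(
      /*size=*/static_cast<Eigen::Index>(data.size()),
      /*min_block_size=*/1024,  // amortize per-block scheduling overhead
      /*hard_align=*/0,         // no mandatory alignment
      /*soft_align=*/16,        // prefer block sizes that are multiples of 16
      [&](Eigen::Index first, Eigen::Index last) {
        // Each block [first, last) is processed on some pool thread.
        for (Eigen::Index i = first; i < last; ++i) {
          data[i] = 2.0f * static_cast<float>(i);
        }
      });
  return 0;
}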