diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index ef22a268a..ca2794cb5 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -75,9 +75,9 @@ struct ThreadPoolDevice {
   EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
     deallocate(buffer);
   }
-  
+
   template <typename Type>
-  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const { 
+  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
     return data;
   }
 
@@ -181,44 +181,173 @@ struct ThreadPoolDevice {
     return pool_->CurrentThreadId();
   }
 
-  // parallelFor executes f with [0, n) arguments in parallel and waits for
-  // completion. F accepts a half-open interval [first, last).
-  // Block size is chosen based on the iteration cost and resulting parallel
+  // WARNING: This function is synchronous and will block the calling thread.
+  //
+  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
+  // waits for completion. F accepts a half-open interval [first, last). Block
+  // size is chosen based on the iteration cost and resulting parallel
   // efficiency. If block_align is not nullptr, it is called to round up the
   // block size.
   void parallelFor(Index n, const TensorOpCost& cost,
                    std::function<Index(Index)> block_align,
                    std::function<void(Index, Index)> f) const {
-    typedef TensorCostModel<ThreadPoolDevice> CostModel;
+    // Compute small problems directly in the caller thread.
     if (n <= 1 || numThreads() == 1 ||
         CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
       f(0, n);
       return;
     }
 
-    // Calculate block size based on (1) the iteration cost and (2) parallel
-    // efficiency. We want blocks to be not too small to mitigate
-    // parallelization overheads; not too large to mitigate tail
-    // effect and potential load imbalance and we also want number
-    // of blocks to be evenly dividable across threads.
+    // Compute block size and total count of blocks.
+    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
 
-    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
+    // Recursively divide size into halves until we reach block_size.
+    // Division code rounds mid to block_size, so we are guaranteed to get
+    // block_count leaves that do actual computations.
+    Barrier barrier(static_cast<unsigned int>(block.count));
+    std::function<void(Index, Index)> handleRange;
+    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
+                                                  Index lastIdx) {
+      while (lastIdx - firstIdx > block.size) {
+        // Split into halves and schedule the second half on a different thread.
+        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
+        lastIdx = midIdx;
+      }
+      // Single block or less, execute directly.
+      f(firstIdx, lastIdx);
+      barrier.Notify();
+    };
+
+    if (block.count <= numThreads()) {
+      // Avoid a thread hop by running the root of the tree and one block on the
+      // main thread.
+      handleRange(0, n);
+    } else {
+      // Execute the root in the thread pool to avoid running work on more than
+      // numThreads() threads.
+      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
+    }
+
+    barrier.Wait();
+  }
+
+  // Convenience wrapper for parallelFor that does not align blocks.
+  void parallelFor(Index n, const TensorOpCost& cost,
+                   std::function<void(Index, Index)> f) const {
+    parallelFor(n, cost, NULL, std::move(f));
+  }
+
+  // WARNING: This function is asynchronous and will not block the calling thread.
+  //
+  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
+  // without waiting for completion. When the last block is finished, it will
+  // call the 'done' callback. F accepts a half-open interval [first, last).
+  // Block size is chosen based on the iteration cost and resulting parallel
+  // efficiency. If block_align is not nullptr, it is called to round up the
+  // block size.
+  void parallelForAsync(Index n, const TensorOpCost& cost,
+                        std::function<Index(Index)> block_align,
+                        std::function<void(Index, Index)> f,
+                        std::function<void()> done) const {
+    // Compute block size and total count of blocks.
+    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
+
+    ParallelForAsyncContext* const ctx =
+        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
+
+    // Recursively divide size into halves until we reach block_size.
+    // Division code rounds mid to block_size, so we are guaranteed to get
+    // block_count leaves that do actual computations.
+    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
+      while (lastIdx - firstIdx > block.size) {
+        // Split into halves and schedule the second half on a different thread.
+        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
+        pool_->Schedule(
+            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
+        lastIdx = midIdx;
+      }
+
+      // Single block or less, execute directly.
+      ctx->f(firstIdx, lastIdx);
+
+      // Call the 'done' callback if it was the last block.
+      if (ctx->count.fetch_sub(1) == 1) {
+        (ctx->done)();
+        // We can't delete ctx right now, because it would deallocate the
+        // closure we are currently in.
+        pool_->Schedule([ctx]() { delete ctx; });
+      }
+    };
+
+    // Execute the root in the thread pool.
+    pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
+  }
+
+  // Convenience wrapper for parallelForAsync that does not align blocks.
+  void parallelForAsync(Index n, const TensorOpCost& cost,
+                        std::function<void(Index, Index)> f,
+                        std::function<void()> done) const {
+    parallelForAsync(n, cost, NULL, std::move(f), std::move(done));
+  }
+
+  // Thread pool accessor.
+  ThreadPoolInterface* getPool() const { return pool_; }
+
+  // Allocator accessor.
+  Allocator* allocator() const { return allocator_; }
+
+ private:
+  typedef TensorCostModel<ThreadPoolDevice> CostModel;
+
+  // For parallelForAsync we must keep the passed-in closures on the heap, and
+  // delete them only after the `done` callback has finished.
+  struct ParallelForAsyncContext {
+    ParallelForAsyncContext(Index count, std::function<void(Index, Index)> f,
+                            std::function<void()> done)
+        : count(count), f(std::move(f)), done(std::move(done)) {}
+
+    std::atomic<Index> count;
+    std::function<void(Index, Index)> f;
+    std::function<void()> done;
+
+    std::function<void(Index, Index)> handle_range;
+  };
+
+  struct ParallelForBlock {
+    Index size;   // block size
+    Index count;  // number of blocks
+  };
+
+  // Calculates block size based on (1) the iteration cost and (2) parallel
+  // efficiency. We want blocks to be not too small to mitigate parallelization
+  // overheads; not too large to mitigate tail effect and potential load
+  // imbalance, and we also want the number of blocks to be evenly divisible
+  // across threads.
+  ParallelForBlock CalculateParallelForBlock(
+      const Index n, const TensorOpCost& cost,
+      std::function<Index(Index)> block_align) const {
+    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
     const Index max_oversharding_factor = 4;
     Index block_size = numext::mini(
-        n, numext::maxi(divup(n, max_oversharding_factor * numThreads()),
-                        block_size_f));
+        n, numext::maxi<Index>(
+               divup<Index>(n, max_oversharding_factor * numThreads()),
+               block_size_f));
     const Index max_block_size = numext::mini(n, 2 * block_size);
+
     if (block_align) {
       Index new_block_size = block_align(block_size);
       eigen_assert(new_block_size >= block_size);
       block_size = numext::mini(n, new_block_size);
     }
+
     Index block_count = divup(n, block_size);
+
     // Calculate parallel efficiency as fraction of total CPU time used for
     // computations:
     double max_efficiency =
         static_cast<double>(block_count) /
         (divup<int>(block_count, numThreads()) * numThreads());
+
     // Now try to increase block size up to max_block_size as long as it
     // doesn't decrease parallel efficiency.
     for (Index prev_block_count = block_count;
@@ -251,47 +380,9 @@ struct ThreadPoolDevice {
       }
     }
 
-    // Recursively divide size into halves until we reach block_size.
-    // Division code rounds mid to block_size, so we are guaranteed to get
-    // block_count leaves that do actual computations.
-    Barrier barrier(static_cast<unsigned int>(block_count));
-    std::function<void(Index, Index)> handleRange;
-    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
-      while (lastIdx - firstIdx > block_size) {
-        // Split into halves and schedule the second half on a different thread.
-        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block_size) * block_size;
-        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
-        lastIdx = midIdx;
-      }
-      // Single block or less, execute directly.
-      f(firstIdx, lastIdx);
-      barrier.Notify();
-    };
-    if (block_count <= numThreads()) {
-      // Avoid a thread hop by running the root of the tree and one block on the
-      // main thread.
-      handleRange(0, n);
-    } else {
-      // Execute the root in the thread pool to avoid running work on more than
-      // numThreads() threads.
-      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
-    }
-    barrier.Wait();
+    return {block_size, block_count};
   }
 
-  // Convenience wrapper for parallelFor that does not align blocks.
-  void parallelFor(Index n, const TensorOpCost& cost,
-                   std::function<void(Index, Index)> f) const {
-    parallelFor(n, cost, NULL, std::move(f));
-  }
-
-  // Thread pool accessor.
-  ThreadPoolInterface* getPool() const { return pool_; }
-
-  // Allocator accessor.
-  Allocator* allocator() const { return allocator_; }
-
- private:
   ThreadPoolInterface* pool_;
   int num_threads_;
   Allocator* allocator_;
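
Below the patch: a few standalone sketches that may help reviewers; none of them are part of the patch. First, a minimal usage sketch of the synchronous parallelFor added above, assuming the unsupported CXX11 Tensor headers; the pool size and the TensorOpCost numbers are illustrative guesses, not tuned values.

// Usage sketch (not part of the patch): synchronous parallelFor.
#define EIGEN_USE_THREADS  // required for ThreadPoolDevice
#include <unsupported/Eigen/CXX11/Tensor>

#include <vector>

int main() {
  Eigen::ThreadPool pool(/*num_threads=*/4);
  Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/4);

  std::vector<float> data(1 << 20, 1.0f);
  // Rough cost of one iteration: one float load, one float store, one multiply.
  const Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
                                 /*bytes_stored=*/sizeof(float),
                                 /*compute_cycles=*/1);

  // Blocks until every [first, last) sub-range has been processed.
  device.parallelFor(static_cast<Eigen::Index>(data.size()), cost,
                     [&data](Eigen::Index first, Eigen::Index last) {
                       for (Eigen::Index i = first; i < last; ++i)
                         data[i] *= 2.0f;
                     });
  return 0;
}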
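
The asynchronous variant returns immediately, so everything captured by f must stay alive until the 'done' callback has run. A sketch of the intended usage, reusing Eigen's own Barrier only so that main() can wait at the end (again illustrative, not part of the patch):

// Usage sketch (not part of the patch): parallelForAsync.
#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

#include <vector>

int main() {
  Eigen::ThreadPool pool(4);
  Eigen::ThreadPoolDevice device(&pool, 4);

  std::vector<float> data(1 << 20, 1.0f);
  const Eigen::TensorOpCost cost(sizeof(float), sizeof(float), 1);

  // parallelForAsync returns without blocking; the Barrier lets this caller
  // wait for the 'done' callback before data goes out of scope.
  Eigen::Barrier done_barrier(1);
  device.parallelForAsync(
      static_cast<Eigen::Index>(data.size()), cost,
      [&data](Eigen::Index first, Eigen::Index last) {
        for (Eigen::Index i = first; i < last; ++i) data[i] += 1.0f;
      },
      [&done_barrier]() { done_barrier.Notify(); });

  // The calling thread is free to do unrelated work here.
  done_barrier.Wait();
  return 0;
}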
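
The comment "we are guaranteed to get block_count leaves" can be sanity-checked in isolation: because midIdx is rounded up to a multiple of the block size, the left child of every split is block-aligned, so the recursion always produces exactly divup(n, block_size) leaves, which is the count the Barrier (and the async context counter) is initialized with. A standalone sketch with hypothetical helper names, independent of Eigen:

// Standalone sketch (not Eigen code): counts the leaf ranges produced by the
// same mid-rounding rule used by handleRange / handle_range.
#include <cassert>
#include <cstdio>

static long long divup(long long x, long long y) { return (x + y - 1) / y; }

static long long countLeaves(long long first, long long last, long long block) {
  if (last - first <= block) return 1;  // single block or less: one leaf
  const long long mid = first + divup((last - first) / 2, block) * block;
  return countLeaves(first, mid, block) + countLeaves(mid, last, block);
}

int main() {
  const long long n = 1000, block = 48;
  assert(countLeaves(0, n, block) == divup(n, block));  // exactly 21 leaves
  std::printf("leaves = %lld\n", countLeaves(0, n, block));
  return 0;
}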
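
Finally, the parallel-efficiency metric in CalculateParallelForBlock is the fraction of thread "slots" that do useful work when block_count blocks are dealt out round-robin to numThreads() threads. A tiny sketch (hypothetical names, not Eigen code) makes the preference for evenly divisible block counts concrete:

// Standalone sketch (not Eigen code): the efficiency formula
// block_count / (divup(block_count, T) * T).
#include <cstdio>

static long long divup(long long x, long long y) { return (x + y - 1) / y; }

static double efficiency(long long block_count, long long num_threads) {
  return static_cast<double>(block_count) /
         static_cast<double>(divup(block_count, num_threads) * num_threads);
}

int main() {
  // 9 blocks on 8 threads: one thread runs two blocks while seven run one,
  // so only 9/16 (about 56%) of the consumed CPU time is useful work.
  std::printf("eff(9, 8)  = %.2f\n", efficiency(9, 8));
  // 16 blocks divide evenly: every thread runs exactly two blocks.
  std::printf("eff(16, 8) = %.2f\n", efficiency(16, 8));
  return 0;
}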