diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
index 288d79f1f..99e7304d8 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorContractionThreadPool.h
@@ -932,7 +932,7 @@ struct TensorEvaluator
       while (end - start > 1) {
         Index mid = (start + end) / 2;
-        device_.enqueueNoNotification([this, mid, end, k, rhs]() {
+        device_.enqueue([this, mid, end, k, rhs]() {
           enqueue_packing_helper(mid, end, k, rhs);
         });
         end = mid;
@@ -1000,7 +1000,7 @@ struct TensorEvaluator
           (k > 0 || std::this_thread::get_id() == created_by_thread_id_);
 
       if (pack_async) {
-        device_.enqueueNoNotification([this, start, end, k, rhs]() {
+        device_.enqueue([this, start, end, k, rhs]() {
           enqueue_packing_helper(start, end, k, rhs);
         });
       } else {
@@ -1264,7 +1264,7 @@ struct TensorEvaluator
       while (end_block_idx - start_block_idx > 1) {
         Index mid_block_idx = (start_block_idx + end_block_idx) / 2;
-        evaluator->m_device.enqueueNoNotification([this, &barrier, mid_block_idx, end_block_idx]() {
+        evaluator->m_device.enqueue([this, &barrier, mid_block_idx, end_block_idx]() {
           eval<Alignment>(barrier, mid_block_idx, end_block_idx);
         });
         end_block_idx = mid_block_idx;
@@ -1282,7 +1282,7 @@ struct TensorEvaluator
       while (end_block_idx - start_block_idx > 1) {
         Index mid_block_idx = (start_block_idx + end_block_idx) / 2;
-        evaluator->m_device.enqueueNoNotification(
+        evaluator->m_device.enqueue(
             [this, mid_block_idx, end_block_idx]() { evalAsync<Alignment>(mid_block_idx, end_block_idx); });
         end_block_idx = mid_block_idx;
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index c95c8f223..fe49d18b6 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -15,35 +15,6 @@ namespace Eigen {
 
-// Runs an arbitrary function and then calls Notify() on the passed in
-// Notification.
-template <typename Function, typename... Args>
-struct FunctionWrapperWithNotification {
-  static void run(Notification* n, Function f, Args... args) {
-    f(args...);
-    if (n) {
-      n->Notify();
-    }
-  }
-};
-
-template <typename Function, typename... Args>
-struct FunctionWrapperWithBarrier {
-  static void run(Barrier* b, Function f, Args... args) {
-    f(args...);
-    if (b) {
-      b->Notify();
-    }
-  }
-};
-
-template <typename SyncType>
-static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
-  if (n) {
-    n->Wait();
-  }
-}
-
 // An abstract interface to a device specific memory allocator.
 class Allocator {
  public:
@@ -98,8 +69,9 @@ struct ThreadPoolDevice {
       Barrier barrier(static_cast<unsigned int>(num_threads - 1));
       // Launch the last 3 blocks on worker threads.
       for (size_t i = 1; i < num_threads; ++i) {
-        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
+        enqueue([n, i, src_ptr, dst_ptr, blocksize, &barrier] {
           ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, numext::mini(blocksize, n - (i * blocksize)));
+          barrier.Notify();
         });
       }
       // Launch the first block on the main thread.
@@ -140,24 +112,22 @@ struct ThreadPoolDevice {
     return 1;
   }
 
-  template <class Function, class... Args>
-  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
-    Notification* n = new Notification();
-    pool_->Schedule(
-        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, std::forward<Function>(f), args...));
-    return n;
-  }
-
-  template <class Function, class... Args>
-  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f, Args&&... args) const {
-    pool_->Schedule(
-        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b, std::forward<Function>(f), args...));
-  }
-
+  // TODO(rmlarsen): Remove this deprecated interface when all users have been converted.
   template <class Function, class... Args>
   EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
+    enqueue(std::forward<Function>(f), std::forward<Args>(args)...);
+  }
+
+  template <class Function, class... Args>
+  EIGEN_STRONG_INLINE void enqueue(Function&& f, Args&&... args) const {
     if (sizeof...(args) > 0) {
-      pool_->Schedule(std::bind(std::forward<Function>(f), args...));
+#if EIGEN_COMP_CXXVER >= 20
+      auto run_f = [f = std::forward<Function>(f),
+                    ...args = std::forward<Args>(args)]() { f(args...); };
+#else
+      auto run_f = [f = std::forward<Function>(f), &args...]() { f(args...); };
+#endif
+      pool_->Schedule(std::move(run_f));
     } else {
       pool_->Schedule(std::forward<Function>(f));
     }
@@ -191,27 +161,16 @@ struct ThreadPoolDevice {
     // Division code rounds mid to block_size, so we are guaranteed to get
     // block_count leaves that do actual computations.
     Barrier barrier(static_cast<unsigned int>(block.count));
-    std::function<void(Index, Index)> handleRange;
-    handleRange = [this, block, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
-      while (lastIdx - firstIdx > block.size) {
-        // Split into halves and schedule the second half on a different thread.
-        const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
-        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
-        lastIdx = midIdx;
-      }
-      // Single block or less, execute directly.
-      f(firstIdx, lastIdx);
-      barrier.Notify();
-    };
-
     if (block.count <= numThreads()) {
       // Avoid a thread hop by running the root of the tree and one block on the
       // main thread.
-      handleRange(0, n);
+      handleRange(0, n, block.size, &barrier, pool_, f);
     } else {
       // Execute the root in the thread pool to avoid running work on more than
       // numThreads() threads.
-      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
+      pool_->Schedule([this, n, &block, &barrier, &f]() {
+        handleRange(0, n, block.size, &barrier, pool_, f);
+      });
     }
 
     barrier.Wait();
@@ -287,6 +246,21 @@ struct ThreadPoolDevice {
  private:
   typedef TensorCostModel<ThreadPoolDevice> CostModel;
 
+  static void handleRange(Index firstIdx, Index lastIdx, Index granularity,
+                          Barrier* barrier, ThreadPoolInterface* pool, const std::function<void(Index, Index)>& f) {
+    while (lastIdx - firstIdx > granularity) {
+      // Split into halves and schedule the second half on a different thread.
+      const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, granularity) * granularity;
+      pool->Schedule([=, &f]() {
+        handleRange(midIdx, lastIdx, granularity, barrier, pool, f);
+      });
+      lastIdx = midIdx;
+    }
+    // Single block or less, execute directly.
+    f(firstIdx, lastIdx);
+    barrier->Notify();
+  }
+
   // For parallelForAsync we must keep passed in closures on the heap, and
   // delete them only after `done` callback finished.
   struct ParallelForAsyncContext {
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
index b1da1a5e6..da3321073 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h
@@ -352,7 +352,7 @@ class TensorExecutor
diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h
-struct FullReducerShard {
-  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(const Self& self, typename Self::Index firstIndex,
-                                                        typename Self::Index numValuesToReduce, Op& reducer,
-                                                        typename Self::CoeffReturnType* output) {
-    *output = InnerMostDimReducer<Self, Op, Vectorizable>::reduce(self, firstIndex, numValuesToReduce, reducer);
-  }
-};
-
 // Multithreaded full reducer
 template <typename Self, typename Op, bool Vectorizable>
 struct FullReducer<Self, Op, ThreadPoolDevice, Vectorizable> {
@@ -397,8 +386,11 @@ struct FullReducer<Self, Op, ThreadPoolDevice, Vectorizable> {
     Barrier barrier(internal::convert_index<unsigned int>(numblocks));
     MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
     for (Index i = 0; i < numblocks; ++i) {
-      device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, Vectorizable>::run, self, i * blocksize,
-                                  blocksize, reducer, &shards[i]);
+      auto run_shard = [i, blocksize, &self, &barrier, &shards, &reducer]() {
+        shards[i] = InnerMostDimReducer<Self, Op, Vectorizable>::reduce(self, i * blocksize, blocksize, reducer);
+        barrier.Notify();
+      };
+      device.enqueue(std::move(run_shard));
     }
     typename Self::CoeffReturnType finalShard;
     if (numblocks * blocksize < num_coeffs) {
@@ -888,10 +880,6 @@ struct TensorReductionEvaluatorBase
   template <typename S, typename O, typename D, bool V>
   friend struct internal::FullReducer;
-#ifdef EIGEN_USE_THREADS
-  template <typename S, typename O, bool V>
-  friend struct internal::FullReducerShard;
-#endif
 #if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
   template <int B, int N, typename S, typename R, typename I_>
   KERNEL_FRIEND void internal::FullReductionKernel(R, const S, I_, typename S::CoeffReturnType*, unsigned int*);
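For reference, here is a minimal usage sketch of the call pattern this patch migrates callers to: capture a Barrier in the task and call Notify() from inside it, then submit the task through the variadic enqueue(), instead of going through enqueue_with_barrier(). It assumes the new void-returning ThreadPoolDevice::enqueue introduced above; the pool size and the toy workload are illustrative only and do not come from the patch.

// Build with EIGEN_USE_THREADS so ThreadPoolDevice, ThreadPool and Barrier are available.
#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

#include <cstdio>
#include <vector>

int main() {
  Eigen::ThreadPool pool(4);                 // worker threads
  Eigen::ThreadPoolDevice device(&pool, 4);  // device wrapper used by the Tensor module

  constexpr unsigned int kTasks = 8;
  std::vector<double> partial(kTasks, 0.0);
  Eigen::Barrier barrier(kTasks);

  for (unsigned int i = 0; i < kTasks; ++i) {
    // Each task writes its own slot and signals the barrier itself,
    // mirroring the memcpy and FullReducer changes in the patch.
    device.enqueue([i, &partial, &barrier]() {
      partial[i] = static_cast<double>(i) * i;
      barrier.Notify();
    });
  }
  barrier.Wait();  // blocks until every task has called Notify()

  double sum = 0.0;
  for (double p : partial) sum += p;
  std::printf("sum = %g\n", sum);
  return 0;
}

Capturing partial and barrier by reference is safe here only because barrier.Wait() keeps them alive on the main thread until every task has finished; tasks that may outlive their caller should capture by value or move their state onto the heap.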