From 97c0c5d485ddec0369326825a41db48d8505cf4c Mon Sep 17 00:00:00 2001 From: Rasmus Munk Larsen Date: Tue, 22 Oct 2019 12:42:44 -0700 Subject: [PATCH] Add block evaluation V2 to TensorAsyncExecutor. Add async evaluation to a number of ops. --- .../Eigen/CXX11/src/Tensor/TensorBase.h | 7 +- .../Eigen/CXX11/src/Tensor/TensorDevice.h | 56 +++++--- .../Eigen/CXX11/src/Tensor/TensorEvalTo.h | 10 ++ .../Eigen/CXX11/src/Tensor/TensorExecutor.h | 125 +++++++++++++++--- .../Eigen/CXX11/src/Tensor/TensorForcedEval.h | 31 +++-- .../src/Tensor/TensorForwardDeclarations.h | 4 +- .../Eigen/CXX11/src/Tensor/TensorMorphing.h | 8 ++ .../Eigen/CXX11/src/Tensor/TensorReduction.h | 45 +++++-- unsupported/test/cxx11_tensor_executor.cpp | 6 +- 9 files changed, 226 insertions(+), 66 deletions(-) diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h index a951bea6d..7aa98fac6 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h @@ -1129,16 +1129,11 @@ class TensorBase : public TensorBase { return TensorDevice(dev, derived()); } -#ifdef EIGEN_USE_THREADS // Select the async device on which to evaluate the expression. template - typename internal::enable_if< - internal::is_same::value, - TensorAsyncDevice>::type - device(const DeviceType& dev, DoneCallback done) { + TensorAsyncDevice device(const DeviceType& dev, DoneCallback done) { return TensorAsyncDevice(dev, derived(), std::move(done)); } -#endif // EIGEN_USE_THREADS protected: EIGEN_DEVICE_FUNC diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h index cc9c65702..804a16cc5 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h @@ -63,18 +63,18 @@ template class TensorDevice { ExpressionType& m_expression; }; -#ifdef EIGEN_USE_THREADS - /** \class TensorAsyncDevice - * \ingroup CXX11_Tensor_Module - * - * \brief Pseudo expression providing an operator = that will evaluate its - * argument asynchronously on the specified device (currently supports only - * ThreadPoolDevice). - * - * Example: - * auto done = []() { ... expression evaluation done ... }; - * C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B; + * \ingroup CXX11_Tensor_Module + * + * \brief Pseudo expression providing an operator = that will evaluate its + * argument asynchronously on the specified device. Currently only + * ThreadPoolDevice implements proper asynchronous execution, while the default + * and GPU devices just run the expression synchronously and call m_done() on + * completion.. + * + * Example: + * auto done = []() { ... expression evaluation done ... }; + * C.device(thread_pool_device, std::move(done)) = A + B; */ template @@ -87,11 +87,11 @@ class TensorAsyncDevice { template EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) { typedef TensorAssignOp Assign; - typedef internal::TensorAsyncExecutor Executor; + typedef internal::TensorExecutor Executor; - // WARNING: After assignment 'm_done' callback will be in undefined state. Assign assign(m_expression, other); - Executor::runAsync(assign, m_device, std::move(m_done)); + Executor::run(assign, m_device); + m_done(); return *this; } @@ -102,7 +102,33 @@ class TensorAsyncDevice { DoneCallback m_done; }; -#endif // EIGEN_USE_THREADS + +#ifdef EIGEN_USE_THREADS +template +class TensorAsyncDevice { + public: + TensorAsyncDevice(const ThreadPoolDevice& device, ExpressionType& expression, + DoneCallback done) + : m_device(device), m_expression(expression), m_done(std::move(done)) {} + + template + EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) { + typedef TensorAssignOp Assign; + typedef internal::TensorAsyncExecutor Executor; + + // WARNING: After assignment 'm_done' callback will be in undefined state. + Assign assign(m_expression, other); + Executor::runAsync(assign, m_device, std::move(m_done)); + + return *this; + } + + protected: + const ThreadPoolDevice& m_device; + ExpressionType& m_expression; + DoneCallback m_done; +}; +#endif } // end namespace Eigen diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h b/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h index 22fc64c1f..cd1338c66 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h @@ -151,6 +151,16 @@ struct TensorEvaluator, Device> return m_impl.evalSubExprsIfNeeded(m_buffer); } +#ifdef EIGEN_USE_THREADS + template + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync( + EvaluatorPointerType scalar, EvalSubExprsCallback done) { + EIGEN_UNUSED_VARIABLE(scalar); + eigen_assert(scalar == NULL); + m_impl.evalSubExprsIfNeededAsync(m_buffer, std::move(done)); + } +#endif + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalScalar(Index i) { m_buffer[i] = m_impl.coeff(i); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index 11cec3d1c..f736c238f 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -102,7 +102,7 @@ class TensorExecutor { * available for ThreadPoolDevice (see definition below). */ template + bool Vectorizable, TiledEvaluation Tiling> class TensorAsyncExecutor {}; /** @@ -544,9 +544,9 @@ class TensorExecutor + TiledEvaluation Tiling> class TensorAsyncExecutor { + Vectorizable, Tiling> { public: typedef typename Expression::Index StorageIndex; typedef TensorEvaluator Evaluator; @@ -598,7 +598,7 @@ class TensorAsyncExecutor class TensorAsyncExecutor { + Vectorizable, /*Tileable*/ TiledEvaluation::Legacy> { public: typedef typename traits::Index StorageIndex; typedef typename traits::Scalar Scalar; @@ -607,7 +607,9 @@ class TensorAsyncExecutor::NumDimensions; typedef TensorEvaluator Evaluator; - typedef TensorBlockMapper BlockMapper; + typedef TensorBlockMapper + BlockMapper; typedef TensorExecutorTilingContext TilingContext; static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, @@ -624,7 +626,7 @@ class TensorAsyncExecutor::runAsync(expr, device, std::move(delete_ctx)); + /*Tileable*/ TiledEvaluation::Off>::runAsync(expr, device, std::move(delete_ctx)); return; } @@ -635,22 +637,102 @@ class TensorAsyncExecutortiling = - GetTensorExecutorTilingContext(device, ctx->evaluator); + GetTensorExecutorTilingContext( + device, ctx->evaluator); - device.parallelForAsync( - ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost, - [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { - ScalarNoConst* thread_buf = - ctx->tiling.template GetCurrentThreadBuffer( - ctx->device); - for (StorageIndex i = firstIdx; i < lastIdx; ++i) { - auto block = - ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); - ctx->evaluator.evalBlock(&block); - } - }, - [ctx]() { delete ctx; }); + auto eval_block = [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + ctx->tiling.template GetCurrentThreadBuffer( + ctx->device); + for (StorageIndex i = firstIdx; i < lastIdx; ++i) { + auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); + ctx->evaluator.evalBlock(&block); + } + }; + device.parallelForAsync(ctx->tiling.block_mapper.total_block_count(), + ctx->tiling.cost, eval_block, + [ctx]() { delete ctx; }); + }; + + ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs); + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + DoneCallback done) + : device(thread_pool), + evaluator(expr, thread_pool), + on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + device.deallocate(tiling.buffer); + evaluator.cleanup(); + } + + const ThreadPoolDevice& device; + Evaluator evaluator; + TilingContext tiling; + + private: + DoneCallback on_done; + }; +}; + +template +class TensorAsyncExecutor { + public: + typedef typename traits::Index IndexType; + typedef typename traits::Scalar Scalar; + typedef typename remove_const::type ScalarNoConst; + + static const int NumDims = traits::NumDimensions; + + typedef TensorEvaluator Evaluator; + typedef TensorBlockMapper + BlockMapper; + typedef TensorExecutorTilingContext TilingContext; + + typedef internal::TensorBlockDescriptor TensorBlockDesc; + typedef internal::TensorBlockScratchAllocator + TensorBlockScratch; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + DoneCallback done) { + + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + + const auto on_eval_subexprs = [ctx](bool need_assign) -> void { + if (!need_assign) { + delete ctx; + return; + } + + ctx->tiling = + internal::GetTensorExecutorTilingContext( + ctx->device, ctx->evaluator, /*allocate_buffer=*/false); + + auto eval_block = [ctx](IndexType firstBlockIdx, IndexType lastBlockIdx) { + TensorBlockScratch scratch(ctx->device); + + for (IndexType block_idx = firstBlockIdx; block_idx < lastBlockIdx; + ++block_idx) { + auto block = + ctx->tiling.block_mapper.GetBlockForIndex(block_idx, nullptr); + TensorBlockDesc desc(block.first_coeff_index(), block.block_sizes()); + ctx->evaluator.evalBlockV2(desc, scratch); + scratch.reset(); + } + }; + ctx->device.parallelForAsync(ctx->tiling.block_mapper.total_block_count(), + ctx->tiling.cost, eval_block, [ctx]() { delete ctx; }); }; ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs); @@ -682,7 +764,6 @@ class TensorAsyncExecutor, Device> EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType) { const Index numValues = internal::array_prod(m_impl.dimensions()); m_buffer = m_device.get((CoeffReturnType*)m_device.allocate_temp(numValues * sizeof(CoeffReturnType))); - #ifndef EIGEN_USE_SYCL - // Should initialize the memory in case we're dealing with non POD types. - if (NumTraits::RequireInitialization) { - for (Index i = 0; i < numValues; ++i) { - new(m_buffer+i) CoeffReturnType(); - } - } - #endif typedef TensorEvalToOp< const typename internal::remove_const::type > EvalTo; EvalTo evalToTmp(m_device.get(m_buffer), m_op); @@ -151,6 +143,29 @@ struct TensorEvaluator, Device> return true; } + +#ifdef EIGEN_USE_THREADS + template + EIGEN_STRONG_INLINE EIGEN_DEVICE_FUNC void evalSubExprsIfNeededAsync( + EvaluatorPointerType, EvalSubExprsCallback done) { + const Index numValues = internal::array_prod(m_impl.dimensions()); + m_buffer = m_device.get((CoeffReturnType*)m_device.allocate_temp( + numValues * sizeof(CoeffReturnType))); + typedef TensorEvalToOp::type> + EvalTo; + EvalTo evalToTmp(m_device.get(m_buffer), m_op); + + auto on_done = std::bind([](EvalSubExprsCallback done) { done(true); }, + std::move(done)); + internal::TensorAsyncExecutor< + const EvalTo, typename internal::remove_const::type, + decltype(on_done), + /*Vectorizable=*/internal::IsVectorizable::value, + /*Tiling=*/internal::IsTileable::value>:: + runAsync(evalToTmp, m_device, std::move(on_done)); + } +#endif + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() { m_device.deallocate_temp(m_buffer); m_buffer = NULL; diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h index 5549cbdb2..0da2d9e0d 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h @@ -185,12 +185,12 @@ template ::value> class TensorExecutor; -// TODO(ezhulenev): Add TiledEvaluation support to async executor. template ::value, - bool Tileable = IsTileable::BlockAccess> + TiledEvaluation Tiling = IsTileable::value> class TensorAsyncExecutor; + } // end namespace internal } // end namespace Eigen diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h b/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h index 606d49a20..781f1d75b 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h @@ -205,6 +205,14 @@ struct TensorEvaluator, Device> EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Dimensions& dimensions() const { return m_dimensions; } +#ifdef EIGEN_USE_THREADS + template + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync( + EvaluatorPointerType data, EvalSubExprsCallback done) { + m_impl.evalSubExprsIfNeededAsync(data, std::move(done)); + } +#endif + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType data) { return m_impl.evalSubExprsIfNeeded(data); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h index a5c293cf9..d826cfb7e 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h @@ -689,15 +689,14 @@ struct TensorReductionEvaluatorBase::HasOptimizedImplementation && @@ -802,6 +801,34 @@ struct TensorReductionEvaluatorBase + EIGEN_STRONG_INLINE +#if !defined(EIGEN_HIPCC) + EIGEN_DEVICE_FUNC +#endif + void + evalSubExprsIfNeededAsync(EvaluatorPointerType data, + EvalSubExprsCallback done) { + m_impl.evalSubExprsIfNeededAsync(NULL, [this, data, done](bool) { + done(evalSubExprsIfNeededCommon(data)); + }); + } +#endif + + EIGEN_STRONG_INLINE +#if !defined(EIGEN_HIPCC) + // Marking this as EIGEN_DEVICE_FUNC for HIPCC requires also doing the same + // for all the functions being called within here, which then leads to + // proliferation of EIGEN_DEVICE_FUNC markings, one of which will eventually + // result in an NVCC error + EIGEN_DEVICE_FUNC +#endif + bool evalSubExprsIfNeeded(EvaluatorPointerType data) { + m_impl.evalSubExprsIfNeeded(NULL); + return evalSubExprsIfNeededCommon(data); + } + EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() { m_impl.cleanup(); if (m_result) { diff --git a/unsupported/test/cxx11_tensor_executor.cpp b/unsupported/test/cxx11_tensor_executor.cpp index dd68ddf17..0e70e1770 100644 --- a/unsupported/test/cxx11_tensor_executor.cpp +++ b/unsupported/test/cxx11_tensor_executor.cpp @@ -604,11 +604,10 @@ static void test_async_execute_unary_expr(Device d) Eigen::Barrier done(1); auto on_done = [&done]() { done.Notify(); }; - static const bool TilingOn = Tiling == TiledEvaluation::Off ? false : true; using Assign = TensorAssignOp; using DoneCallback = decltype(on_done); using Executor = internal::TensorAsyncExecutor; + Vectorizable, Tiling>; Executor::runAsync(Assign(dst, expr), d, on_done); done.Wait(); @@ -641,11 +640,10 @@ static void test_async_execute_binary_expr(Device d) Eigen::Barrier done(1); auto on_done = [&done]() { done.Notify(); }; - static const bool TilingOn = Tiling == TiledEvaluation::Off ? false : true; using Assign = TensorAssignOp; using DoneCallback = decltype(on_done); using Executor = internal::TensorAsyncExecutor; + Vectorizable, Tiling>; Executor::runAsync(Assign(dst, expr), d, on_done); done.Wait();