diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool index d046af9b2..613fdb57a 100644 --- a/unsupported/Eigen/CXX11/ThreadPool +++ b/unsupported/Eigen/CXX11/ThreadPool @@ -43,7 +43,7 @@ #include #include #include -#include +#include #include "src/util/CXX11Meta.h" #include "src/util/MaxSizeVector.h" diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h index dbacf494e..095c85dc4 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBase.h @@ -1063,6 +1063,17 @@ class TensorBase : public TensorBase { return TensorDevice(dev, derived()); } +#ifdef EIGEN_USE_THREADS + // Select the async device on which to evaluate the expression. + template + typename internal::enable_if< + internal::is_same::value, + TensorAsyncDevice>::type + device(const DeviceType& dev, std::function done) { + return TensorAsyncDevice(dev, derived(), std::move(done)); + } +#endif // EIGEN_USE_THREADS + protected: EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Derived& derived() { return *static_cast(this); } diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h b/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h index 49fb21dc8..c8a8b16db 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorBlock.h @@ -932,6 +932,7 @@ class TensorBlockMapper { typedef TensorBlock Block; typedef DSizes Dimensions; + TensorBlockMapper() {} TensorBlockMapper(const Dimensions& dims, const TensorBlockShapeType block_shape, Index min_target_size) diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h index 29e50a3b2..5122b3623 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h @@ -63,6 +63,47 @@ template class TensorDevice { ExpressionType& m_expression; }; +#ifdef EIGEN_USE_THREADS + +/** \class TensorAsyncDevice + * \ingroup CXX11_Tensor_Module + * + * \brief Pseudo expression providing an operator = that will evaluate its + * argument asynchronously on the specified device (currently supports only + * ThreadPoolDevice). + * + * Example: + * std::function done = []() {}; + * C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B; + */ + +template +class TensorAsyncDevice { + public: + TensorAsyncDevice(const DeviceType& device, ExpressionType& expression, + std::function done) + : m_device(device), m_expression(expression), m_done(std::move(done)) {} + + template + EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) { + typedef TensorAssignOp Assign; + typedef internal::TensorAsyncExecutor Executor; + + // WARNING: After assignment 'm_done' callback will be in undefined state. 
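The warning above exists because operator= moves m_done into the executor, so a TensorAsyncDevice is effectively single-use: each asynchronous assignment should go through a fresh device(dev, done) call. A minimal caller-side sketch of the new API follows; it assumes EIGEN_USE_THREADS and uses the Eigen::Barrier helper that the tests further down in this patch rely on (thread counts and tensor sizes are arbitrary):

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>

    int main() {
      Eigen::ThreadPool pool(4);
      Eigen::ThreadPoolDevice dev(&pool, 4);

      Eigen::Tensor<float, 2> A(64, 64), B(64, 64), C(64, 64);
      A.setRandom();
      B.setRandom();

      // The callback fires once the whole expression has been evaluated into C.
      Eigen::Barrier done(1);
      C.device(dev, [&done]() { done.Notify(); }) = A + B;

      // The calling thread is free to do other work here, then must block
      // before reading C.
      done.Wait();
      return 0;
    }

This mirrors the example in the class comment above, with an explicit wait so the result is not read before the evaluation has finished.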
+ Assign assign(m_expression, other); + Executor::runAsync(assign, m_device, std::move(m_done)); + + return *this; + } + + protected: + const DeviceType& m_device; + ExpressionType& m_expression; + std::function m_done; +}; + +#endif // EIGEN_USE_THREADS + } // end namespace Eigen #endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_H diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h index fddb90d77..18d9de9e6 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h @@ -84,7 +84,7 @@ class TensorExecutor { EIGEN_DEVICE_FUNC static EIGEN_STRONG_INLINE void run(const Expression& expr, - const Device& device = Device()) { + const Device& device = Device()) { TensorEvaluator evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -97,6 +97,14 @@ class TensorExecutor { } }; +/** + * Default async execution strategy is not implemented. Currently it's only + * available for ThreadPoolDevice (see definition below). + */ +template +class TensorAsyncExecutor {}; + /** * Process all the data with a single cpu thread, using vectorized instructions. */ @@ -107,8 +115,8 @@ class TensorExecutor evaluator(expr, device); const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); if (needs_assign) { @@ -206,8 +214,81 @@ class TensorExecutor +struct TensorExecutorTilingContext { + typedef typename TensorBlockMapper::Block TensorBlock; + + TensorExecutorTilingContext() : buffer(nullptr) {} + TensorExecutorTilingContext(const TensorBlockMapper& b_mapper, + const TensorOpCost& b_cost, void* b_buffer, + size_t b_aligned_size) + : block_mapper(b_mapper), + cost(b_cost), + buffer(b_buffer), + aligned_blocksize(b_aligned_size) {} + + template + Scalar* GetCurrentThreadBuffer(const ThreadPoolDevice& device) const { + // ThreadPoolDevice::currentThreadId() returns -1 if called from a thread + // not in the thread pool, such as the main thread dispatching Eigen + // expressions. + const int thread_idx = device.currentThreadId(); + eigen_assert(thread_idx >= -1 && thread_idx < device.numThreads()); + + const Index offset = aligned_blocksize * (thread_idx + 1); + return reinterpret_cast(static_cast(buffer) + offset); + } + + TensorBlockMapper block_mapper; // navigate through blocks + TensorOpCost cost; // cost of computing a single block + void* buffer; // temporary buffer for blocks + size_t aligned_blocksize; // block size after memory alignment +}; + +// Computes a block evaluation parameters, and allocates temporary memory buffer +// for blocks. See TensorExecutor/TensorAsyncExecutor (Tileable=true) below. +template +TensorExecutorTilingContext GetTensorExecutorTilingContext( + const ThreadPoolDevice& device, const Evaluator& evaluator) { + // Prefer blocks skewed toward inner dimension. + TensorBlockShapeType block_shape = kSkewedInnerDims; + Index block_total_size = 0; + + // Query expression tree for desired block size/shape. + std::vector resources; + evaluator.getResourceRequirements(&resources); + MergeResourceRequirements(resources, &block_shape, &block_total_size); + int num_threads = device.numThreads(); + + // Estimate minimum block size based on cost. 
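GetCurrentThreadBuffer above carves a single allocation of (num_threads + 1) * aligned_blocksize bytes into disjoint per-thread slices, reserving slot 0 for the dispatching thread (currentThreadId() == -1). A standalone sketch of just that indexing, with illustrative sizes and no Eigen dependency:

    #include <cassert>
    #include <cstddef>
    #include <vector>

    // Slice lookup used by the tiling context: thread_idx is -1 for the thread
    // that dispatched the expression and 0..num_threads-1 for pool threads, so
    // slot (thread_idx + 1) is always a valid, non-overlapping slice.
    char* ThreadSlice(char* buffer, std::size_t aligned_blocksize, int thread_idx) {
      return buffer + aligned_blocksize * static_cast<std::size_t>(thread_idx + 1);
    }

    int main() {
      const int num_threads = 4;
      const std::size_t aligned_blocksize = 256;  // illustrative
      std::vector<char> buffer((num_threads + 1) * aligned_blocksize);

      // Slot 0 belongs to the dispatching thread (-1), slots 1..4 to pool threads.
      assert(ThreadSlice(buffer.data(), aligned_blocksize, -1) == buffer.data());
      assert(ThreadSlice(buffer.data(), aligned_blocksize, 3) ==
             buffer.data() + 4 * aligned_blocksize);
      return 0;
    }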
+ TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); + double taskSize = TensorCostModel::taskSize(1, cost); + size_t block_size = static_cast(1.0 / taskSize); + + TensorBlockMapper block_mapper( + typename TensorBlockMapper::Dimensions(evaluator.dimensions()), + block_shape, block_size); + + block_size = block_mapper.block_dims_total_size(); + const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); + const size_t aligned_blocksize = + align * + divup(block_size * sizeof(typename Evaluator::Scalar), align); + void* buf = device.allocate((num_threads + 1) * aligned_blocksize); + + return {block_mapper, cost * block_size, buf, aligned_blocksize}; +} + template struct EvalRange { static void run(Evaluator* evaluator_in, const StorageIndex firstIdx, @@ -274,7 +355,7 @@ class TensorExecutor { typedef EvalRange EvalRange; Evaluator evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const StorageIndex size = array_prod(evaluator.dimensions()); device.parallelFor(size, evaluator.costPerCoeff(Vectorizable), @@ -290,18 +371,18 @@ class TensorExecutor { template class TensorExecutor { public: + typedef typename traits::Index StorageIndex; typedef typename traits::Scalar Scalar; typedef typename remove_const::type ScalarNoConst; - typedef TensorEvaluator Evaluator; - typedef typename traits::Index StorageIndex; - static const int NumDims = traits::NumDimensions; + typedef TensorEvaluator Evaluator; + typedef TensorBlockMapper BlockMapper; + typedef TensorExecutorTilingContext TilingContext; + static EIGEN_STRONG_INLINE void run(const Expression& expr, const ThreadPoolDevice& device) { - typedef TensorBlockMapper TensorBlockMapper; - Evaluator evaluator(expr, device); Index total_size = array_prod(evaluator.dimensions()); Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); @@ -315,50 +396,152 @@ class TensorExecutor resources; - evaluator.getResourceRequirements(&resources); - MergeResourceRequirements(resources, &block_shape, &block_total_size); - int num_threads = device.numThreads(); + const TilingContext tiling = + internal::GetTensorExecutorTilingContext(device, evaluator); - // Estimate minimum block size based on cost. - TensorOpCost cost = evaluator.costPerCoeff(Vectorizable); - double taskSize = TensorCostModel::taskSize(1, cost); - size_t block_size = static_cast(1.0 / taskSize); - TensorBlockMapper block_mapper( - typename TensorBlockMapper::Dimensions(evaluator.dimensions()), - block_shape, block_size); - block_size = block_mapper.block_dims_total_size(); - const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1); - const size_t aligned_blocksize = - align * divup(block_size * sizeof(Scalar), align); - void* buf = device.allocate((num_threads + 1) * aligned_blocksize); device.parallelFor( - block_mapper.total_block_count(), cost * block_size, - [=, &device, &evaluator, &block_mapper](StorageIndex firstIdx, - StorageIndex lastIdx) { - // currentThreadId() returns -1 if called from a thread not in the - // thread pool, such as the main thread dispatching Eigen - // expressions. 
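The aligned_blocksize expression above rounds the raw per-block byte count up to a multiple of EIGEN_MAX_ALIGN_BYTES, so that every per-thread slice in the shared buffer starts on an aligned boundary. A small self-contained illustration of that rounding, with a local divup that follows the same ceiling-division convention (the concrete numbers are made up):

    #include <cstddef>
    #include <iostream>

    // Ceiling division for positive values, matching the divup used above.
    std::size_t divup(std::size_t x, std::size_t y) { return (x + y - 1) / y; }

    int main() {
      const std::size_t align = 64;         // stand-in for EIGEN_MAX_ALIGN_BYTES
      const std::size_t block_size = 1000;  // coefficients per block, illustrative
      const std::size_t scalar_bytes = 4;   // e.g. sizeof(float)

      // 4000 bytes rounded up to the next multiple of 64 gives 4032 bytes,
      // so every slice starts 64-byte aligned.
      const std::size_t aligned_blocksize =
          align * divup(block_size * scalar_bytes, align);
      std::cout << aligned_blocksize << "\n";  // prints 4032
      return 0;
    }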
- const int thread_idx = device.currentThreadId(); - eigen_assert(thread_idx >= -1 && thread_idx < num_threads); - ScalarNoConst* thread_buf = reinterpret_cast( - static_cast(buf) + aligned_blocksize * (thread_idx + 1)); + tiling.block_mapper.total_block_count(), tiling.cost, + [=, &device, &evaluator, &tiling](StorageIndex firstIdx, + StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + tiling.template GetCurrentThreadBuffer(device); for (StorageIndex i = firstIdx; i < lastIdx; ++i) { - auto block = block_mapper.GetBlockForIndex(i, thread_buf); + auto block = tiling.block_mapper.GetBlockForIndex(i, thread_buf); evaluator.evalBlock(&block); } }); - device.deallocate(buf); + device.deallocate(tiling.buffer); } evaluator.cleanup(); } }; +template +class TensorAsyncExecutor { + public: + typedef typename Expression::Index StorageIndex; + typedef TensorEvaluator Evaluator; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function done) { + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! + const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + typedef EvalRange EvalRange; + + if (needs_assign) { + const StorageIndex size = array_prod(ctx->evaluator.dimensions()); + device.parallelForAsync( + size, ctx->evaluator.costPerCoeff(Vectorizable), + EvalRange::alignBlockSize, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + EvalRange::run(&ctx->evaluator, firstIdx, lastIdx); + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function done) + : evaluator(expr, thread_pool), on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + evaluator.cleanup(); + } + + Evaluator evaluator; + + private: + std::function on_done; + }; +}; + +template +class TensorAsyncExecutor { + public: + typedef typename traits::Index StorageIndex; + typedef typename traits::Scalar Scalar; + typedef typename remove_const::type ScalarNoConst; + + static const int NumDims = traits::NumDimensions; + + typedef TensorEvaluator Evaluator; + typedef TensorBlockMapper BlockMapper; + typedef TensorExecutorTilingContext TilingContext; + + static EIGEN_STRONG_INLINE void runAsync(const Expression& expr, + const ThreadPoolDevice& device, + std::function done) { + TensorAsyncExecutorContext* const ctx = + new TensorAsyncExecutorContext(expr, device, std::move(done)); + + Index total_size = array_prod(ctx->evaluator.dimensions()); + Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar); + + if (total_size < cache_size && + !ExpressionHasTensorBroadcastingOp::value) { + internal::TensorAsyncExecutor::runAsync( + expr, device, [ctx]() { delete ctx; }); + return; + } + + // TODO(ezhulenev): This is a potentially blocking operation. Make it async! 
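Both async executors rely on the same lifetime idiom: the evaluation state is heap-allocated, the device's completion callback does nothing but delete the context, and the context destructor is what actually reports completion and releases resources. In the fallback branch just above, the inner executor is handed [ctx]() { delete ctx; } as its callback, so the outer context, and therefore the user's done, still fires exactly once when the delegated evaluation finishes. A stripped-down sketch of the idiom, with a plain std::thread standing in for parallelForAsync (none of the names below are Eigen API):

    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <thread>

    struct AsyncContext {
      explicit AsyncContext(std::function<void()> done) : on_done(std::move(done)) {}
      // Destruction is the single point where completion is reported; in the
      // executors above this is also where cleanup and buffer deallocation happen.
      ~AsyncContext() { on_done(); }
      std::function<void()> on_done;
    };

    // Stand-in for ThreadPoolDevice::parallelForAsync: run `work`, then `done`.
    void RunAsync(std::function<void()> work, std::function<void()> done) {
      std::thread([work, done]() {
        work();
        done();
      }).detach();
    }

    int main() {
      auto* ctx = new AsyncContext([] { std::cout << "expression evaluated\n"; });
      // The pool-side callback only deletes the context; ~AsyncContext runs the
      // user-supplied callback, so completion is reported exactly once.
      RunAsync([] { /* evaluate blocks here */ }, [ctx] { delete ctx; });

      std::this_thread::sleep_for(std::chrono::milliseconds(100));  // crude wait, demo only
      return 0;
    }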
+ const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr); + + if (needs_assign) { + ctx->tiling = + internal::GetTensorExecutorTilingContext(device, ctx->evaluator); + + device.parallelForAsync( + ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost, + [ctx](StorageIndex firstIdx, StorageIndex lastIdx) { + ScalarNoConst* thread_buf = + ctx->tiling.template GetCurrentThreadBuffer(ctx->device); + for (StorageIndex i = firstIdx; i < lastIdx; ++i) { + auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf); + ctx->evaluator.evalBlock(&block); + } + }, + [ctx]() { delete ctx; }); + } + } + + private: + struct TensorAsyncExecutorContext { + TensorAsyncExecutorContext(const Expression& expr, + const ThreadPoolDevice& thread_pool, + std::function done) + : device(thread_pool), + evaluator(expr, thread_pool), + on_done(std::move(done)) {} + + ~TensorAsyncExecutorContext() { + on_done(); + device.deallocate(tiling.buffer); + evaluator.cleanup(); + } + + const ThreadPoolDevice& device; + Evaluator evaluator; + TilingContext tiling; + + private: + std::function on_done; + }; +}; + #endif // EIGEN_USE_THREADS @@ -419,7 +602,7 @@ template EIGEN_STRONG_INLINE void TensorExecutor::run( const Expression& expr, const GpuDevice& device) { TensorEvaluator evaluator(expr, device); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { const int block_size = device.maxGpuThreadsPerBlock(); @@ -517,10 +700,10 @@ struct ExecExprFunctorKernel template class TensorExecutor { public: - typedef typename Expression::Index Index; + typedef typename Expression::Index Index; static EIGEN_STRONG_INLINE void run(const Expression &expr, const Eigen::SyclDevice &dev) { Eigen::TensorEvaluator evaluator(expr, dev); - const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL); + const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr); if (needs_assign) { Index range, GRange, tileSize; Index total_size = ::Eigen::internal::array_prod(evaluator.dimensions()); diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h index 3cca0c7e9..e823bd932 100644 --- a/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h +++ b/unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h @@ -94,6 +94,7 @@ template class MakePointer_ = MakePointer> cl template class TensorForcedEvalOp; template class TensorDevice; +template class TensorAsyncDevice; template struct TensorEvaluator; struct NoOpOutputKernel; @@ -167,6 +168,11 @@ template ::value> class TensorExecutor; +template ::value, + bool Tileable = IsTileable::value> +class TensorAsyncExecutor; + } // end namespace internal } // end namespace Eigen diff --git a/unsupported/test/cxx11_tensor_executor.cpp b/unsupported/test/cxx11_tensor_executor.cpp index e9922a48d..f4d0401da 100644 --- a/unsupported/test/cxx11_tensor_executor.cpp +++ b/unsupported/test/cxx11_tensor_executor.cpp @@ -562,37 +562,112 @@ static void test_execute_reverse_rvalue(Device d) } } +template +static void test_async_execute_unary_expr(Device d) +{ + static constexpr int Options = 0 | Layout; + + // Pick a large enough tensor size to bypass small tensor block evaluation + // optimization. 
+ auto dims = RandomDims(50 / NumDims, 100 / NumDims); + + Tensor src(dims); + Tensor dst(dims); + + src.setRandom(); + const auto expr = src.square(); + + using Assign = TensorAssignOp; + using Executor = internal::TensorAsyncExecutor; + Eigen::Barrier done(1); + Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); }); + done.Wait(); + + for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) { + T square = src.coeff(i) * src.coeff(i); + VERIFY_IS_EQUAL(square, dst.coeff(i)); + } +} + +template +static void test_async_execute_binary_expr(Device d) +{ + static constexpr int Options = 0 | Layout; + + // Pick a large enough tensor size to bypass small tensor block evaluation + // optimization. + auto dims = RandomDims(50 / NumDims, 100 / NumDims); + + Tensor lhs(dims); + Tensor rhs(dims); + Tensor dst(dims); + + lhs.setRandom(); + rhs.setRandom(); + + const auto expr = lhs + rhs; + + using Assign = TensorAssignOp; + using Executor = internal::TensorAsyncExecutor; + + Eigen::Barrier done(1); + Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); }); + done.Wait(); + + for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) { + T sum = lhs.coeff(i) + rhs.coeff(i); + VERIFY_IS_EQUAL(sum, dst.coeff(i)); + } +} + #ifdef EIGEN_DONT_VECTORIZE #define VECTORIZABLE(VAL) !EIGEN_DONT_VECTORIZE && VAL -#else +#else #define VECTORIZABLE(VAL) VAL #endif #define CALL_SUBTEST_PART(PART) \ CALL_SUBTEST_##PART -#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \ - CALL_SUBTEST_PART(PART)((NAME(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME(default_device))); \ +#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \ + CALL_SUBTEST_PART(PART)((NAME(default_device))); \ + CALL_SUBTEST_PART(PART)((NAME(default_device))); \ CALL_SUBTEST_PART(PART)((NAME(default_device))); \ CALL_SUBTEST_PART(PART)((NAME(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME(default_device))); \ + CALL_SUBTEST_PART(PART)((NAME(default_device))); \ + CALL_SUBTEST_PART(PART)((NAME(default_device))); \ CALL_SUBTEST_PART(PART)((NAME(default_device))); \ CALL_SUBTEST_PART(PART)((NAME(default_device))); \ - CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ - CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ - CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ - CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ CALL_SUBTEST_PART(PART)((NAME(tp_device))) +// NOTE: Currently only ThreadPoolDevice supports async expression evaluation. +#define CALL_ASYNC_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))); \ + CALL_SUBTEST_PART(PART)((NAME(tp_device))) + EIGEN_DECLARE_TEST(cxx11_tensor_executor) { Eigen::DefaultDevice default_device; + // Default device is unused in ASYNC tests. 
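The template arguments inside the async tests and dispatch macros above did not survive formatting in this patch. As an illustration only, and with the parameter lists reconstructed as an assumption from the synchronous tests and from the TensorAsyncExecutor forward declaration (Expression, Device, Vectorizable, Tileable), a standalone version of the unary test's core would look roughly like this:

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>

    int main() {
      Eigen::ThreadPool pool(4);
      Eigen::ThreadPoolDevice d(&pool, 4);

      Eigen::Tensor<float, 3> src(20, 20, 20);
      Eigen::Tensor<float, 3> dst(20, 20, 20);
      src.setRandom();

      const auto expr = src.square();

      // Assumed shape of the test's aliases: the assignment expression, the
      // device, and the Vectorizable/Tileable flags.
      using Assign = Eigen::TensorAssignOp<decltype(dst), const decltype(expr)>;
      using Executor = Eigen::internal::TensorAsyncExecutor<const Assign,
                                                            Eigen::ThreadPoolDevice,
                                                            /*Vectorizable=*/true,
                                                            /*Tileable=*/true>;

      Eigen::Barrier done(1);
      Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
      done.Wait();
      return 0;
    }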
+ EIGEN_UNUSED_VARIABLE(default_device); - const auto num_threads = internal::random(1, 24); + const auto num_threads = internal::random(20, 24); Eigen::ThreadPool tp(num_threads); Eigen::ThreadPoolDevice tp_device(&tp, num_threads); @@ -660,8 +735,16 @@ EIGEN_DECLARE_TEST(cxx11_tensor_executor) { CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 4); CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 5); + CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 3); + CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 4); + CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 5); + + CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 3); + CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 4); + CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 5); + // Force CMake to split this test. - // EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14 + // EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14;15;16 } #undef CALL_SUBTEST_COMBINATIONS diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp index f8a7b3662..53b50d1ed 100644 --- a/unsupported/test/cxx11_tensor_thread_pool.cpp +++ b/unsupported/test/cxx11_tensor_thread_pool.cpp @@ -38,9 +38,9 @@ class TestAllocator : public Allocator { void test_multithread_elementwise() { - Tensor in1(2,3,7); - Tensor in2(2,3,7); - Tensor out(2,3,7); + Tensor in1(200, 30, 70); + Tensor in2(200, 30, 70); + Tensor out(200, 30, 70); in1.setRandom(); in2.setRandom(); @@ -49,15 +49,39 @@ void test_multithread_elementwise() Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random(3, 11)); out.device(thread_pool_device) = in1 + in2 * 3.14f; - for (int i = 0; i < 2; ++i) { - for (int j = 0; j < 3; ++j) { - for (int k = 0; k < 7; ++k) { - VERIFY_IS_APPROX(out(i,j,k), in1(i,j,k) + in2(i,j,k) * 3.14f); + for (int i = 0; i < 200; ++i) { + for (int j = 0; j < 30; ++j) { + for (int k = 0; k < 70; ++k) { + VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f); } } } } +void test_async_multithread_elementwise() +{ + Tensor in1(200, 30, 70); + Tensor in2(200, 30, 70); + Tensor out(200, 30, 70); + + in1.setRandom(); + in2.setRandom(); + + Eigen::ThreadPool tp(internal::random(3, 11)); + Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random(3, 11)); + + Eigen::Barrier b(1); + out.device(thread_pool_device, [&b]() { b.Notify(); }) = in1 + in2 * 3.14f; + b.Wait(); + + for (int i = 0; i < 200; ++i) { + for (int j = 0; j < 30; ++j) { + for (int k = 0; k < 70; ++k) { + VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f); + } + } + } +} void test_multithread_compound_assignment() { @@ -516,6 +540,7 @@ void test_threadpool_allocate(TestAllocator* allocator) EIGEN_DECLARE_TEST(cxx11_tensor_thread_pool) { CALL_SUBTEST_1(test_multithread_elementwise()); + CALL_SUBTEST_1(test_async_multithread_elementwise()); CALL_SUBTEST_1(test_multithread_compound_assignment()); CALL_SUBTEST_2(test_multithread_contraction());
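Taken together, the pieces added by this patch let several independent expressions be evaluated concurrently on one ThreadPoolDevice while the calling thread keeps working, with completion observed through the callbacks. A closing sketch of that pattern, built only from constructs that appear in this patch (sizes, thread counts, and the particular expressions are arbitrary):

    #define EIGEN_USE_THREADS
    #include <unsupported/Eigen/CXX11/Tensor>

    int main() {
      Eigen::ThreadPool pool(8);
      Eigen::ThreadPoolDevice dev(&pool, 8);

      Eigen::Tensor<float, 3> a(64, 32, 16), b(64, 32, 16);
      Eigen::Tensor<float, 3> c(64, 32, 16), d(64, 32, 16);
      a.setRandom();
      b.setRandom();

      // Two independent expressions evaluated concurrently; each assignment
      // gets its own completion barrier.
      Eigen::Barrier c_done(1), d_done(1);
      c.device(dev, [&c_done]() { c_done.Notify(); }) = a + b * 3.14f;
      d.device(dev, [&d_done]() { d_done.Notify(); }) = a.square() - b;

      // ... unrelated work on the calling thread ...

      c_done.Wait();
      d_done.Wait();
      return 0;
    }

Each assignment gets its own Barrier here; a single shared counter would likely work as well, but only Barrier(1) appears in this patch, so that variant is not shown.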