diff --git a/Eigen/Core b/Eigen/Core index 93d47890a..700d81540 100644 --- a/Eigen/Core +++ b/Eigen/Core @@ -349,6 +349,9 @@ using std::ptrdiff_t; #include "src/Core/TriangularMatrix.h" #include "src/Core/SelfAdjointView.h" #include "src/Core/products/GeneralBlockPanelKernel.h" +#ifdef EIGEN_GEMM_THREADPOOL +#include "ThreadPool" +#endif #include "src/Core/products/Parallelizer.h" #include "src/Core/ProductEvaluators.h" #include "src/Core/products/GeneralMatrixVector.h" diff --git a/Eigen/src/Core/products/GeneralMatrixMatrix.h b/Eigen/src/Core/products/GeneralMatrixMatrix.h index e8bb8218c..3e7784d10 100644 --- a/Eigen/src/Core/products/GeneralMatrixMatrix.h +++ b/Eigen/src/Core/products/GeneralMatrixMatrix.h @@ -84,12 +84,12 @@ static void run(Index rows, Index cols, Index depth, gemm_pack_rhs pack_rhs; gebp_kernel gebp; -#ifdef EIGEN_HAS_OPENMP +#if defined(EIGEN_HAS_OPENMP) || defined(EIGEN_GEMM_THREADPOOL) if(info) { // this is the parallel version! - int tid = omp_get_thread_num(); - int threads = omp_get_num_threads(); + int tid = info->logical_thread_id; + int threads = info->num_threads; LhsScalar* blockA = blocking.blockA(); eigen_internal_assert(blockA!=0); @@ -110,15 +110,15 @@ static void run(Index rows, Index cols, Index depth, // each thread packs the sub block A_k,i to A'_i where i is the thread id. // However, before copying to A'_i, we have to make sure that no other thread is still using it, - // i.e., we test that info[tid].users equals 0. - // Then, we set info[tid].users to the number of threads to mark that all other threads are going to use it. - while(info[tid].users!=0) {} - info[tid].users = threads; + // i.e., we test that info->task_info[tid].users equals 0. + // Then, we set info->task_info[tid].users to the number of threads to mark that all other threads are going to use it. + while(info->task_info[tid].users!=0) {} + info->task_info[tid].users = threads; - pack_lhs(blockA+info[tid].lhs_start*actual_kc, lhs.getSubMapper(info[tid].lhs_start,k), actual_kc, info[tid].lhs_length); + pack_lhs(blockA+info->task_info[tid].lhs_start*actual_kc, lhs.getSubMapper(info->task_info[tid].lhs_start,k), actual_kc, info->task_info[tid].lhs_length); // Notify the other threads that the part A'_i is ready to go. - info[tid].sync = k; + info->task_info[tid].sync = k; // Computes C_i += A' * B' per A'_i for(int shift=0; shift0) { - while(info[i].sync!=k) { - } + while(info->task_info[i].sync!=k) {} } - gebp(res.getSubMapper(info[i].lhs_start, 0), blockA+info[i].lhs_start*actual_kc, blockB, info[i].lhs_length, actual_kc, nc, alpha); + gebp(res.getSubMapper(info->task_info[i].lhs_start, 0), blockA+info->task_info[i].lhs_start*actual_kc, blockB, info->task_info[i].lhs_length, actual_kc, nc, alpha); } // Then keep going as usual with the remaining B' @@ -151,11 +150,11 @@ static void run(Index rows, Index cols, Index depth, // Release all the sub blocks A'_i of A' for the current thread, // i.e., we simply decrement the number of users by 1 for(Index i=0; itask_info[i].users -= 1; } } else -#endif // EIGEN_HAS_OPENMP +#endif // defined(EIGEN_HAS_OPENMP) || defined(EIGEN_GEMM_THREADPOOL) { EIGEN_UNUSED_VARIABLE(info); diff --git a/Eigen/src/Core/products/Parallelizer.h b/Eigen/src/Core/products/Parallelizer.h index 3d9cb5fb3..7dc92aa15 100644 --- a/Eigen/src/Core/products/Parallelizer.h +++ b/Eigen/src/Core/products/Parallelizer.h @@ -13,49 +13,41 @@ // IWYU pragma: private #include "../InternalHeaderCheck.h" +// Note that in the following, there are 3 different uses of the concept +// "number of threads": +// 1. Max number of threads used by OpenMP or ThreadPool. +// * For OpenMP this is typically the value set by the OMP_NUM_THREADS +// environment variable, or by a call to omp_set_num_threads() prior to +// calling Eigen. +// * For ThreadPool, this is the number of threads in the ThreadPool. +// 2. Max number of threads currently allowed to be used by parallel Eigen +// operations. This is set by setNbThreads(), and cannot exceed the value +// in 1. +// 3. The actual number of threads used for a given parallel Eigen operation. +// This is typically computed on the fly using a cost model and cannot exceed +// the value in 2. +// * For OpenMP, this is typically the number of threads specified in individual +// "omp parallel" pragmas associated with an Eigen operation. +// * For ThreadPool, it is the number of concurrent tasks scheduled in the +// threadpool for a given Eigen operation. Notice that since the threadpool +// uses task stealing, there is no way to limit the number of concurrently +// executing tasks to below the number in 1. except by limiting the total +// number of tasks in flight. + +#if defined(EIGEN_HAS_OPENMP) && defined(EIGEN_GEMM_THREADPOOL) +#error "EIGEN_HAS_OPENMP and EIGEN_GEMM_THREADPOOL may not both be defined." +#endif + namespace Eigen { namespace internal { - -/** \internal */ -inline void manage_multi_threading(Action action, int* v) -{ - static int m_maxThreads = -1; - EIGEN_UNUSED_VARIABLE(m_maxThreads) - - if(action==SetAction) - { - eigen_internal_assert(v!=0); - m_maxThreads = *v; - } - else if(action==GetAction) - { - eigen_internal_assert(v!=0); - #ifdef EIGEN_HAS_OPENMP - if(m_maxThreads>0) - *v = m_maxThreads; - else - *v = omp_get_max_threads(); - #else - *v = 1; - #endif - } - else - { - eigen_internal_assert(false); - } + inline void manage_multi_threading(Action action, int* v); } -} +// Public APIs. /** Must be call first when calling Eigen from multiple threads */ -inline void initParallel() -{ - int nbt; - internal::manage_multi_threading(GetAction, &nbt); - std::ptrdiff_t l1, l2, l3; - internal::manage_caching_sizes(GetAction, &l1, &l2, &l3); -} +EIGEN_DEPRECATED inline void initParallel() {} /** \returns the max number of threads reserved for Eigen * \sa setNbThreads */ @@ -73,42 +65,102 @@ inline void setNbThreads(int v) internal::manage_multi_threading(SetAction, &v); } -namespace internal { +#ifdef EIGEN_GEMM_THREADPOOL +// Sets the ThreadPool used by Eigen parallel Gemm. +// +// NOTICE: This function has a known race condition with +// parallelize_gemm below, and should not be called while +// an instance of that function is running. +// +// TODO(rmlarsen): Make the device API available instead of +// storing a local static pointer variable to avoid this issue. +inline ThreadPool* setGemmThreadPool(ThreadPool* new_pool) { + static ThreadPool* pool; + if (new_pool != nullptr) { + // This will wait for work in all threads in *pool to finish, + // then destroy the old ThreadPool, and then replace it with new_pool. + pool = new_pool; + // Reset the number of threads to the number of threads on the new pool. + setNbThreads(pool->NumThreads()); + } + return pool; +} -template struct GemmParallelInfo -{ - -#ifdef EIGEN_HAS_OPENMP - GemmParallelInfo() : sync(-1), users(0), lhs_start(0), lhs_length(0) {} - std::atomic sync; - std::atomic users; -#else - GemmParallelInfo() : lhs_start(0), lhs_length(0) {} +// Gets the ThreadPool used by Eigen parallel Gemm. +inline ThreadPool* getGemmThreadPool() { + return setGemmThreadPool(nullptr); +} #endif + +namespace internal { + +// Implementation. + +#if defined(EIGEN_USE_BLAS) || (!defined(EIGEN_HAS_OPENMP) && !defined(EIGEN_GEMM_THREADPOOL)) + +inline void manage_multi_threading(Action /*unused*/, int* /*unused*/) {} +template struct GemmParallelInfo {}; +template +EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, + Index /*unused*/, bool /*unused*/) { + func(0,rows, 0,cols); +} + +#else + +template struct GemmParallelTaskInfo { + GemmParallelTaskInfo() : sync(-1), users(0), lhs_start(0), lhs_length(0) {} + std::atomic sync; + std::atomic users; Index lhs_start; Index lhs_length; }; -template -void parallelize_gemm(const Functor& func, Index rows, Index cols, Index depth, bool transpose) -{ - // TODO when EIGEN_USE_BLAS is defined, - // we should still enable OMP for other scalar types - // Without C++11, we have to disable GEMM's parallelization on - // non x86 architectures because there volatile is not enough for our purpose. - // See bug 1572. -#if (! defined(EIGEN_HAS_OPENMP)) || defined(EIGEN_USE_BLAS) - // FIXME the transpose variable is only needed to properly split - // the matrix product when multithreading is enabled. This is a temporary - // fix to support row-major destination matrices. This whole - // parallelizer mechanism has to be redesigned anyway. - EIGEN_UNUSED_VARIABLE(depth); - EIGEN_UNUSED_VARIABLE(transpose); - func(0,rows, 0,cols); -#else +template struct GemmParallelInfo { + const int logical_thread_id; + const int num_threads; + GemmParallelTaskInfo* task_info; - // Dynamically check whether we should enable or disable OpenMP. + GemmParallelInfo(int logical_thread_id_, int num_threads_, + GemmParallelTaskInfo* task_info_) + : logical_thread_id(logical_thread_id_), + num_threads(num_threads_), + task_info(task_info_) {} +}; + +inline void manage_multi_threading(Action action, int* v) { + static int m_maxThreads = -1; + if (action == SetAction) { + eigen_internal_assert(v != nullptr); +#if defined(EIGEN_HAS_OPENMP) + // Calling action == SetAction and *v = 0 means + // restoring m_maxThreads to the maximum number of threads specified + // for OpenMP. + eigen_internal_assert(*v >= 0); + int omp_threads = omp_get_max_threads(); + m_maxThreads = (*v == 0 ? omp_threads : std::min(*v, omp_threads)); +#elif defined(EIGEN_GEMM_THREADPOOL) + // Calling action == SetAction and *v = 0 means + // restoring m_maxThreads to the number of threads in the ThreadPool, + // which defaults to 1 if no pool was provided. + eigen_internal_assert(*v >= 0); + ThreadPool* pool = getGemmThreadPool(); + int pool_threads = pool != nullptr ? pool->NumThreads() : 1; + m_maxThreads = (*v == 0 ? pool_threads : numext::mini(pool_threads, *v)); +#endif + } else if (action == GetAction) { + eigen_internal_assert(*v != nullptr); + *v = m_maxThreads; + } else { + eigen_internal_assert(false); + } +} + +template +EIGEN_STRONG_INLINE void parallelize_gemm(const Functor& func, Index rows, Index cols, + Index depth, bool transpose) { + // Dynamically check whether we should even try to execute in parallel. // The conditions are: // - the max number of threads we can create is greater than 1 // - we are not already in a parallel code @@ -126,27 +178,41 @@ void parallelize_gemm(const Functor& func, Index rows, Index cols, Index depth, pb_max_threads = std::max(1, std::min(pb_max_threads, static_cast( work / kMinTaskSize ) )); // compute the number of threads we are going to use - Index threads = std::min(nbThreads(), pb_max_threads); + int threads = std::min(nbThreads(), pb_max_threads); - // if multi-threading is explicitly disabled, not useful, or if we already are in a parallel session, - // then abort multi-threading - // FIXME omp_get_num_threads()>1 only works for openmp, what if the user does not use openmp? - if((!Condition) || (threads==1) || (omp_get_num_threads()>1)) + // if multi-threading is explicitly disabled, not useful, or if we already are + // inside a parallel session, then abort multi-threading + bool dont_parallelize = (!Condition) || (threads<=1); +#if defined(EIGEN_HAS_OPENMP) + // don't parallelize if we are executing in a parallel context already. + dont_parallelize |= omp_get_num_threads() > 1; +#elif defined(EIGEN_GEMM_THREADPOOL) + // don't parallelize if we have a trivial threadpool or the current thread id + // is != -1, indicating that we are already executing on a thread inside the pool. + // In other words, we do not allow nested parallelism, since this would lead to + // deadlocks due to the workstealing nature of the threadpool. + ThreadPool* pool = getGemmThreadPool(); + dont_parallelize |= (pool == nullptr || pool->CurrentThreadId() != -1); +#endif + if (dont_parallelize) return func(0,rows, 0,cols); - Eigen::initParallel(); func.initParallelSession(threads); if(transpose) std::swap(rows,cols); - ei_declare_aligned_stack_constructed_variable(GemmParallelInfo,info,threads,0); + ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo,task_info,threads,0); + +#if defined(EIGEN_HAS_OPENMP) #pragma omp parallel num_threads(threads) { Index i = omp_get_thread_num(); - // Note that the actual number of threads might be lower than the number of request ones. + // Note that the actual number of threads might be lower than the number of + // requested ones Index actual_threads = omp_get_num_threads(); + GemmParallelInfo info(i, actual_threads, task_info); Index blockCols = (cols / actual_threads) & ~Index(0x3); Index blockRows = (rows / actual_threads); @@ -158,17 +224,52 @@ void parallelize_gemm(const Functor& func, Index rows, Index cols, Index depth, Index c0 = i*blockCols; Index actualBlockCols = (i+1==actual_threads) ? cols-c0 : blockCols; - info[i].lhs_start = r0; - info[i].lhs_length = actualBlockRows; + info.task_info[i].lhs_start = r0; + info.task_info[i].lhs_length = actualBlockRows; - if(transpose) func(c0, actualBlockCols, 0, rows, info); - else func(0, rows, c0, actualBlockCols, info); + if(transpose) func(c0, actualBlockCols, 0, rows, &info); + else func(0, rows, c0, actualBlockCols, &info); } + +#elif defined(EIGEN_GEMM_THREADPOOL) + ei_declare_aligned_stack_constructed_variable(GemmParallelTaskInfo,meta_info,threads,0); + Barrier barrier(threads); + auto task = [=, &func, &barrier, &task_info](int i) + { + Index actual_threads = threads; + GemmParallelInfo info(i, actual_threads, task_info); + Index blockCols = (cols / actual_threads) & ~Index(0x3); + Index blockRows = (rows / actual_threads); + blockRows = (blockRows/Functor::Traits::mr)*Functor::Traits::mr; + + Index r0 = i*blockRows; + Index actualBlockRows = (i+1==actual_threads) ? rows-r0 : blockRows; + + Index c0 = i*blockCols; + Index actualBlockCols = (i+1==actual_threads) ? cols-c0 : blockCols; + + info.task_info[i].lhs_start = r0; + info.task_info[i].lhs_length = actualBlockRows; + + if(transpose) func(c0, actualBlockCols, 0, rows, &info); + else func(0, rows, c0, actualBlockCols, &info); + + barrier.Notify(); + }; + // Notice that we do not schedule more than "threads" tasks, which allows us to + // limit number of running threads, even if the threadpool itself was constructed + // with a larger number of threads. + for (int i=0; i < threads - 1; ++i) { + pool->Schedule([=, task = std::move(task)] { task(i); }); + } + task(threads - 1); + barrier.Wait(); #endif } -} // end namespace internal +#endif +} // end namespace internal } // end namespace Eigen #endif // EIGEN_PARALLELIZER_H diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e0b2c834b..317a6ba1b 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -234,6 +234,7 @@ ei_add_test(product_trmm) ei_add_test(product_trsolve) ei_add_test(product_mmtr) ei_add_test(product_notemporary) +ei_add_test(product_threaded) ei_add_test(stable_norm) ei_add_test(permutationmatrices) ei_add_test(bandmatrix) diff --git a/test/product_threaded.cpp b/test/product_threaded.cpp new file mode 100644 index 000000000..95dad7977 --- /dev/null +++ b/test/product_threaded.cpp @@ -0,0 +1,33 @@ +// This file is part of Eigen, a lightweight C++ template library +// for linear algebra. +// +// Copyright (C) 2023 Rasmus Munk Larsen +// +// This Source Code Form is subject to the terms of the Mozilla +// Public License v. 2.0. If a copy of the MPL was not distributed +// with this file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#define EIGEN_GEMM_THREADPOOL +#include "main.h" + +void test_parallelize_gemm() { + constexpr int n = 1024; + constexpr int num_threads = 4; + MatrixXf a(n,n); + MatrixXf b(n,n); + MatrixXf c(n,n); + c.noalias() = a*b; + + ThreadPool pool(num_threads); + MatrixXf c_threaded(n,n); + c_threaded.noalias() = a*b; + + VERIFY_IS_APPROX(c, c_threaded); +} + + + +EIGEN_DECLARE_TEST(product_threaded) +{ + CALL_SUBTEST(test_parallelize_gemm()); +}