From b3e7c9132d41da75c0a6af783300cb11101010db Mon Sep 17 00:00:00 2001
From: Paul Tucker
Date: Mon, 16 Jul 2018 17:26:05 -0700
Subject: [PATCH 1/4] Add optional Allocator argument to ThreadPoolDevice
 constructor.

When supplied, this allocator will be used in place of
internal::aligned_malloc. This permits e.g. use of a NUMA-node-specific
allocator where the thread pool is also restricted to a single NUMA node.
---
 .../CXX11/src/Tensor/TensorDeviceThreadPool.h | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index 90fd99027..be397e1b6 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -95,14 +95,20 @@ static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
 // Build a thread pool device on top the an existing pool of threads.
 struct ThreadPoolDevice {
   // The ownership of the thread pool remains with the caller.
-  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { }
+  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores)
+      : pool_(pool), num_threads_(num_cores), allocator_(nullptr) { }

   EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
-    return internal::aligned_malloc(num_bytes);
+    return allocator_ ? allocator_->allocate(num_bytes)
+        : internal::aligned_malloc(num_bytes);
   }

   EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
-    internal::aligned_free(buffer);
+    if (allocator_) {
+      allocator_->deallocate(buffer);
+    } else {
+      internal::aligned_free(buffer);
+    }
   }

   EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
@@ -267,9 +273,13 @@ struct ThreadPoolDevice {
   // Thread pool accessor.
   ThreadPoolInterface* getPool() const { return pool_; }

+  // Allocator accessor.
+  Allocator* getAllocator() const { return allocator_; }
+
  private:
   ThreadPoolInterface* pool_;
   int num_threads_;
+  Allocator* allocator_;
 };

 }  // end namespace Eigen

From 4e9848fa8600be69dfb51405606eafa1dba8d0bf Mon Sep 17 00:00:00 2001
From: Paul Tucker
Date: Mon, 16 Jul 2018 17:53:36 -0700
Subject: [PATCH 2/4] Actually add optional Allocator* arg to
 ThreadPoolDevice().
---
 unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index be397e1b6..c9534d400 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -95,8 +95,8 @@ static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
 // Build a thread pool device on top the an existing pool of threads.
 struct ThreadPoolDevice {
   // The ownership of the thread pool remains with the caller.
-  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores)
-      : pool_(pool), num_threads_(num_cores), allocator_(nullptr) { }
+  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
+      : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }

   EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
     return allocator_ ? allocator_->allocate(num_bytes)
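With the constructor argument in place, wiring in a custom allocator is a
one-line change at device construction time. A minimal sketch, assuming an
Allocator implementation named my_allocator (the abstract Allocator interface
itself lands together with the tests in PATCH 3/4):

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

// The pool and the allocator both remain owned by the caller; buffers come
// from the allocator when one is given, else from internal::aligned_malloc.
Eigen::ThreadPool pool(/*num_threads=*/8);
Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/8, &my_allocator);
Eigen::ThreadPoolDevice plain(&pool, /*num_cores=*/8);  // existing callers still compile
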
From d4afccde5a9553ddfb48b0f5fad0115cd8bf791a Mon Sep 17 00:00:00 2001
From: Paul Tucker
Date: Thu, 19 Jul 2018 17:43:44 -0700
Subject: [PATCH 3/4] Add test coverage for ThreadPoolDevice optional
 allocator.
---
 .../CXX11/src/Tensor/TensorDeviceThreadPool.h |  7 +++
 unsupported/test/cxx11_tensor_thread_pool.cpp | 45 +++++++++++++++++--
 2 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index c9534d400..f4123b71d 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -91,6 +91,13 @@ static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
   }
 }

+// An abstract interface to a device specific memory allocator.
+class Allocator {
+ public:
+  virtual ~Allocator() {}
+  EIGEN_DEVICE_FUNC virtual void* allocate(size_t num_bytes) const = 0;
+  EIGEN_DEVICE_FUNC virtual void deallocate(void* buffer) const = 0;
+};

 // Build a thread pool device on top the an existing pool of threads.
 struct ThreadPoolDevice {
diff --git a/unsupported/test/cxx11_tensor_thread_pool.cpp b/unsupported/test/cxx11_tensor_thread_pool.cpp
index 2ef665f30..200664740 100644
--- a/unsupported/test/cxx11_tensor_thread_pool.cpp
+++ b/unsupported/test/cxx11_tensor_thread_pool.cpp
@@ -16,6 +16,25 @@

 using Eigen::Tensor;

+class TestAllocator : public Allocator {
+ public:
+  ~TestAllocator() override {}
+  EIGEN_DEVICE_FUNC void* allocate(size_t num_bytes) const override {
+    const_cast<TestAllocator*>(this)->alloc_count_++;
+    return internal::aligned_malloc(num_bytes);
+  }
+  EIGEN_DEVICE_FUNC void deallocate(void* buffer) const override {
+    const_cast<TestAllocator*>(this)->dealloc_count_++;
+    internal::aligned_free(buffer);
+  }
+
+  int alloc_count() const { return alloc_count_; }
+  int dealloc_count() const { return dealloc_count_; }
+
+ private:
+  int alloc_count_ = 0;
+  int dealloc_count_ = 0;
+};

 void test_multithread_elementwise()
 {
@@ -320,14 +339,14 @@ void test_multithread_random()
 }

 template <int DataLayout>
-void test_multithread_shuffle()
+void test_multithread_shuffle(Allocator* allocator)
 {
   Tensor<float, 4, DataLayout> tensor(17,5,7,11);
   tensor.setRandom();

   const int num_threads = internal::random<int>(2, 11);
   ThreadPool threads(num_threads);
-  Eigen::ThreadPoolDevice device(&threads, num_threads);
+  Eigen::ThreadPoolDevice device(&threads, num_threads, allocator);

   Tensor<float, 4, DataLayout> shuffle(7,5,11,17);
   array<ptrdiff_t, 4> shuffles = {{2,1,3,0}};
@@ -344,6 +363,21 @@
   }
 }

+void test_threadpool_allocate(TestAllocator* allocator)
+{
+  const int num_threads = internal::random<int>(2, 11);
+  const int num_allocs = internal::random<int>(2, 11);
+  ThreadPool threads(num_threads);
+  Eigen::ThreadPoolDevice device(&threads, num_threads, allocator);
+
+  for (int a = 0; a < num_allocs; ++a) {
+    void* ptr = device.allocate(512);
+    device.deallocate(ptr);
+  }
+  VERIFY(allocator != nullptr);
+  VERIFY_IS_EQUAL(allocator->alloc_count(), num_allocs);
+  VERIFY_IS_EQUAL(allocator->dealloc_count(), num_allocs);
+}

 void test_cxx11_tensor_thread_pool()
 {
@@ -368,6 +402,9 @@

   CALL_SUBTEST_6(test_memcpy());
   CALL_SUBTEST_6(test_multithread_random());
-  CALL_SUBTEST_6(test_multithread_shuffle<ColMajor>());
-  CALL_SUBTEST_6(test_multithread_shuffle<RowMajor>());
+
+  TestAllocator test_allocator;
+  CALL_SUBTEST_6(test_multithread_shuffle<ColMajor>(nullptr));
+  CALL_SUBTEST_6(test_multithread_shuffle<RowMajor>(&test_allocator));
+  CALL_SUBTEST_6(test_threadpool_allocate(&test_allocator));
 }
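PATCH 1/4's commit message motivates the hook with NUMA placement. Against
the Allocator interface added above, such an allocator might look like the
sketch below; it is illustrative only. NumaNodeAllocator is hypothetical,
the libnuma calls (numa_alloc_onnode, numa_free, link with -lnuma) are an
assumption about the deployment environment, and the size map exists because
numa_free() requires the allocation size, which Allocator::deallocate() does
not supply.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <numa.h>  // hypothetical dependency: libnuma
#include <mutex>
#include <unordered_map>

// Pins every buffer handed out by the device to a single NUMA node,
// matching a thread pool whose threads are restricted to that node.
class NumaNodeAllocator : public Eigen::Allocator {
 public:
  explicit NumaNodeAllocator(int node) : node_(node) {}

  void* allocate(size_t num_bytes) const override {
    // numa_alloc_onnode returns page-aligned memory, which satisfies
    // Eigen's alignment requirement.
    void* ptr = numa_alloc_onnode(num_bytes, node_);
    std::lock_guard<std::mutex> lock(mu_);
    sizes_[ptr] = num_bytes;  // remembered for numa_free below
    return ptr;
  }

  void deallocate(void* buffer) const override {
    size_t num_bytes;
    {
      std::lock_guard<std::mutex> lock(mu_);
      num_bytes = sizes_.at(buffer);
      sizes_.erase(buffer);
    }
    numa_free(buffer, num_bytes);
  }

 private:
  const int node_;
  // The interface methods are const, so the bookkeeping is mutable.
  mutable std::mutex mu_;
  mutable std::unordered_map<void*, size_t> sizes_;
};

The TestAllocator in the patch follows the same shape: it forwards to
internal::aligned_malloc/aligned_free and only adds counters, which is
exactly what test_threadpool_allocate asserts against.
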
From 385f7b8d0ca926d00c71987ab308202511e5c753 Mon Sep 17 00:00:00 2001
From: Paul Tucker
Date: Tue, 31 Jul 2018 13:52:18 -0700
Subject: [PATCH 4/4] Change getAllocator() to allocator() in ThreadPoolDevice.
---
 unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
index f4123b71d..f8188ffde 100644
--- a/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
+++ b/unsupported/Eigen/CXX11/src/Tensor/TensorDeviceThreadPool.h
@@ -281,7 +281,7 @@ struct ThreadPoolDevice {
   ThreadPoolInterface* getPool() const { return pool_; }

   // Allocator accessor.
-  Allocator* getAllocator() const { return allocator_; }
+  Allocator* allocator() const { return allocator_; }

  private:
   ThreadPoolInterface* pool_;
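After the rename, code that forwards a device's configuration reads the hook
through allocator(). A short sketch, assuming the numThreads() accessor that
ThreadPoolDevice already provides alongside getPool():

// Build a second device sharing the first one's pool and allocator, e.g.
// to hand a preconfigured device to a library component.
Eigen::ThreadPoolDevice make_sibling(const Eigen::ThreadPoolDevice& device) {
  return Eigen::ThreadPoolDevice(device.getPool(), device.numThreads(),
                                 device.allocator());
}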