Clean up TensorDeviceThreadPool.h

Rasmus Munk Larsen 2025-03-07 18:14:17 +00:00
parent 43810fc1be
commit 350544eb01
4 changed files with 45 additions and 83 deletions
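In summary: the Notification-returning enqueue() and enqueue_with_barrier() are removed from ThreadPoolDevice (together with the FunctionWrapperWithNotification/FunctionWrapperWithBarrier helpers), the body of the old enqueueNoNotification() becomes the new enqueue(), enqueueNoNotification() is kept as a deprecated forwarder, and the call sites in the contraction, executor, and reduction code are updated accordingly. A minimal before/after sketch of the call-site pattern; device and barrier match the code below, while do_work and arg are illustrative placeholders:

// Before: synchronization objects were threaded through the device API.
Notification* done = device.enqueue(do_work, arg);    // removed: returned a heap-allocated Notification
device.enqueue_with_barrier(&barrier, do_work, arg);  // removed
device.enqueueNoNotification(do_work, arg);           // renamed

// After: a single enqueue(); any signalling happens explicitly inside the closure.
device.enqueue([&barrier, arg] { do_work(arg); barrier.Notify(); });
device.enqueue(do_work, arg);                         // enqueueNoNotification() survives as a deprecated alias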


@@ -932,7 +932,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
kernel(m, n, k, use_thread_local);
} else {
eigen_assert(!use_thread_local);
device_.enqueueNoNotification([this, m, n, k, use_thread_local]() {
device_.enqueue([this, m, n, k, use_thread_local]() {
kernel(m, n, k, use_thread_local);
});
}
@@ -982,7 +982,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
} else {
while (end - start > 1) {
Index mid = (start + end) / 2;
device_.enqueueNoNotification([this, mid, end, k, rhs]() {
device_.enqueue([this, mid, end, k, rhs]() {
enqueue_packing_helper(mid, end, k, rhs);
});
end = mid;
@@ -1000,7 +1000,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
(k > 0 || std::this_thread::get_id() == created_by_thread_id_);
if (pack_async) {
device_.enqueueNoNotification([this, start, end, k, rhs]() {
device_.enqueue([this, start, end, k, rhs]() {
enqueue_packing_helper(start, end, k, rhs);
});
} else {
@@ -1264,7 +1264,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
void eval(Barrier& barrier, Index start_block_idx, Index end_block_idx) {
while (end_block_idx - start_block_idx > 1) {
Index mid_block_idx = (start_block_idx + end_block_idx) / 2;
evaluator->m_device.enqueueNoNotification([this, &barrier, mid_block_idx, end_block_idx]() {
evaluator->m_device.enqueue([this, &barrier, mid_block_idx, end_block_idx]() {
eval<Alignment>(barrier, mid_block_idx, end_block_idx);
});
end_block_idx = mid_block_idx;
@@ -1282,7 +1282,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
void evalAsync(Index start_block_idx, Index end_block_idx) {
while (end_block_idx - start_block_idx > 1) {
Index mid_block_idx = (start_block_idx + end_block_idx) / 2;
evaluator->m_device.enqueueNoNotification(
evaluator->m_device.enqueue(
[this, mid_block_idx, end_block_idx]() {
evalAsync<Alignment>(mid_block_idx, end_block_idx);
});


@@ -15,35 +15,6 @@
namespace Eigen {
// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
static void run(Notification* n, Function f, Args... args) {
f(args...);
if (n) {
n->Notify();
}
}
};
template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
static void run(Barrier* b, Function f, Args... args) {
f(args...);
if (b) {
b->Notify();
}
}
};
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
if (n) {
n->Wait();
}
}
// An abstract interface to a device specific memory allocator.
class Allocator {
public:
@@ -98,8 +69,9 @@ struct ThreadPoolDevice {
Barrier barrier(static_cast<int>(num_threads - 1));
// Launch the last 3 blocks on worker threads.
for (size_t i = 1; i < num_threads; ++i) {
enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
enqueue([n, i, src_ptr, dst_ptr, blocksize, &barrier] {
::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, numext::mini(blocksize, n - (i * blocksize)));
barrier.Notify();
});
}
// Launch the first block on the main thread.
@@ -140,24 +112,22 @@ struct ThreadPoolDevice {
return 1;
}
template <class Function, class... Args>
EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
Notification* n = new Notification();
pool_->Schedule(
std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, std::forward<Function>(f), args...));
return n;
}
template <class Function, class... Args>
EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f, Args&&... args) const {
pool_->Schedule(
std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b, std::forward<Function>(f), args...));
}
// TODO(rmlarsen): Remove this deprecated interface when all users have been converted.
template <class Function, class... Args>
EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
enqueue(std::forward<Function>(f), std::forward<Args>(args)...);
}
template <class Function, class... Args>
EIGEN_STRONG_INLINE void enqueue(Function&& f, Args&&... args) const {
if (sizeof...(args) > 0) {
pool_->Schedule(std::bind(std::forward<Function>(f), args...));
#if EIGEN_COMP_CXXVER >= 20
auto run_f = [f = std::forward<Function>(f),
...args = std::forward<Args>(args)]() { f(args...); };
#else
auto run_f = [f = std::forward<Function>(f), &args...]() { f(args...); };
#endif
pool_->Schedule(std::move(run_f));
} else {
pool_->Schedule(std::forward<Function>(f));
}
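The new enqueue() above turns a callable plus trailing arguments into a zero-argument task before handing it to ThreadPoolInterface::Schedule(), using a C++20 pack init-capture when available. A self-contained sketch of that wrapping idea, not Eigen code: Schedule() here is a trivial queue stand-in, enqueue_like() is an illustrative name, and the tuple capture is a portable alternative to the C++20 pack capture:

#include <functional>
#include <iostream>
#include <tuple>
#include <utility>
#include <vector>

// Trivial stand-in for a thread pool: tasks are queued and run later by the caller.
static std::vector<std::function<void()>> task_queue;
static void Schedule(std::function<void()> task) { task_queue.push_back(std::move(task)); }

// Wrap f and its arguments into a zero-argument closure, in the spirit of enqueue().
template <class Function, class... Args>
void enqueue_like(Function&& f, Args&&... args) {
  if constexpr (sizeof...(Args) > 0) {
    // Copy the arguments into a tuple owned by the closure (portable alternative
    // to the C++20 `...args = std::forward<Args>(args)` pack capture).
    Schedule([f = std::forward<Function>(f),
              tup = std::make_tuple(std::forward<Args>(args)...)]() mutable { std::apply(f, tup); });
  } else {
    Schedule(std::forward<Function>(f));
  }
}

int main() {
  enqueue_like([] { std::cout << "no extra arguments\n"; });
  enqueue_like([](int i, int j) { std::cout << i + j << "\n"; }, 2, 3);
  for (auto& task : task_queue) task();  // drain the queue: runs both wrapped tasks
}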
@@ -191,27 +161,16 @@ struct ThreadPoolDevice {
// Division code rounds mid to block_size, so we are guaranteed to get
// block_count leaves that do actual computations.
Barrier barrier(static_cast<unsigned int>(block.count));
std::function<void(Index, Index)> handleRange;
handleRange = [this, block, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
while (lastIdx - firstIdx > block.size) {
// Split into halves and schedule the second half on a different thread.
const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
lastIdx = midIdx;
}
// Single block or less, execute directly.
f(firstIdx, lastIdx);
barrier.Notify();
};
if (block.count <= numThreads()) {
// Avoid a thread hop by running the root of the tree and one block on the
// main thread.
handleRange(0, n);
handleRange(0, n, block.size, &barrier, pool_, f);
} else {
// Execute the root in the thread pool to avoid running work on more than
// numThreads() threads.
pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
pool_->Schedule([this, n, &block, &barrier, &f]() {
handleRange(0, n, block.size, &barrier, pool_, f);
});
}
barrier.Wait();
@@ -287,6 +246,21 @@ struct ThreadPoolDevice {
private:
typedef TensorCostModel<ThreadPoolDevice> CostModel;
static void handleRange(Index firstIdx, Index lastIdx, Index granularity,
Barrier* barrier, ThreadPoolInterface* pool, const std::function<void(Index, Index)>& f) {
while (lastIdx - firstIdx > granularity) {
// Split into halves and schedule the second half on a different thread.
const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, granularity) * granularity;
pool->Schedule([=, &f]() {
handleRange(midIdx, lastIdx, granularity, barrier, pool, f);
});
lastIdx = midIdx;
}
// Single block or less, execute directly.
f(firstIdx, lastIdx);
barrier->Notify();
}
// For parallelForAsync we must keep passed in closures on the heap, and
// delete them only after `done` callback finished.
struct ParallelForAsyncContext {
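The parallelFor change above moves the recursive range splitter out of a per-call std::function and into the static handleRange() helper: halve the range on a block boundary, schedule the upper half, loop on the lower half, and once a single block remains run f and notify the barrier. A self-contained sketch of that divide-and-conquer scheduling, not Eigen code: CountingBarrier mimics Eigen::Barrier, and detached std::threads stand in for pool->Schedule():

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

// Minimal stand-in for Eigen::Barrier: Notify() counts down, Wait() blocks until zero.
struct CountingBarrier {
  explicit CountingBarrier(int n) : count(n) {}
  void Notify() {
    std::lock_guard<std::mutex> lock(mutex);
    if (--count == 0) cv.notify_all();
  }
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [this] { return count == 0; });
  }
  int count;
  std::mutex mutex;
  std::condition_variable cv;
};

using Index = long;

// Split [first, last) on multiples of `granularity`: push the upper half to another
// thread, keep the lower half, and execute directly once a single block is left.
void handle_range(Index first, Index last, Index granularity, CountingBarrier* barrier,
                  const std::function<void(Index, Index)>& f) {
  while (last - first > granularity) {
    // Round the midpoint up to a block boundary so every leaf is a whole block.
    const Index mid = first + ((last - first) / 2 + granularity - 1) / granularity * granularity;
    // Stand-in for pool->Schedule(): run the upper half on another thread.
    std::thread([=, &f] { handle_range(mid, last, granularity, barrier, f); }).detach();
    last = mid;
  }
  f(first, last);      // single block or less: execute directly
  barrier->Notify();   // one leaf block done
}

int main() {
  const Index n = 90, block_size = 25;
  CountingBarrier barrier(static_cast<int>((n + block_size - 1) / block_size));  // one count per leaf block
  std::mutex io_mutex;
  std::function<void(Index, Index)> f = [&](Index a, Index b) {
    std::lock_guard<std::mutex> lock(io_mutex);
    std::cout << "block [" << a << ", " << b << ")\n";
  };
  handle_range(0, n, block_size, &barrier, f);
  barrier.Wait();  // returns once all ceil(n / block_size) leaf blocks have run
}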


@@ -352,7 +352,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable,
TensorBlockDesc desc(0, tiling.block_mapper.blockDimensions());
evaluator.evalBlock(desc, scratch);
} else {
device.parallelFor(tiling.block_mapper.blockCount(), tiling.cost, eval_block);
device.parallelFor(tiling.block_mapper.blockCount(), tiling.cost, std::move(eval_block));
}
}
evaluator.cleanup();
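The only change in this file is passing eval_block to parallelFor via std::move. Assuming parallelFor takes its functor as a by-value std::function (as the const std::function<void(Index, Index)>& parameter of handleRange above suggests), naming the callable copies its captured state into that std::function, while std::move lets it be stolen. A tiny illustration with a hypothetical take_fn() sink:

#include <functional>
#include <string>
#include <utility>

// Hypothetical sink with a by-value std::function parameter, like parallelFor's functor.
void take_fn(std::function<void()> f) { f(); }

int main() {
  std::string big(1 << 20, 'x');                 // large captured state
  auto eval_block = [big] { (void)big.size(); };
  take_fn(eval_block);                           // copies `big` into the std::function
  take_fn(std::move(eval_block));                // moves the closure: no copy of `big`
}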


@@ -357,17 +357,6 @@ struct FullReducer {
};
#ifdef EIGEN_USE_THREADS
// Multithreaded full reducers
template <typename Self, typename Op,
bool Vectorizable = (Self::InputPacketAccess && Self::ReducerTraits::PacketAccess)>
struct FullReducerShard {
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(const Self& self, typename Self::Index firstIndex,
typename Self::Index numValuesToReduce, Op& reducer,
typename Self::CoeffReturnType* output) {
*output = InnerMostDimReducer<Self, Op, Vectorizable>::reduce(self, firstIndex, numValuesToReduce, reducer);
}
};
// Multithreaded full reducer
template <typename Self, typename Op, bool Vectorizable>
struct FullReducer<Self, Op, ThreadPoolDevice, Vectorizable> {
@@ -397,8 +386,11 @@ struct FullReducer<Self, Op, ThreadPoolDevice, Vectorizable> {
Barrier barrier(internal::convert_index<unsigned int>(numblocks));
MaxSizeVector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
for (Index i = 0; i < numblocks; ++i) {
device.enqueue_with_barrier(&barrier, &FullReducerShard<Self, Op, Vectorizable>::run, self, i * blocksize,
blocksize, reducer, &shards[i]);
auto run_shard = [i, blocksize, &self, &barrier, &shards, &reducer](){
shards[i] = InnerMostDimReducer<Self, Op, Vectorizable>::reduce(self, i * blocksize, blocksize, reducer);
barrier.Notify();
};
device.enqueue(std::move(run_shard));
}
typename Self::CoeffReturnType finalShard;
if (numblocks * blocksize < num_coeffs) {
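The change above folds the former FullReducerShard::run into a run_shard lambda: each enqueued task reduces its own slice into shards[i] and notifies the barrier, and the calling thread then handles the tail block and merges the per-shard results. A self-contained sketch of that shard-then-merge pattern for a plain sum, not Eigen code: std::thread with join() stands in for device.enqueue() plus the Barrier:

#include <cstddef>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int main() {
  const std::size_t num_coeffs = 1000, blocksize = 128;
  const std::size_t numblocks = num_coeffs / blocksize;            // full blocks only
  std::vector<double> data(num_coeffs, 1.0);
  std::vector<double> shards(numblocks, 0.0);                      // one slot per shard, no sharing
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < numblocks; ++i) {
    workers.emplace_back([i, blocksize, &data, &shards] {
      const auto first = data.begin() + i * blocksize;
      shards[i] = std::accumulate(first, first + blocksize, 0.0);  // reduce this shard
    });
  }
  // The partial block at the end is reduced on the calling thread, mirroring the
  // `numblocks * blocksize < num_coeffs` check just above.
  double result = std::accumulate(data.begin() + numblocks * blocksize, data.end(), 0.0);
  for (auto& w : workers) w.join();                                // join() plays the role of the Barrier
  for (double s : shards) result += s;                             // merge the per-shard results
  std::cout << result << "\n";                                     // prints 1000
}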
@@ -888,10 +880,6 @@ struct TensorReductionEvaluatorBase<const TensorReductionOp<Op, Dims, ArgType, M
friend struct internal::InnerMostDimPreserver;
template <typename S, typename O, typename D, bool V>
friend struct internal::FullReducer;
#ifdef EIGEN_USE_THREADS
template <typename S, typename O, bool V>
friend struct internal::FullReducerShard;
#endif
#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
template <int B, int N, typename S, typename R, typename I_>
KERNEL_FRIEND void internal::FullReductionKernel(R, const S, I_, typename S::CoeffReturnType*, unsigned int*);