evalSubExprsIfNeededAsync + async TensorContractionThreadPool

Eugene Zhulenev 2019-08-30 15:13:38 -07:00
parent 619cea9491
commit f0b36fb9a4
9 changed files with 833 additions and 302 deletions
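For context, the user-facing effect of this change: an assignment to a ThreadPoolDevice can now take a `done` callback and return immediately, with completion observed through the callback. A minimal usage sketch (sizes and thread counts are illustrative), mirroring the new tests added in this commit:

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

// Sketch: asynchronous evaluation of a contraction on a ThreadPoolDevice.
Eigen::ThreadPool tp(4);
Eigen::ThreadPoolDevice device(&tp, 4);

Eigen::Tensor<float, 2> lhs(64, 128), rhs(128, 32), result(64, 32);
lhs.setRandom();
rhs.setRandom();

typedef Eigen::Tensor<float, 1>::DimensionPair DimPair;
Eigen::array<DimPair, 1> dims({{DimPair(1, 0)}});

Eigen::Barrier barrier(1);
// The second device() argument is the new `done` callback; the assignment
// returns without blocking and the callback fires when evaluation finishes.
result.device(device, [&barrier]() { barrier.Notify(); }) =
    lhs.contract(rhs, dims);
barrier.Wait();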

View File

@ -147,6 +147,18 @@ struct TensorEvaluator<const TensorAssignOp<LeftArgType, RightArgType>, Device>
    // by the rhs to the lhs.
    return m_rightImpl.evalSubExprsIfNeeded(m_leftImpl.data());
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType, EvalSubExprsCallback done) {
m_leftImpl.evalSubExprsIfNeededAsync(nullptr, [this, done](bool) {
m_rightImpl.evalSubExprsIfNeededAsync(
m_leftImpl.data(), [done](bool need_assign) { done(need_assign); });
});
}
#endif // EIGEN_USE_THREADS
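Throughout this commit the asynchronous variant mirrors the synchronous contract: `evalSubExprsIfNeeded(dest)` returns true when the caller still has to run the assignment loop, and `evalSubExprsIfNeededAsync(dest, done)` delivers the same flag to `done` once the whole sub-tree is ready. A minimal sketch of how a caller drives it (names are illustrative; the real consumer is TensorAsyncExecutor further down in the diff):

// Sketch only: continuation-passing use of the new evaluator entry point.
evaluator.evalSubExprsIfNeededAsync(
    /*dest=*/nullptr, [&](bool need_assign) {
      if (need_assign) {
        // The evaluator did not write directly into `dest`, so the caller
        // still has to run the (possibly parallel) assignment loop.
      }
      // Cleanup / user notification is chained from here.
    });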
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {
    m_leftImpl.cleanup();
    m_rightImpl.cleanup();

View File

@ -214,6 +214,14 @@ struct TensorEvaluator<const TensorBroadcastingOp<Broadcast, ArgType>, Device>
    return true;
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType, EvalSubExprsCallback done) {
m_impl.evalSubExprsIfNeededAsync(nullptr, [done](bool) { done(true); });
}
#endif // EIGEN_USE_THREADS
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {
    m_impl.cleanup();
  }

View File

@ -605,48 +605,99 @@ struct TensorContractionEvaluatorBase
    }
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType dest, EvalSubExprsCallback done) {
m_leftImpl.evalSubExprsIfNeededAsync(nullptr, [this, done, dest](bool) {
m_rightImpl.evalSubExprsIfNeededAsync(nullptr, [this, done, dest](bool) {
if (dest) {
evalToAsync(dest, [done]() { done(false); });
} else {
m_result = static_cast<EvaluatorPointerType>(
m_device.allocate(dimensions().TotalSize() * sizeof(Scalar)));
evalToAsync(m_result, [done]() { done(true); });
}
});
});
}
#endif // EIGEN_USE_THREADS
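A short note on the flag passed to `done` by the block above:

// need_assign semantics for the contraction evaluator:
//   dest != nullptr : the contraction is evaluated directly into `dest`, so
//                     the caller can skip its assignment loop -> done(false);
//   dest == nullptr : the result is materialized into m_result and the caller
//                     still has to read it back via data()    -> done(true).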
#define TENSOR_CONTRACTION_DISPATCH(METHOD, ALIGNMENT, ARGS)  \
  if (this->m_lhs_inner_dim_contiguous) {                     \
    if (this->m_rhs_inner_dim_contiguous) {                   \
      if (this->m_rhs_inner_dim_reordered) {                  \
        METHOD<true, true, true, ALIGNMENT> ARGS;             \
      } else {                                                \
        METHOD<true, true, false, ALIGNMENT> ARGS;            \
      }                                                       \
    } else {                                                  \
      if (this->m_rhs_inner_dim_reordered) {                  \
        METHOD<true, false, true, ALIGNMENT> ARGS;            \
      } else {                                                \
        METHOD<true, false, false, ALIGNMENT> ARGS;           \
      }                                                       \
    }                                                         \
  } else {                                                    \
    if (this->m_rhs_inner_dim_contiguous) {                   \
      if (this->m_rhs_inner_dim_reordered) {                  \
        METHOD<false, true, true, ALIGNMENT> ARGS;            \
      } else {                                                \
        METHOD<false, true, false, ALIGNMENT> ARGS;           \
      }                                                       \
    } else {                                                  \
      if (this->m_rhs_inner_dim_reordered) {                  \
        METHOD<false, false, true, ALIGNMENT> ARGS;           \
      } else {                                                \
        METHOD<false, false, false, ALIGNMENT> ARGS;          \
      }                                                       \
    }                                                         \
  }
#define TENSOR_CONTRACTION_ASYNC_DISPATCH(METHOD, DONE, ALIGNMENT, ARGS, FN) \
if (this->m_lhs_inner_dim_contiguous) { \
if (this->m_rhs_inner_dim_contiguous) { \
if (this->m_rhs_inner_dim_reordered) { \
(new METHOD<DONE, true, true, true, ALIGNMENT> ARGS)->FN; \
} else { \
(new METHOD<DONE, true, true, false, ALIGNMENT> ARGS)->FN; \
} \
} else { \
if (this->m_rhs_inner_dim_reordered) { \
(new METHOD<DONE, true, false, true, ALIGNMENT> ARGS)->FN; \
} else { \
(new METHOD<DONE, true, false, false, ALIGNMENT> ARGS)->FN; \
} \
} \
} else { \
if (this->m_rhs_inner_dim_contiguous) { \
if (this->m_rhs_inner_dim_reordered) { \
(new METHOD<DONE, false, true, true, ALIGNMENT> ARGS)->FN; \
} else { \
(new METHOD<DONE, false, true, false, ALIGNMENT> ARGS)->FN; \
} \
} else { \
if (this->m_rhs_inner_dim_reordered) { \
(new METHOD<DONE, false, false, true, ALIGNMENT> ARGS)->FN; \
} else { \
(new METHOD<DONE, false, false, false, ALIGNMENT> ARGS)->FN; \
} \
} \
}
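The async dispatcher differs from TENSOR_CONTRACTION_DISPATCH in two ways: the context is heap-allocated with `new` so it can outlive the calling stack frame, and the trailing FN argument (e.g. `run()`) is invoked on the fresh object. For the all-true branch, the call site further down expands roughly to the following (sketch, with CONTEXT_ARGS as defined at that call site):

// Approximate expansion of one TENSOR_CONTRACTION_ASYNC_DISPATCH branch.
(new EvalParallelContext<DoneCallback, /*lhs_contiguous=*/true,
                         /*rhs_contiguous=*/true, /*rhs_reordered=*/true,
                         Alignment> CONTEXT_ARGS)
    ->run();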
  EIGEN_DEVICE_FUNC void evalTo(Scalar* buffer) const {
    static_cast<const Derived*>(this)->template evalProduct<Unaligned>(buffer);
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalToCallback>
void evalToAsync(Scalar* buffer, EvalToCallback done) const {
static_cast<const Derived*>(this)
->template evalProductAsync<EvalToCallback, Unaligned>(buffer,
std::move(done));
}
#endif // EIGEN_USE_THREADS
  template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous,
            bool rhs_inner_dim_reordered, int Alignment>
  void evalProductSequential(Scalar* buffer) const {

View File

@ -73,6 +73,34 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
  template <int Alignment>
  void evalProduct(Scalar* buffer) const {
evalProductImpl<NoCallback, Alignment>(buffer, NoCallback());
}
template <typename EvalToCallback, int Alignment>
void evalProductAsync(Scalar* buffer, EvalToCallback done) const {
evalProductImpl<EvalToCallback, Alignment>(buffer, std::move(done));
}
template <typename DoneCallback, int Alignment>
void evalProductImpl(Scalar* buffer, DoneCallback done) const {
// This function computes a lot of heuristics in multiple steps, and it
// also has multiple exit points. To keep it sane, readable and all in one
// place, sync/async execution decision is made at runtime at the very end.
//
// (1) In sync mode we allocate Context on the stack, submit computations
// to the device thread pool, and block on a barrier until it is
// completed.
//
    // (2) In async mode we allocate Context on the heap, and after all tasks
    //     are finished, we call the provided done callback and delete the
    //     context from the heap.
    //
    // (*) EvalParallelContext & EvalShardedByInnerDimContext own all the state
    //     and temporary buffers required for executing the tensor contraction.
    //     They are responsible for cleaning it up after the contraction is done.
static const bool IsEvalInSyncMode =
std::is_same<DoneCallback, NoCallback>::value;
    const Index m = this->m_i_size;
    const Index n = this->m_j_size;
    const Index k = this->m_k_size;
@ -134,8 +162,16 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    if (shardByInnerDim(m, n, k, num_threads, num_threads_by_k)) {
      // We are in the scenario where it is more effective to shard by the
      // inner dimension.
      if (IsEvalInSyncMode) {
        EvalShardedByInnerDimContext<DoneCallback> ctx(
            this, num_threads_by_k, buffer, m, n, k, std::move(done));
        ctx.template run<Alignment>();
      } else {
        auto* ctx = new EvalShardedByInnerDimContext<DoneCallback>(
            this, num_threads_by_k, buffer, m, n, k, std::move(done));
        ctx->template runAsync<Alignment>();
      }
      return;
    }
@ -146,6 +182,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    if (num_threads == 1) {
      TENSOR_CONTRACTION_DISPATCH(this->template evalProductSequential,
                                  Unaligned, (buffer));
      if (!IsEvalInSyncMode) done();
      return;
    }
@ -230,21 +267,89 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    // optimization.
    if (parallelize_by_sharding_dim_only) parallel_pack = false;

    // TODO(ezhulenev): With if constexpr we don't need SyncEvalParallelContext.
    if (IsEvalInSyncMode) {
#define CONTEXT_ARGS                                                        \
  (this, num_threads, buffer, m, n, k, bm, bn, bk, nm, nn, nk, gm, gn, nm0, \
   nn0, shard_by_col, parallel_pack, parallelize_by_sharding_dim_only,      \
   NoCallback())                                                            \
      .run()
      TENSOR_CONTRACTION_DISPATCH(SyncEvalParallelContext, Alignment,
                                  CONTEXT_ARGS);
#undef CONTEXT_ARGS
    } else {
#define CONTEXT_ARGS                                                        \
  (this, num_threads, buffer, m, n, k, bm, bn, bk, nm, nn, nk, gm, gn, nm0, \
   nn0, shard_by_col, parallel_pack, parallelize_by_sharding_dim_only,      \
   std::move(done))
      TENSOR_CONTRACTION_ASYNC_DISPATCH(EvalParallelContext, DoneCallback,
                                        Alignment, CONTEXT_ARGS, run());
#undef CONTEXT_ARGS
    }
  }
  // ------------------------------------------------------------------------ //

  // Dummy struct to represent an empty DoneCallback.

  struct NoCallback {
    void operator()() {
      eigen_assert(false && "NoCallback should never be called");
    }
  };
// ------------------------------------------------------------------------ //
template <typename DoneCallback, typename Context>
class EvalParallelNotification;
// Synchronous evaluation notification that blocks caller thread in Wait().
template <typename Context>
class EvalParallelNotification<NoCallback, Context> {
public:
EvalParallelNotification(Context*, NoCallback) {}
void Notify() { done_.Notify(); }
void Wait() { done_.Wait(); }
private:
Eigen::Notification done_;
};
// Asynchronous evaluation notification that does not block in Wait().
template <typename DoneCallback, typename Context>
class EvalParallelNotification {
public:
EvalParallelNotification(Context* ctx, DoneCallback done)
: ctx_(ctx), done_(std::move(done)) {}
void Notify() {
      // Make a copy of the done callback, because it will be destroyed when we
      // delete the context in the next line (EvalParallelNotification is a
      // data member of the EvalParallelContext class).
DoneCallback done_copy = std::move(done_);
// Delete parallel evaluation context.
delete ctx_;
// Now safely call the done callback.
done_copy();
}
void Wait() {}
private:
Context* ctx_;
DoneCallback done_;
};
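The asynchronous specialization relies on the same "self-deleting context" idiom used elsewhere in this commit: the callback must be moved out of the object before `delete`, because it is a data member of the object being destroyed. A standalone sketch of the idiom (illustrative names, not Eigen API):

#include <functional>

// Sketch only: the last task moves the callback out, deletes its owner, then
// notifies the caller. No member of AsyncCtx may be touched after `delete`.
struct AsyncCtx {
  std::function<void()> done;
  void Finish() {
    std::function<void()> done_copy = std::move(done);
    delete this;
    done_copy();
  }
};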
// Context orchestrates sync/async parallel contraction evaluation. When it is
// executed in asynchronous mode, it owns all the shared state that might be
// accessible by block packing and kernel tasks.
template <typename DoneCallback, bool lhs_inner_dim_contiguous,
bool rhs_inner_dim_contiguous, bool rhs_inner_dim_reordered,
int Alignment>
class EvalParallelContext {
   public:
    typedef internal::TensorContractionInputMapper<
        LhsScalar, Index, internal::Lhs, LeftEvaluator, left_nocontract_t,
@ -267,11 +372,15 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    typedef typename TensorContractionKernel::RhsBlock RhsBlock;
    typedef typename TensorContractionKernel::BlockMemHandle BlockMemHandle;

    EvalParallelContext(const Self* self, int num_threads, Scalar* buffer,
                        Index tm, Index tn, Index tk, Index bm, Index bn,
                        Index bk, Index nm, Index nn, Index nk, Index gm,
                        Index gn, Index nm0, Index nn0, bool shard_by_col,
                        bool parallel_pack,
                        bool parallelize_by_sharding_dim_only,
                        DoneCallback done)
        : done_(this, std::move(done)),
          device_(self->m_device),
          lhs_(self->m_leftImpl, self->m_left_nocontract_strides,
               self->m_i_strides, self->m_left_contracting_strides,
               self->m_k_strides),
@ -299,8 +408,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
          gn_(gn),
          nm0_(nm0),
          nn0_(nn0),
          kernel_(m_, k_, n_, bm_, bk_, bn_) {
      // These two options are mutually exclusive.
      eigen_assert(!(parallel_pack && parallelize_by_sharding_dim_only));
@ -371,7 +479,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
      }
    }

    ~EvalParallelContext() {
      for (Index x = 0; x < P; x++) {
        for (Index m = 0; m < nm_; m++) delete[] state_kernel_[x][m];
        delete[] state_kernel_[x];
@ -386,16 +494,28 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    void run() {
      // Kick off packing of the first slice.
      signal_switch(0, 1);

      // Wait for overall completion.
      //
      // If parallel evaluation is executed in async mode, this is a no-op, and
      // Wait() will return immediately. In synchronous mode it will block the
      // caller thread until it receives a notification from the last task.
      //
      // In async mode the last task, when completed, will call the done
      // callback from the same thread, and will delete this context.
      //
      // TODO(dvyukov): This wait can lead to deadlock if contraction is
      // evaluated in synchronous mode. If nthreads contractions are
      // concurrently submitted from worker threads, this wait will block all
      // worker threads and the system will deadlock.
      done_.Wait();
    }
   private:
    // This notification is specialized on the type of DoneCallback and can be
    // blocking or non-blocking.
    EvalParallelNotification<DoneCallback, EvalParallelContext> done_;

    const Device& device_;
    LhsMapper lhs_;
    RhsMapper rhs_;
@ -780,10 +900,344 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    Index gm(Index m) const { return m + 1 < nm_ ? gm_ : nm0_ + gm_ - gm_ * nm_; }
    Index gn(Index n) const { return n + 1 < nn_ ? gn_ : nn0_ + gn_ - gn_ * nn_; }

    EvalParallelContext(const EvalParallelContext&) = delete;
    void operator=(const EvalParallelContext&) = delete;
  };
template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous,
bool rhs_inner_dim_reordered, int Alignment>
using SyncEvalParallelContext =
EvalParallelContext<NoCallback, lhs_inner_dim_contiguous,
rhs_inner_dim_contiguous, rhs_inner_dim_reordered,
Alignment>;
// ------------------------------------------------------------------------ //
// EvalShardedByInnerDimContext orchestrates sync/async contraction
// evaluation, when we shard by inner dimension. When it is executed in
// asynchronous mode, it owns all the shared state that might be accessible by
// block processing tasks.
template <typename DoneCallback>
struct EvalShardedByInnerDimContext {
EvalShardedByInnerDimContext(const Self* evaluator, int num_threads,
Scalar* result, Index m, Index n, Index k,
DoneCallback done)
: evaluator(evaluator),
m_lhs_inner_dim_contiguous(evaluator->m_lhs_inner_dim_contiguous),
m_rhs_inner_dim_contiguous(evaluator->m_rhs_inner_dim_contiguous),
m_rhs_inner_dim_reordered(evaluator->m_rhs_inner_dim_reordered),
num_threads(num_threads),
result(result),
m(m),
n(n),
k(k),
done(std::move(done)),
buffer_size_bytes(m * n * sizeof(Scalar)),
block_size(blockSize(k, num_threads)),
num_blocks(divup<Index>(k, block_size)),
num_pending_blocks(internal::convert_index<int>(num_blocks)),
l0_ranges(divup<Index>(num_blocks, l0_size)),
l0_state(l0_ranges),
block_buffers(num_blocks) {
// Keep count of pending gemm tasks for each l0 range.
for (int i = 0; i < l0_ranges; ++i) {
const Index num_pending_tasks = actualRangeSize(l0_ranges, l0_size, i);
l0_state.emplace_back(internal::convert_index<int>(num_pending_tasks));
}
// Allocate temporary buffers for each block.
for (Index block_idx = 0; block_idx < num_blocks; ++block_idx) {
Scalar* buf = block_idx == 0
? result
: static_cast<Scalar*>(evaluator->m_device.allocate(
buffer_size_bytes));
block_buffers.emplace_back(buf);
}
}
~EvalShardedByInnerDimContext() {
for (Index i = 1; i < num_blocks; ++i) {
evaluator->m_device.deallocate(block_buffers[i]);
}
}
template <int Alignment>
void run() {
Barrier barrier(internal::convert_index<int>(num_blocks));
for (Index block_idx = 0; block_idx < num_blocks; ++block_idx) {
evaluator->m_device.enqueueNoNotification(
[this, block_idx, &barrier]() {
Index block_start = block_idx * block_size;
Index block_end = block_start + actualBlockSize(block_idx);
processBlock<Alignment>(block_idx, block_start, block_end);
barrier.Notify();
});
}
barrier.Wait();
// Aggregate partial sums from l0 ranges.
aggregateL0Blocks<Alignment>();
// Apply output kernel.
applyOutputKernel();
}
template <int Alignment>
void runAsync() {
for (Index block_idx = 0; block_idx < num_blocks; ++block_idx) {
evaluator->m_device.enqueueNoNotification([this, block_idx]() {
Index block_start = block_idx * block_size;
Index block_end = block_start + actualBlockSize(block_idx);
processBlock<Alignment>(block_idx, block_start, block_end);
int v = num_pending_blocks.fetch_sub(1);
eigen_assert(v >= 1);
if (v == 1) {
// Aggregate partial sums from l0 ranges.
aggregateL0Blocks<Alignment>();
// Apply output kernel.
applyOutputKernel();
          // NOTE: If we call the `done` callback before deleting this context,
          // it might deallocate the Self* pointer captured by the context, and
          // we would fail in the destructor trying to deallocate temporary
          // buffers. Move the done callback out of the context before it is
          // destroyed.
DoneCallback done_copy = std::move(done);
// We are confident that we are the last one who touches context.
delete this;
// Now safely call the done callback.
done_copy();
}
});
}
}
private:
// The underlying GEMM kernel assumes that k is a multiple of
// the packet size and subtle breakage occurs if this is violated.
static const Index packet_size = internal::packet_traits<RhsScalar>::size;
const Self* evaluator; // TensorContraction evaluator
    // These fields are required by the TENSOR_CONTRACTION_DISPATCH macro.
bool m_lhs_inner_dim_contiguous;
bool m_rhs_inner_dim_contiguous;
bool m_rhs_inner_dim_reordered;
int num_threads;
Scalar* result;
Index m;
Index n;
Index k;
DoneCallback done;
// ----------------------------------------------------------------------//
// Algorithm parameters.
// We will compute partial results into the buffers of this size.
Index buffer_size_bytes;
Index block_size;
Index num_blocks;
    // Keep track of pending tasks when evaluating in async mode.
std::atomic<int> num_pending_blocks;
    // We compute partial gemm results in parallel, and to get the final result
    // we need to add them all together. For a large number of threads (>= 48)
    // this adds a very expensive sequential step at the end.
    //
    // We split [0, num_blocks) into small ranges, and when a task for a
    // block finishes its partial gemm computation, it checks if it was the last
    // gemm in the range, and if so, it adds all blocks of the range.
    //
    // After all tasks are done, we only need to add these pre-aggregated blocks.
    // For now we use just a single level of ranges to compute pre-aggregated
    // partial sums, but in general we can use more layers to compute tree
    // aggregation in parallel and reduce the size of the sequential step.
    //
    // TODO(ezhulenev): Add multilevel tree aggregation? Probably makes sense
    // only if the number of threads is >= ~128?
static const Index l0_size = 4;
Index l0_ranges;
// Keep count of pending gemm tasks for each l0 range.
MaxSizeVector<std::atomic<int>> l0_state; // [0, l0_ranges)
// Buffers allocated for each temporary block computation.
MaxSizeVector<Scalar*> block_buffers; // [0, num_blocks)
template <int Alignment>
void processBlock(Index block_idx, Index begin, Index end) {
Scalar* buf = block_buffers[block_idx];
::memset(buf, 0, buffer_size_bytes);
TENSOR_CONTRACTION_DISPATCH(
evaluator->template evalGemmPartialWithoutOutputKernel, Alignment,
(buf, begin, end,
/*num_threads=*/internal::convert_index<int>(num_blocks)));
// Check if it was the last task in l0 range.
const Index l0_index = block_idx / l0_size;
const int v = l0_state[l0_index].fetch_sub(1);
eigen_assert(v >= 1);
// If we processed the last block of the range, we can aggregate all
// partial results into the first block of the range.
if (v == 1) {
const Index rng_size = actualRangeSize(l0_ranges, l0_size, l0_index);
const Index dst_block_idx = l0_index * l0_size;
if (rng_size == l0_size) {
addAllToBuffer<Alignment>(
m * n,
/*src_buf0=*/block_buffers[dst_block_idx + 1],
/*src_buf1=*/block_buffers[dst_block_idx + 2],
/*src_buf2=*/block_buffers[dst_block_idx + 3],
/*dst_buf= */ block_buffers[dst_block_idx]);
} else {
// Aggregate blocks of potentially incomplete last range.
for (int i = 1; i < rng_size; ++i) {
addToBuffer<Alignment>(m * n,
/*src_buf=*/block_buffers[dst_block_idx + i],
/*dst_buf=*/block_buffers[dst_block_idx]);
}
}
}
}
// Aggregate partial sums from l0 ranges.
template <int Alignment>
void aggregateL0Blocks() const {
Index l0_index = 1;
for (; l0_index + 2 < l0_ranges; l0_index += 3) {
addAllToBuffer<Alignment>(
m * n,
/*src_buf0=*/block_buffers[(l0_index + 0) * l0_size],
/*src_buf1=*/block_buffers[(l0_index + 1) * l0_size],
/*src_buf2=*/block_buffers[(l0_index + 2) * l0_size],
/*dst_buf= */ block_buffers[0]);
}
for (; l0_index < l0_ranges; ++l0_index) {
addToBuffer<Alignment>(m * n, block_buffers[l0_index * l0_size],
block_buffers[0]);
}
}
void applyOutputKernel() const {
typedef internal::blas_data_mapper<Scalar, Index, ColMajor> OutputMapper;
evaluator->m_output_kernel(
OutputMapper(result, m), evaluator->m_tensor_contraction_params,
static_cast<Eigen::Index>(0), static_cast<Eigen::Index>(0), m, n);
}
// Compute block size with accounting for potentially incomplete last block.
Index actualBlockSize(Index block_idx) const {
return block_idx + 1 < num_blocks
? block_size
: k + block_size - block_size * num_blocks;
};
// Compute range size with accounting for potentially incomplete last range.
Index actualRangeSize(Index num_ranges, Index range_size,
Index range_idx) const {
eigen_assert(range_idx < num_ranges);
return range_idx + 1 < num_ranges
? range_size
: num_blocks + range_size - range_size * num_ranges;
};
template <int Alignment>
EIGEN_STRONG_INLINE static void addToBuffer(size_t n, const Scalar* src_buf,
Scalar* tgt_buf) {
const int output_packet_size =
internal::unpacket_traits<PacketReturnType>::size;
size_t i = 0;
const size_t num_packets = n / output_packet_size;
for (; i < output_packet_size * num_packets; i += output_packet_size) {
const PacketReturnType src_val =
internal::pload<PacketReturnType>(src_buf + i);
const PacketReturnType tgt_val =
internal::ploadt<PacketReturnType, Alignment>(tgt_buf + i);
const PacketReturnType sum = internal::padd(src_val, tgt_val);
internal::pstoret<Scalar, PacketReturnType, Alignment>(tgt_buf + i,
sum);
}
for (; i < n; ++i) {
tgt_buf[i] += src_buf[i];
}
}
template <int Alignment>
EIGEN_STRONG_INLINE static void addAllToBuffer(size_t n,
const Scalar* src_buf0,
const Scalar* src_buf1,
const Scalar* src_buf2,
Scalar* dst_buf) {
using ::Eigen::internal::padd;
using ::Eigen::internal::pload;
using ::Eigen::internal::ploadt;
using ::Eigen::internal::pstoret;
const int output_packet_size =
internal::unpacket_traits<PacketReturnType>::size;
size_t i = 0;
const size_t num_packets = n / output_packet_size;
for (; i < output_packet_size * num_packets; i += output_packet_size) {
const auto src_val0 = pload<PacketReturnType>(src_buf0 + i);
const auto src_val1 = pload<PacketReturnType>(src_buf1 + i);
const auto src_val2 = pload<PacketReturnType>(src_buf2 + i);
const auto dst_val = ploadt<PacketReturnType, Alignment>(dst_buf + i);
const auto sum =
padd(padd(dst_val, src_val0), padd(src_val1, src_val2));
pstoret<Scalar, PacketReturnType, Alignment>(dst_buf + i, sum);
}
for (; i < n; ++i) {
dst_buf[i] += src_buf0[i] + src_buf1[i] + src_buf2[i];
}
}
// Cost model doesn't capture well the cost associated with constructing
// tensor contraction mappers and computing loop bounds in gemm_pack_lhs
// and gemm_pack_rhs, so we specify minimum desired block size.
static Index blockSize(Index k, int num_threads) {
const auto round_up = [=](Index index) -> Index {
const Index kmultiple = packet_size <= 8 ? 8 : packet_size;
return divup<Index>(index, kmultiple) * kmultiple;
};
const Index target_block_size = round_up(divup<Index>(k, num_threads));
const Index desired_min_block_size = 12 * packet_size;
return numext::mini<Index>(
k, numext::maxi<Index>(desired_min_block_size, target_block_size));
}
EvalShardedByInnerDimContext(const EvalShardedByInnerDimContext&) = delete;
void operator=(const EvalShardedByInnerDimContext&) = delete;
};
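To make the block and range bookkeeping above concrete, here is a worked example with illustrative numbers (assuming packet_size == 8, e.g. float with AVX; not taken from the commit). With k = 10000 and num_threads = 16:

divup(k, num_threads)  = divup(10000, 16) = 625
target_block_size      = round_up(625) = divup(625, 8) * 8 = 632
desired_min_block_size = 12 * 8 = 96
block_size             = min(10000, max(96, 632)) = 632
num_blocks             = divup(10000, 632) = 16
l0_ranges              = divup(16, 4) = 4

Each of the 16 tasks computes a partial m x n GEMM over its k-slice into its own buffer (block 0 reuses `result`). The last task to finish inside an l0 range folds that range's four buffers into the range's first buffer, so the final sequential step only has to add buffers 4, 8 and 12 into buffer 0 before the output kernel is applied.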
// ------------------------------------------------------------------------ //
  // Below are the functions used by the evalProductImpl heuristics to select
  // optimal parameters for the parallelization algorithm.
  // Decide whether we want to shard m x n contraction by columns or by rows.
  static bool shardByCol(Index m, Index n, Index num_threads) {
    // Note: we are comparing both n and m against Traits::nr, it is not
@ -916,55 +1370,6 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    return cost + lhsCost + rhsCost;
  }
template <int Alignment>
EIGEN_STRONG_INLINE void addToBuffer(size_t n, const Scalar* src_buf,
Scalar* tgt_buf) const {
const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size;
size_t i = 0;
const size_t num_packets = n / output_packet_size;
for (; i < output_packet_size * num_packets; i += output_packet_size) {
const PacketReturnType src_val =
internal::pload<PacketReturnType>(src_buf + i);
const PacketReturnType tgt_val =
internal::ploadt<PacketReturnType, Alignment>(tgt_buf + i);
const PacketReturnType sum = internal::padd(src_val, tgt_val);
internal::pstoret<Scalar, PacketReturnType, Alignment>(tgt_buf + i, sum);
}
for (; i < n; ++i) {
tgt_buf[i] += src_buf[i];
}
}
template <int Alignment>
EIGEN_STRONG_INLINE void addAllToBuffer(size_t n, const Scalar* src_buf0,
const Scalar* src_buf1,
const Scalar* src_buf2,
Scalar* dst_buf) const {
using ::Eigen::internal::padd;
using ::Eigen::internal::pload;
using ::Eigen::internal::ploadt;
using ::Eigen::internal::pstoret;
const int output_packet_size =
internal::unpacket_traits<PacketReturnType>::size;
size_t i = 0;
const size_t num_packets = n / output_packet_size;
for (; i < output_packet_size * num_packets; i += output_packet_size) {
const auto src_val0 = pload<PacketReturnType>(src_buf0 + i);
const auto src_val1 = pload<PacketReturnType>(src_buf1 + i);
const auto src_val2 = pload<PacketReturnType>(src_buf2 + i);
const auto dst_val = ploadt<PacketReturnType, Alignment>(dst_buf + i);
const auto sum = padd(padd(dst_val, src_val0), padd(src_val1, src_val2));
pstoret<Scalar, PacketReturnType, Alignment>(dst_buf + i, sum);
}
for (; i < n; ++i) {
dst_buf[i] += src_buf0[i] + src_buf1[i] + src_buf2[i];
}
}
  // Decide whether we want to shard m x k x n contraction over the inner
  // (contraction) dimension (k).
  static bool shardByInnerDim(Index m, Index n, Index k, int num_threads,
@ -992,163 +1397,6 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    return shard_by_k;
  }
template <int Alignment>
void evalShardedByInnerDim(int num_threads, Scalar* result) const {
const Index m = this->m_i_size;
const Index n = this->m_j_size;
const Index k = this->m_k_size;
// We will compute partial results into the buffers of this size.
const Index buffer_size_bytes = m * n * sizeof(Scalar);
// The underlying GEMM kernel assumes that k is a multiple of
// the packet size and subtle breakage occurs if this is violated.
const Index packet_size = internal::packet_traits<RhsScalar>::size;
const auto round_up = [=](Index index) -> Index {
const Index kmultiple = packet_size <= 8 ? 8 : packet_size;
return divup<Index>(index, kmultiple) * kmultiple;
};
// Cost model doesn't capture well the cost associated with constructing
// tensor contraction mappers and computing loop bounds in gemm_pack_lhs and
// gemm_pack_rhs, so we specify minimum desired block size.
const Index target_block_size = round_up(divup<Index>(k, num_threads));
const Index desired_min_block_size = 12 * packet_size;
const Index block_size = numext::mini<Index>(
k, numext::maxi<Index>(desired_min_block_size, target_block_size));
const Index num_blocks = divup<Index>(k, block_size);
// Compute block size with accounting for potentially incomplete last block.
const auto actual_block_size = [=](Index block_idx) -> Index {
return block_idx + 1 < num_blocks
? block_size
: k + block_size - block_size * num_blocks;
};
// We compute partial gemm results in parallel, and to get the final result
// we need to add them all together. For the large number of threads (>= 48)
// this adds a very expensive sequential step at the end.
//
// We split the [0, num_blocks) into small ranges, and when a task for the
// block finishes its partial gemm computation, it checks if it was the last
// gemm in the range, and if so, it will add all blocks of the range.
//
// After all tasks finihes, we need to add only these pre-aggregated blocks.
// Compute range size with accounting for potentially incomplete last range.
const auto actual_range_size = [=](Index num_ranges, Index range_size,
Index range_idx) -> Index {
eigen_assert(range_idx < num_ranges);
return range_idx + 1 < num_ranges
? range_size
: num_blocks + range_size - range_size * num_ranges;
};
// For now we use just a single level of ranges to compute pre-aggregated
// partial sums, but in general we can use more layers to compute tree
// aggregation in parallel and reduce the size of the sequential step.
//
// TODO(ezhulenev): Add multilevel tree aggregation? Probably will make
// sense only if number of threads >= ~128?
static const Index l0_size = 4;
const Index l0_ranges = divup<Index>(num_blocks, l0_size);
// Keep count of pending gemm tasks for each l0 range.
MaxSizeVector<std::atomic<int>> l0_state(l0_ranges);
for (int i = 0; i < l0_ranges; ++i) {
const Index num_pending_tasks = actual_range_size(l0_ranges, l0_size, i);
l0_state.emplace_back(internal::convert_index<int>(num_pending_tasks));
}
MaxSizeVector<Scalar*> block_buffers(num_blocks);
auto process_block = [&, this](Index block_idx, Index begin, Index end) {
Scalar* buf = block_buffers[block_idx];
::memset(buf, 0, buffer_size_bytes);
TENSOR_CONTRACTION_DISPATCH(
this->template evalGemmPartialWithoutOutputKernel, Alignment,
(buf, begin, end,
/*num_threads=*/internal::convert_index<int>(num_blocks)));
// Check if it was the last task in l0 range.
const Index l0_index = block_idx / l0_size;
const int v = l0_state[l0_index].fetch_sub(1);
eigen_assert(v >= 1);
// If we processed the last block of the range, we can aggregate all
// partial results into the first block of the range.
if (v == 1) {
const Index rng_size = actual_range_size(l0_ranges, l0_size, l0_index);
const Index dst_block_idx = l0_index * l0_size;
if (rng_size == l0_size) {
addAllToBuffer<Alignment>(
m * n,
/*src_buf0=*/block_buffers[dst_block_idx + 1],
/*src_buf1=*/block_buffers[dst_block_idx + 2],
/*src_buf2=*/block_buffers[dst_block_idx + 3],
/*dst_buf= */ block_buffers[dst_block_idx]);
} else {
// Aggregate blocks of potentially incomplete last range.
for (int i = 1; i < rng_size; ++i) {
addToBuffer<Alignment>(m * n,
/*src_buf=*/block_buffers[dst_block_idx + i],
/*dst_buf=*/block_buffers[dst_block_idx]);
}
}
}
};
Barrier barrier(internal::convert_index<int>(num_blocks));
for (Index block_idx = 0; block_idx < num_blocks; ++block_idx) {
Scalar* buf = block_idx == 0
? result
: static_cast<Scalar*>(
this->m_device.allocate(buffer_size_bytes));
block_buffers.push_back(buf);
Index block_start = block_idx * block_size;
Index block_end = block_start + actual_block_size(block_idx);
this->m_device.enqueueNoNotification([=, &barrier, &process_block]() {
process_block(block_idx, block_start, block_end);
barrier.Notify();
});
}
barrier.Wait();
// Aggregate partial sums from l0 ranges.
Index l0_index = 1;
for (; l0_index + 2 < l0_ranges; l0_index += 3) {
addAllToBuffer<Alignment>(
m * n,
/*src_buf0=*/block_buffers[(l0_index + 0) * l0_size],
/*src_buf1=*/block_buffers[(l0_index + 1) * l0_size],
/*src_buf2=*/block_buffers[(l0_index + 2) * l0_size],
/*dst_buf= */block_buffers[0]);
}
for (; l0_index < l0_ranges; ++l0_index) {
addToBuffer<Alignment>(m * n, block_buffers[l0_index * l0_size],
block_buffers[0]);
}
// Don't forget to deallocate ALL temporary buffers.
for (Index i = 1; i < num_blocks; ++i) {
this->m_device.deallocate(block_buffers[i]);
}
// Finally call output kernel with finalized output buffer.
typedef internal::blas_data_mapper<Scalar, Index, ColMajor> OutputMapper;
this->m_output_kernel(OutputMapper(result, m),
this->m_tensor_contraction_params,
static_cast<Eigen::Index>(0),
static_cast<Eigen::Index>(0),
m, n);
}
  TensorOpCost contractionCostPerInnerDim(Index m, Index n, Index k) const {
    // Compute cost.
    const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size;
@ -1188,7 +1436,6 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
    return num_threads;
  }

  double computeBandwidth(bool shard_by_col, Index bm, Index bn,
                          Index bk) const {
    // Peak VFMA bandwidth is 0.5. However if we have not enough data for

View File

@ -52,7 +52,7 @@ class Allocator {
// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
@ -234,7 +234,7 @@ struct ThreadPoolDevice {
  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
  // WARNING: This function is asynchronous and will not block the calling thread.
@ -248,6 +248,14 @@ struct ThreadPoolDevice {
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
// Compute small problems directly in the caller thread.
if (n <= 1 || numThreads() == 1 ||
CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
f(0, n);
done();
return;
}
    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
@ -269,24 +277,26 @@ struct ThreadPoolDevice {
      // Single block or less, execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete async context if it was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }
  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
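A minimal sketch of calling the new asynchronous parallel-for (the cost figures are made up for the example):

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

Eigen::ThreadPool pool(4);
Eigen::ThreadPoolDevice device(&pool, 4);

// Cost per coefficient: bytes loaded, bytes stored, compute cycles.
Eigen::TensorOpCost cost(/*bytes_loaded=*/8, /*bytes_stored=*/8,
                         /*compute_cycles=*/1);

device.parallelForAsync(
    /*n=*/1 << 20, cost,
    [](Eigen::Index first, Eigen::Index last) {
      // Process the half-open range [first, last).
    },
    []() {
      // Invoked exactly once, after the last block has finished.
    });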
  // Thread pool accessor.
@ -307,6 +317,7 @@ struct ThreadPoolDevice {
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
~ParallelForAsyncContext() { done(); }
    std::atomic<Index> count;
    std::function<void(Index, Index)> f;

View File

@ -79,6 +79,15 @@ struct TensorEvaluator
    return true;
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType dest, EvalSubExprsCallback done) {
    // TODO(ezhulenev): ThreadPoolDevice memcpy is a blocking operation.
done(evalSubExprsIfNeeded(dest));
}
#endif // EIGEN_USE_THREADS
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {}

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoeffReturnType coeff(Index index) const {
@ -247,6 +256,15 @@ struct TensorEvaluator<const Derived, Device>
    return true;
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType dest, EvalSubExprsCallback done) {
    // TODO(ezhulenev): ThreadPoolDevice memcpy is a blocking operation.
done(evalSubExprsIfNeeded(dest));
}
#endif // EIGEN_USE_THREADS
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() { }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoeffReturnType coeff(Index index) const {
@ -346,6 +364,15 @@ struct TensorEvaluator<const TensorCwiseNullaryOp<NullaryOp, ArgType>, Device>
  EIGEN_DEVICE_FUNC const Dimensions& dimensions() const { return m_argImpl.dimensions(); }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType) { return true; }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType, EvalSubExprsCallback done) {
done(true);
}
#endif // EIGEN_USE_THREADS
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() { }

  EIGEN_DEVICE_FUNC CoeffReturnType coeff(Index index) const
@ -425,6 +452,15 @@ struct TensorEvaluator<const TensorCwiseUnaryOp<UnaryOp, ArgType>, Device>
    m_argImpl.evalSubExprsIfNeeded(NULL);
    return true;
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType, EvalSubExprsCallback done) {
m_argImpl.evalSubExprsIfNeededAsync(nullptr, [done](bool) { done(true); });
}
#endif // EIGEN_USE_THREADS
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {
    m_argImpl.cleanup();
  }
@ -546,6 +582,19 @@ struct TensorEvaluator<const TensorCwiseBinaryOp<BinaryOp, LeftArgType, RightArg
    m_rightImpl.evalSubExprsIfNeeded(NULL);
    return true;
  }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType, EvalSubExprsCallback done) {
    // TODO(ezhulenev): Evaluate two expressions in parallel?
m_leftImpl.evalSubExprsIfNeededAsync(nullptr, [this, done](bool) {
m_rightImpl.evalSubExprsIfNeededAsync(nullptr,
[done](bool) { done(true); });
});
}
#endif // EIGEN_USE_THREADS
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {
    m_leftImpl.cleanup();
    m_rightImpl.cleanup();

View File

@ -430,12 +430,14 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable>
                       std::function<void()> done) {
    TensorAsyncExecutorContext* const ctx =
        new TensorAsyncExecutorContext(expr, device, std::move(done));

    const auto on_eval_subexprs = [ctx, &device](bool need_assign) -> void {
      if (!need_assign) {
        delete ctx;
        return;
      }

      typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange;
      const StorageIndex size = array_prod(ctx->evaluator.dimensions());
      device.parallelForAsync(
          size, ctx->evaluator.costPerCoeff(Vectorizable),
@ -444,7 +446,9 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable>
            EvalRange::run(&ctx->evaluator, firstIdx, lastIdx);
          },
          [ctx]() { delete ctx; });
    };

    ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs);
  }
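Putting the executor changes together, the asynchronous control flow now looks roughly like this (a sketch of the chain; the context destructor behavior is inferred from the surrounding code, not shown in this hunk):

// 1. runAsync() heap-allocates a TensorAsyncExecutorContext that owns the
//    evaluator and the user-provided `done` callback.
// 2. evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs) walks the expression
//    tree; every evaluator chains into its children's async evaluation.
// 3. on_eval_subexprs(need_assign) fires once the whole tree is ready:
//      - need_assign == false: nothing left to do, `delete ctx` (the context
//        is expected to run cleanup and the user callback on destruction);
//      - need_assign == true: parallelForAsync evaluates the coefficients,
//        and its completion callback deletes the context the same way.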
 private:
@ -496,26 +500,32 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable
      return;
    }

    const auto on_eval_subexprs = [ctx, &device](bool need_assign) -> void {
      if (!need_assign) {
        delete ctx;
        return;
      }

      ctx->tiling =
          GetTensorExecutorTilingContext<Evaluator, TensorBlockMapper,
                                         Vectorizable>(device, ctx->evaluator);

      device.parallelForAsync(
          ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost,
          [ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
            ScalarNoConst* thread_buf =
                ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(
                    ctx->device);
            for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
              auto block =
                  ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf);
              ctx->evaluator.evalBlock(&block);
            }
          },
          [ctx]() { delete ctx; });
    };

    ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs);
  }

 private:

View File

@ -25,6 +25,9 @@ class Barrier {
  void Notify() {
    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
    if (v != 1) {
      // Clear the lowest bit (waiter flag) and check that the original state
      // value was not zero. If it was zero, it means that notify was called
      // more times than the original count.
      eigen_plain_assert(((v + 2) & ~1) != 0);
      return;  // either count has not dropped to 0, or waiter is not waiting
    }
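For reference, the packed state the new assertion is guarding: the remaining count is stored shifted left by one, and the low bit is the waiter flag. A short trace for Barrier(2), assuming Wait() is called between the two Notify() calls:

// state_ starts at 2 << 1 = 4.
// Notify(): 4 -> 2, v = 2, count not yet zero            -> return.
// Wait():   fetch_or(1) sets the waiter bit, state_ = 3  -> caller blocks.
// Notify(): 3 -> 1, v = 1, count is zero + waiter flag   -> wake the waiter.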

View File

@ -330,6 +330,52 @@ static void test_multithread_contraction_with_output_kernel() {
  }
}
template<int DataLayout>
void test_async_multithread_contraction_agrees_with_singlethread()
{
int contract_size = internal::random<int>(100, 500);
Tensor<float, 3, DataLayout> left(internal::random<int>(10, 40),
contract_size,
internal::random<int>(10, 40));
Tensor<float, 4, DataLayout> right(
internal::random<int>(1, 20), internal::random<int>(1, 20), contract_size,
internal::random<int>(1, 20));
left.setRandom();
right.setRandom();
// add constants to shift values away from 0 for more precision
left += left.constant(1.5f);
right += right.constant(1.5f);
typedef Tensor<float, 1>::DimensionPair DimPair;
Eigen::array<DimPair, 1> dims({{DimPair(1, 2)}});
Eigen::ThreadPool tp(internal::random<int>(2, 11));
Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(8, 32));
Tensor<float, 5, DataLayout> st_result;
st_result = left.contract(right, dims);
Tensor<float, 5, DataLayout> tp_result(st_result.dimensions());
Eigen::Barrier barrier(1);
tp_result.device(thread_pool_device, [&barrier]() { barrier.Notify(); }) =
left.contract(right, dims);
barrier.Wait();
VERIFY(dimensions_match(st_result.dimensions(), tp_result.dimensions()));
for (ptrdiff_t i = 0; i < st_result.size(); i++) {
// if both of the values are very small, then do nothing (because the test
// will fail due to numerical precision issues when values are small)
if (numext::abs(st_result.data()[i] - tp_result.data()[i]) >= 1e-4f) {
VERIFY_IS_APPROX(st_result.data()[i], tp_result.data()[i]);
}
}
}
// We are triggering 'evalShardedByInnerDim' optimization.
template <int DataLayout>
static void test_sharded_by_inner_dim_contraction()
@ -410,6 +456,93 @@ static void test_sharded_by_inner_dim_contraction_with_output_kernel()
  }
}
// We are triggering 'evalShardedByInnerDim' optimization.
template <int DataLayout>
static void test_async_sharded_by_inner_dim_contraction()
{
typedef Tensor<float, 1>::DimensionPair DimPair;
const int num_threads = internal::random<int>(4, 16);
ThreadPool threads(num_threads);
Eigen::ThreadPoolDevice device(&threads, num_threads);
Tensor<float, 2, DataLayout> t_left(2, 10000);
Tensor<float, 2, DataLayout> t_right(10000, 10);
Tensor<float, 2, DataLayout> t_result(2, 10);
t_left.setRandom();
t_right.setRandom();
// Put trash in t_result to verify contraction clears output memory.
t_result.setRandom();
// Add a little offset so that the results won't be close to zero.
t_left += t_left.constant(1.0f);
t_right += t_right.constant(1.0f);
typedef Map<Eigen::Matrix<float, Dynamic, Dynamic, DataLayout>> MapXf;
MapXf m_left(t_left.data(), 2, 10000);
MapXf m_right(t_right.data(), 10000, 10);
Eigen::Matrix<float, Dynamic, Dynamic, DataLayout> m_result(2, 10);
// this contraction should be equivalent to a single matrix multiplication
Eigen::array<DimPair, 1> dims({{DimPair(1, 0)}});
// compute results by separate methods
Eigen::Barrier barrier(1);
t_result.device(device, [&barrier]() { barrier.Notify(); }) =
t_left.contract(t_right, dims);
barrier.Wait();
m_result = m_left * m_right;
for (Index i = 0; i < t_result.dimensions().TotalSize(); i++) {
VERIFY_IS_APPROX(t_result.data()[i], m_result.data()[i]);
}
}
// We are triggering 'evalShardedByInnerDim' optimization with output kernel.
template <int DataLayout>
static void test_async_sharded_by_inner_dim_contraction_with_output_kernel()
{
typedef Tensor<float, 1>::DimensionPair DimPair;
const int num_threads = internal::random<int>(4, 16);
ThreadPool threads(num_threads);
Eigen::ThreadPoolDevice device(&threads, num_threads);
Tensor<float, 2, DataLayout> t_left(2, 10000);
Tensor<float, 2, DataLayout> t_right(10000, 10);
Tensor<float, 2, DataLayout> t_result(2, 10);
t_left.setRandom();
t_right.setRandom();
// Put trash in t_result to verify contraction clears output memory.
t_result.setRandom();
// Add a little offset so that the results won't be close to zero.
t_left += t_left.constant(1.0f);
t_right += t_right.constant(1.0f);
typedef Map<Eigen::Matrix<float, Dynamic, Dynamic, DataLayout>> MapXf;
MapXf m_left(t_left.data(), 2, 10000);
MapXf m_right(t_right.data(), 10000, 10);
Eigen::Matrix<float, Dynamic, Dynamic, DataLayout> m_result(2, 10);
// this contraction should be equivalent to a single matrix multiplication
Eigen::array<DimPair, 1> dims({{DimPair(1, 0)}});
// compute results by separate methods
Eigen::Barrier barrier(1);
t_result.device(device, [&barrier]() { barrier.Notify(); }) =
t_left.contract(t_right, dims, SqrtOutputKernel());
barrier.Wait();
m_result = m_left * m_right;
for (Index i = 0; i < t_result.dimensions().TotalSize(); i++) {
VERIFY_IS_APPROX(t_result.data()[i], std::sqrt(m_result.data()[i]));
}
}
template<int DataLayout>
void test_full_contraction() {
  int contract_size1 = internal::random<int>(1, 500);
@ -550,11 +683,18 @@ EIGEN_DECLARE_TEST(cxx11_tensor_thread_pool)
  CALL_SUBTEST_3(test_multithread_contraction_agrees_with_singlethread<RowMajor>());
  CALL_SUBTEST_3(test_multithread_contraction_with_output_kernel<ColMajor>());
  CALL_SUBTEST_3(test_multithread_contraction_with_output_kernel<RowMajor>());
CALL_SUBTEST_3(test_async_multithread_contraction_agrees_with_singlethread<ColMajor>());
CALL_SUBTEST_3(test_async_multithread_contraction_agrees_with_singlethread<RowMajor>());
// Test EvalShardedByInnerDimContext parallelization strategy.
  CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction<ColMajor>());
  CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction<RowMajor>());
  CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction_with_output_kernel<ColMajor>());
  CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction_with_output_kernel<RowMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction<ColMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction<RowMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction_with_output_kernel<ColMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction_with_output_kernel<RowMajor>());
  // Exercise various cases that have been problematic in the past.
  CALL_SUBTEST_5(test_contraction_corner_cases<ColMajor>());