Don't do parallel_pack if we can use thread_local memory in tensor contractions

Eugene Zhulenev 2019-02-07 09:21:25 -08:00
parent 013cc3a6b3
commit 59998117bb

@@ -208,6 +208,23 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
     Index nm = divup(nm0, gm);
     Index nn = divup(nn0, gn);
+    // If there is enough concurrency in the sharding dimension, we choose not
+    // to paralellize by the other dimension, and execute all kernels in sync
+    // mode. This reduces parallelism from the nm x nn down to nn
+    // (shard_by_col==true) or nm (shard_by_col==false).
+    const Index sharding_dim_tasks = shard_by_col ? nn : nm;
+    const int num_worker_threads = this->m_device.numThreadsInPool();
+    // With small number of threads we want to make sure that we do not reduce
+    // parallelism too much.
+    const int oversharding_factor =
+        num_worker_threads <= 4 ? 8 :
+        num_worker_threads <= 8 ? 4 :
+        num_worker_threads <= 16 ? 2 : 1;
+    const bool parallelize_by_sharding_dim_only =
+        sharding_dim_tasks >= oversharding_factor * num_worker_threads;
     // Last by not least, decide whether we want to issue both lhs and rhs
     // packing in parallel; or issue lhs packing first, and then issue rhs
     // packing when lhs packing completes (for !shard_by_col lhs and rhs are
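For readers skimming the diff, here is a minimal standalone sketch of the heuristic introduced in the hunk above. The free function and the `Index` alias are illustrative only; in Eigen the computation lives inside the evaluator and reads the thread count from `this->m_device`.

#include <cstddef>

using Index = std::ptrdiff_t;

// Returns true when the sharding dimension alone offers enough tasks to keep
// the pool busy, so kernels can run in sync mode and reuse thread-local
// packed blocks instead of relying on parallel packing.
bool parallelize_by_sharding_dim_only(Index nm, Index nn, bool shard_by_col,
                                      int num_worker_threads) {
  const Index sharding_dim_tasks = shard_by_col ? nn : nm;
  // Require a larger surplus of tasks per thread when the pool is small, so
  // that dropping the nm x nn parallelism does not starve the pool.
  const int oversharding_factor = num_worker_threads <= 4    ? 8
                                  : num_worker_threads <= 8  ? 4
                                  : num_worker_threads <= 16 ? 2
                                                             : 1;
  return sharding_dim_tasks >= oversharding_factor * num_worker_threads;
}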
@@ -223,10 +240,13 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
     // But don't do it if we will use each rhs only once. Locality seems to be
     // more important in this case.
     if ((shard_by_col ? nm : nn) == 1) parallel_pack = false;
+    // Also don't get in the way of parallelize_by_sharding_dim_only
+    // optimization.
+    if (parallelize_by_sharding_dim_only) parallel_pack = false;
 #define CONTEXT_ARGS                                                         \
   (this, num_threads, buffer, m, n, k, bm, bn, bk, nm, nn, nk, gm, gn, nm0,  \
-   nn0, shard_by_col, parallel_pack)                                         \
+   nn0, shard_by_col, parallel_pack, parallelize_by_sharding_dim_only)       \
       .run()
     TENSOR_CONTRACTION_DISPATCH(Context, Alignment, CONTEXT_ARGS);
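The ordering of the two overrides above matters: the new flag is checked last, so sync-mode execution with thread-local packing always wins over parallel packing. A hedged sketch of that decision chain, with the pre-existing default reduced to a placeholder (`initial_parallel_pack`) because the real heuristic sits outside the shown hunks:

#include <cstddef>

using Index = std::ptrdiff_t;

// Sketch only: `initial_parallel_pack` stands in for the heuristic computed
// earlier in the surrounding method; the two overrides mirror the patch.
bool decide_parallel_pack(bool initial_parallel_pack, Index nm, Index nn,
                          bool shard_by_col,
                          bool parallelize_by_sharding_dim_only) {
  bool parallel_pack = initial_parallel_pack;
  // Don't parallel-pack when each packed block on the non-sharding side
  // would be used only once; locality is more important in that case.
  if ((shard_by_col ? nm : nn) == 1) parallel_pack = false;
  // Never get in the way of the parallelize-by-sharding-dim-only mode.
  if (parallelize_by_sharding_dim_only) parallel_pack = false;
  return parallel_pack;
}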
@@ -260,7 +280,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
     Context(const Self* self, int num_threads, Scalar* buffer, Index tm, Index tn,
             Index tk, Index bm, Index bn, Index bk, Index nm, Index nn, Index nk,
             Index gm, Index gn, Index nm0, Index nn0, bool shard_by_col,
-            bool parallel_pack)
+            bool parallel_pack, bool parallelize_by_sharding_dim_only)
         : device_(self->m_device),
           lhs_(self->m_leftImpl, self->m_left_nocontract_strides,
                self->m_i_strides, self->m_left_contracting_strides,
@@ -275,6 +295,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
           num_threads_(num_threads),
           shard_by_col_(shard_by_col),
           parallel_pack_(parallel_pack),
+          parallelize_by_sharding_dim_only_(parallelize_by_sharding_dim_only),
           m_(tm),
           n_(tn),
           k_(tk),
@@ -289,6 +310,9 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
           nm0_(nm0),
           nn0_(nn0)
     {
+      // These two options are mutually exclusive.
+      eigen_assert(!(parallel_pack && parallelize_by_sharding_dim_only));
       for (Index x = 0; x < P; x++) {
         // Normal number of notifications for k slice switch is
         // nm_ + nn_ + nm_ * nn_. However, first P - 1 slices will receive only
@@ -336,22 +360,8 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
         }
       }
-      // If there is enough available parallelism in sharding dimension we can
-      // call kernels in sync mode and use thread local memory for packed data.
-      const Index sharding_dim_tasks = shard_by_col ? nn : nm;
-      const int num_worker_threads = device_.numThreadsInPool();
-      // With small number of threads we want to make sure that we do not reduce
-      // parallelism too much.
-      const int oversharding_factor =
-          num_worker_threads <= 4 ? 8 :
-          num_worker_threads <= 8 ? 4 :
-          num_worker_threads <= 16 ? 2 : 1;
-      if (!parallel_pack_ &&
-          sharding_dim_tasks >= oversharding_factor * num_worker_threads) {
-        parallelize_by_sharding_dim_only_ = true;
+      if (parallelize_by_sharding_dim_only_) {
+        const int num_worker_threads = device_.numThreadsInPool();
         if (shard_by_col) {
           can_use_thread_local_packed_ = new std::atomic<bool>[nn_];
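The constructor hunk above now only allocates the bookkeeping when the mode was selected up front. A hypothetical, simplified sketch of that bookkeeping, assuming (as the diff suggests) one atomic flag per task along the sharding dimension; the protocol for clearing the flags is outside the shown hunks, so the comments below are guesses:

#include <atomic>
#include <cstddef>
#include <memory>

using Index = std::ptrdiff_t;

// One flag per column block (shard_by_col) or row block (!shard_by_col).
// A flag starts as true ("this task may still pack into thread-local
// memory") and is presumably cleared once that task's packed block has to
// be visible to other threads.
struct ThreadLocalPackingFlags {
  std::unique_ptr<std::atomic<bool>[]> can_use_thread_local_packed;

  explicit ThreadLocalPackingFlags(Index sharding_dim_tasks)
      : can_use_thread_local_packed(
            new std::atomic<bool>[sharding_dim_tasks]) {
    for (Index i = 0; i < sharding_dim_tasks; ++i)
      can_use_thread_local_packed[i].store(true, std::memory_order_relaxed);
  }
};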
@@ -422,6 +432,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
       const int num_threads_;
       const bool shard_by_col_;
       const bool parallel_pack_;
+      const bool parallelize_by_sharding_dim_only_;
       // Matrix sizes.
       const Index m_;
       const Index n_;
@@ -481,12 +492,6 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
       std::vector<LhsScalar*> packed_lhs_[P - 1];
       std::vector<RhsScalar*> packed_rhs_[P - 1];
-      // If there is enough concurrency in the sharding dimension, we choose not
-      // to paralellize by the other dimension, and execute all kernels in sync
-      // mode. This reduces parallelism from the nm_ x nn_ down to nn_
-      // (shard_by_col==true) or nm_ (shard_by_col==false).
-      bool parallelize_by_sharding_dim_only_ = false;
       // If we choose to parallelize only by the sharding dimension, each thread
       // will have it's own "thead local" (not a c++ thread local storage) memory
       // for packed_lhs or packed_rhs (shard_by_col = false of true). This memory
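The trailing comment distinguishes this scheme from C++ `thread_local` storage. A rough sketch of what "thread local" means here, assuming the buffers are indexed by the worker's index in the pool; the class name and the source of the thread id are illustrative, not Eigen's API:

#include <cstddef>
#include <vector>

// Per-pool-thread scratch blocks: one lazily allocated buffer per worker
// thread, owned by the contraction context rather than by the threads, so
// the memory is released when the context is destroyed and its size is
// bounded by the pool size rather than by the number of threads ever seen.
template <typename Scalar>
class PerThreadPackedStorage {
 public:
  PerThreadPackedStorage(int num_pool_threads, std::size_t block_size)
      : blocks_(num_pool_threads), block_size_(block_size) {}

  // `thread_id` is assumed to be the calling worker's index in the pool
  // (e.g. whatever the device exposes as a current-thread id). Each thread
  // only touches its own slot, so no locking is needed.
  Scalar* get(int thread_id) {
    std::vector<Scalar>& block = blocks_[thread_id];
    if (block.empty()) block.resize(block_size_);
    return block.data();
  }

 private:
  std::vector<std::vector<Scalar>> blocks_;
  std::size_t block_size_;
};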