Pulled latest updates from trunk.

This commit is contained in:
Benoit Steiner 2016-05-17 07:21:48 -07:00
commit 86da77cb9b
8 changed files with 813 additions and 14 deletions

View File

@ -1,4 +1,6 @@
Each benchmark comes in 2 flavors: one that runs on CPU, and one that runs on GPU. The tensor benchmark suite is made of several parts.
The first part is a generic suite, in which each benchmark comes in 2 flavors: one that runs on CPU, and one that runs on GPU.
To compile the floating point CPU benchmarks, simply call: To compile the floating point CPU benchmarks, simply call:
g++ tensor_benchmarks_cpu.cc benchmark_main.cc -I ../../ -std=c++11 -O3 -DNDEBUG -pthread -mavx -o benchmarks_cpu g++ tensor_benchmarks_cpu.cc benchmark_main.cc -I ../../ -std=c++11 -O3 -DNDEBUG -pthread -mavx -o benchmarks_cpu
@ -6,7 +8,8 @@ g++ tensor_benchmarks_cpu.cc benchmark_main.cc -I ../../ -std=c++11 -O3 -DNDEBUG
To compile the floating point GPU benchmarks, simply call: To compile the floating point GPU benchmarks, simply call:
nvcc tensor_benchmarks_gpu.cu benchmark_main.cc -I ../../ -std=c++11 -O2 -DNDEBUG -arch compute_35 -o benchmarks_gpu nvcc tensor_benchmarks_gpu.cu benchmark_main.cc -I ../../ -std=c++11 -O2 -DNDEBUG -arch compute_35 -o benchmarks_gpu
We also provide a version of the generic GPU tensor benchmarks that uses half floats (aka fp16) instead of regular floats. To compile these benchmarks, simply call the command line below. You'll need a recent GPU that supports compute capability 5.3 or higher to run them and nvcc 7.5 or higher to compile the code.
To compile the half float GPU benchmarks, simply call the command line below. You'll need a recent GPU that supports compute capability 5.3 or higher to run them and nvcc 7.5 or higher to compile the code.
nvcc tensor_benchmarks_fp16_gpu.cu benchmark_main.cc -I ../../ -std=c++11 -O2 -DNDEBUG -arch compute_53 -o benchmarks_fp16_gpu nvcc tensor_benchmarks_fp16_gpu.cu benchmark_main.cc -I ../../ -std=c++11 -O2 -DNDEBUG -arch compute_53 -o benchmarks_fp16_gpu
last but not least, we also provide a suite of benchmarks to measure the scalability of the contraction code on CPU. To compile these benchmarks, call
g++ contraction_benchmarks_cpu.cc benchmark_main.cc -I ../../ -std=c++11 -O3 -DNDEBUG -pthread -mavx -o benchmarks_cpu

View File

@ -0,0 +1,39 @@
#define EIGEN_USE_THREADS
#include <string>
#include "tensor_benchmarks.h"
#define CREATE_THREAD_POOL(threads) \
Eigen::ThreadPool pool(threads); \
Eigen::ThreadPoolDevice device(&pool, threads);
// Contractions for number of threads ranging from 1 to 32
// Dimensions are Rows, Cols, Depth
#define BM_ContractionCPU(D1, D2, D3) \
static void BM_##Contraction##_##D1##x##D2##x##D3(int iters, int Threads) { \
StopBenchmarkTiming(); \
CREATE_THREAD_POOL(Threads); \
BenchmarkSuite<Eigen::ThreadPoolDevice, float> suite(device, D1, D2, D3); \
suite.contraction(iters); \
} \
BENCHMARK_RANGE(BM_##Contraction##_##D1##x##D2##x##D3, 1, 32);
// Vector Matrix and Matrix Vector products
BM_ContractionCPU(1, 2000, 500);
BM_ContractionCPU(2000, 1, 500);
// Various skinny matrices
BM_ContractionCPU(250, 3, 512);
BM_ContractionCPU(1500, 3, 512);
BM_ContractionCPU(512, 800, 4);
BM_ContractionCPU(512, 80, 800);
BM_ContractionCPU(512, 80, 13522);
BM_ContractionCPU(1, 80, 13522);
BM_ContractionCPU(3200, 512, 4);
BM_ContractionCPU(3200, 512, 80);
BM_ContractionCPU(3200, 80, 512);

View File

@ -254,6 +254,14 @@ struct TensorEvaluator<const TensorTupleReducerOp<ReduceOp, Dims, ArgType>, Devi
EIGEN_DEVICE_FUNC Scalar* data() const { return NULL; } EIGEN_DEVICE_FUNC Scalar* data() const { return NULL; }
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE TensorOpCost
costPerCoeff(bool vectorized) const {
const double compute_cost = 1.0 +
(m_return_dim < 0 ? 0.0 : (TensorOpCost::ModCost<Index>() + TensorOpCost::DivCost<Index>()));
return m_orig_impl.costPerCoeff(vectorized) +
m_impl.costPerCoeff(vectorized) + TensorOpCost(0, 0, compute_cost);
}
private: private:
EIGEN_DEVICE_FUNC void gen_strides(const InputDimensions& dims, StrideDims& strides) { EIGEN_DEVICE_FUNC void gen_strides(const InputDimensions& dims, StrideDims& strides) {
if (m_return_dim < 0) { if (m_return_dim < 0) {

View File

@ -354,11 +354,11 @@ struct TensorEvaluator<const TensorBroadcastingOp<Broadcast, ArgType>, Device>
if (NumDims > 0) { if (NumDims > 0) {
for (int i = NumDims - 1; i > 0; --i) { for (int i = NumDims - 1; i > 0; --i) {
compute_cost += TensorOpCost::DivCost<Index>(); compute_cost += TensorOpCost::DivCost<Index>();
if (internal::index_statically_eq<Broadcast>()(i, 1)) { if (internal::index_statically_eq<Broadcast>(i, 1)) {
compute_cost += compute_cost +=
TensorOpCost::MulCost<Index>() + TensorOpCost::AddCost<Index>(); TensorOpCost::MulCost<Index>() + TensorOpCost::AddCost<Index>();
} else { } else {
if (!internal::index_statically_eq<InputDimensions>()(i, 1)) { if (!internal::index_statically_eq<InputDimensions>(i, 1)) {
compute_cost += TensorOpCost::MulCost<Index>() + compute_cost += TensorOpCost::MulCost<Index>() +
TensorOpCost::ModCost<Index>() + TensorOpCost::ModCost<Index>() +
TensorOpCost::AddCost<Index>(); TensorOpCost::AddCost<Index>();

View File

@ -14,6 +14,8 @@
#ifdef EIGEN_USE_THREADS #ifdef EIGEN_USE_THREADS
namespace Eigen { namespace Eigen {
#ifdef EIGEN_USE_SIMPLE_THREAD_POOL
namespace internal { namespace internal {
template<typename LhsScalar, typename LhsMapper, typename Index> template<typename LhsScalar, typename LhsMapper, typename Index>
@ -52,7 +54,7 @@ struct packRhsAndKernelArg {
}; };
} // end namespace internal } // end namespace internal
#endif // EIGEN_USE_SIMPLE_THREAD_POOL
template<typename Indices, typename LeftArgType, typename RightArgType> template<typename Indices, typename LeftArgType, typename RightArgType>
struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType>, ThreadPoolDevice> : struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgType>, ThreadPoolDevice> :
@ -110,6 +112,627 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
TensorEvaluator(const XprType& op, const Device& device) : TensorEvaluator(const XprType& op, const Device& device) :
Base(op, device) {} Base(op, device) {}
#ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous,
bool rhs_inner_dim_reordered, int Alignment>
void evalProduct(Scalar* buffer) const {
typedef
typename internal::remove_const<typename EvalLeftArgType::Scalar>::type
LhsScalar;
typedef
typename internal::remove_const<typename EvalRightArgType::Scalar>::type
RhsScalar;
typedef typename internal::gebp_traits<LhsScalar, RhsScalar> Traits;
typedef TensorEvaluator<EvalLeftArgType, Device> LeftEvaluator;
typedef TensorEvaluator<EvalRightArgType, Device> RightEvaluator;
typedef internal::TensorContractionInputMapper<
LhsScalar, Index, internal::Lhs, LeftEvaluator, left_nocontract_t,
contract_t, internal::packet_traits<LhsScalar>::size,
lhs_inner_dim_contiguous, false, Unaligned>
LhsMapper;
typedef internal::TensorContractionInputMapper<
RhsScalar, Index, internal::Rhs, RightEvaluator, right_nocontract_t,
contract_t, internal::packet_traits<RhsScalar>::size,
rhs_inner_dim_contiguous, rhs_inner_dim_reordered, Unaligned>
RhsMapper;
typedef internal::blas_data_mapper<Scalar, Index, ColMajor> OutputMapper;
typedef internal::gemm_pack_lhs<LhsScalar, Index,
typename LhsMapper::SubMapper, Traits::mr,
Traits::LhsProgress, ColMajor>
LhsPacker;
typedef internal::gemm_pack_rhs<
RhsScalar, Index, typename RhsMapper::SubMapper, Traits::nr, ColMajor>
RhsPacker;
typedef internal::gebp_kernel<LhsScalar, RhsScalar, Index, OutputMapper,
Traits::mr, Traits::nr, false, false>
GebpKernel;
const Index m = this->m_i_size;
const Index n = this->m_j_size;
const Index k = this->m_k_size;
if (m == 0 || n == 0 || k == 0) return;
// Compute a set of algorithm parameters:
// - kernel block sizes (bm, bn, bk)
// - task grain sizes (number of kernels executed per task: gm, gn)
// - number of threads
// - sharding by row/column
// - parallel packing or first lhs then rhs
// and some derived parameters:
// - number of tasks (nm, nn, nk)
// - number of kernels (nm0, nn0)
// Unfortunately, all these parameters are tightly interdependent.
// So in some cases we first compute approximate values, then compute other
// values based on these approximations and then refine the approximations.
// There are lots of heuristics here. There is some reasoning behind them,
// but ultimately they are just tuned on contraction benchmarks for
// different input configurations, thread counts and instruction sets.
// So feel free to question any of them.
// Compute whether we want to shard by row or by column.
// This is a first approximation, it will be refined later. Since we don't
// know number of threads yet we use 2, because what's we are most
// interested in at this point is whether it makes sense to use
// parallelization at all or not.
bool shard_by_col = shardByCol(m, n, 2);
// First approximation of kernel blocking sizes.
// Again, we don't know number of threads yet, so we use 2.
Index bm, bn, bk;
if (shard_by_col) {
internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index,
internal::ShardByCol>
blocking(k, m, n, 2);
bm = blocking.mc();
bn = blocking.nc();
bk = blocking.kc();
} else {
internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index,
internal::ShardByRow>
blocking(k, m, n, 2);
bm = blocking.mc();
bn = blocking.nc();
bk = blocking.kc();
}
// Compute optimal number of threads.
// Note: we use bk instead of k here because we are interested in amount of
// _parallelizable_ computations, and computations are not parallelizable
// across k dimension.
const TensorOpCost cost =
contractionCost(m, n, bm, bn, bk, shard_by_col, false);
Index num_threads = TensorCostModel<ThreadPoolDevice>::numThreads(
static_cast<double>(n) * m, cost, this->m_device.numThreads());
// TODO(dvyukov): this is a stop-gap to prevent regressions while the cost
// model is not tuned. Remove this when the cost model is tuned.
if (n == 1) num_threads = 1;
if (num_threads == 1) {
// The single-threaded algorithm should be faster in this case.
if (n == 1)
this->template evalGemv<lhs_inner_dim_contiguous,
rhs_inner_dim_contiguous,
rhs_inner_dim_reordered, Alignment>(buffer);
else
this->template evalGemm<lhs_inner_dim_contiguous,
rhs_inner_dim_contiguous,
rhs_inner_dim_reordered, Alignment>(buffer);
return;
}
// Now that we know number of threads, recalculate sharding and blocking.
shard_by_col = shardByCol(m, n, num_threads);
if (shard_by_col) {
internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index,
internal::ShardByCol>
blocking(k, m, n, num_threads);
bm = blocking.mc();
bn = blocking.nc();
bk = blocking.kc();
} else {
internal::TensorContractionBlocking<LhsMapper, RhsMapper, Index,
internal::ShardByRow>
blocking(k, m, n, num_threads);
bm = blocking.mc();
bn = blocking.nc();
bk = blocking.kc();
}
// Number of kernels for each dimension.
Index nm0 = divup(m, bm);
Index nn0 = divup(n, bn);
Index nk = divup(k, bk);
// Calculate task grain size (number of kernels executed per task).
// This task size coarsening serves two purposes:
// 1. It reduces per-task overheads including synchronization overheads.
// 2. It allows to use caches better (reuse the same packed rhs in several
// consecutive kernels).
Index gm = 1;
Index gn = 1;
// If we are sharding by column, then we prefer to reduce rows first.
if (shard_by_col) {
gm = coarsenM(m, n, bm, bn, bk, gn, num_threads, shard_by_col);
gn = coarsenN(m, n, bm, bn, bk, gm, num_threads, shard_by_col);
} else {
gn = coarsenN(m, n, bm, bn, bk, gm, num_threads, shard_by_col);
gm = coarsenM(m, n, bm, bn, bk, gn, num_threads, shard_by_col);
}
// Number of tasks in each dimension.
Index nm = divup(nm0, gm);
Index nn = divup(nn0, gn);
// Last by not least, decide whether we want to issue both lhs and rhs
// packing in parallel; or issue lhs packing first, and then issue rhs
// packing when lhs packing completes (for !shard_by_col lhs and rhs are
// swapped). Parallel packing allows more parallelism (for both packing and
// kernels), while sequential packing provides better locality (once
// a thread finishes rhs packing it proceed to kernels with that rhs).
// First, we are interested in parallel packing if there are few tasks.
bool parallel_pack = num_threads >= nm * nn;
// Also do parallel packing if all data fits into L2$.
if (m * bk * sizeof(LhsScalar) + n * bk * sizeof(RhsScalar) <=
l2CacheSize() * num_threads)
parallel_pack = true;
// But don't do it if we will use each rhs only once. Locality seems to be
// more important in this case.
if ((shard_by_col ? nm : nn) == 1) parallel_pack = false;
LhsMapper lhs(this->m_leftImpl, this->m_left_nocontract_strides,
this->m_i_strides, this->m_left_contracting_strides,
this->m_k_strides);
RhsMapper rhs(this->m_rightImpl, this->m_right_nocontract_strides,
this->m_j_strides, this->m_right_contracting_strides,
this->m_k_strides);
Context<LhsPacker, RhsPacker, GebpKernel, LhsMapper, RhsMapper,
OutputMapper>(this->m_device, num_threads, lhs, rhs, buffer, m, n,
k, bm, bn, bk, nm, nn, nk, gm, gn, nm0, nn0,
shard_by_col, parallel_pack)
.run();
}
// Context coordinates a single parallel gemm operation.
template <typename LhsPacker, typename RhsPacker, typename GebpKernel,
typename LhsMapper, typename RhsMapper, typename OutputMapper>
class Context {
public:
Context(const Device& device, int num_threads, LhsMapper& lhs,
RhsMapper& rhs, Scalar* buffer, Index m, Index n, Index k, Index bm,
Index bn, Index bk, Index nm, Index nn, Index nk, Index gm,
Index gn, Index nm0, Index nn0, bool shard_by_col,
bool parallel_pack)
: device_(device),
lhs_(lhs),
rhs_(rhs),
buffer_(buffer),
output_(buffer, m),
num_threads_(num_threads),
shard_by_col_(shard_by_col),
parallel_pack_(parallel_pack),
m_(m),
n_(n),
k_(k),
bm_(bm),
bn_(bn),
bk_(bk),
nm_(nm),
nn_(nn),
nk_(nk),
gm_(gm),
gn_(gn),
nm0_(nm0),
nn0_(nn0)
{
for (Index x = 0; x < P; x++) {
// Normal number of notifications for k slice switch is
// nm_ + nn_ + nm_ * nn_. However, first P - 1 slices will receive only
// nm_ + nn_ notifications, because they will not receive notifications
// from preceeding kernels.
state_switch_[x] =
x == 0
? 1
: (parallel_pack_ ? nn_ + nm_ : (shard_by_col_ ? nn_ : nm_)) +
(x == P - 1 ? nm_ * nn_ : 0);
state_packing_ready_[x] =
parallel_pack_ ? 0 : (shard_by_col_ ? nm_ : nn_);
state_kernel_[x] = new std::atomic<uint8_t>*[nm_];
for (Index m = 0; m < nm_; m++) {
state_kernel_[x][m] = new std::atomic<uint8_t>[nn_];
// Kernels generally receive 3 notifications (previous kernel + 2
// packing), but the first slice won't get notifications from previous
// kernels.
for (Index n = 0; n < nn_; n++)
state_kernel_[x][m][n].store(
(x == 0 ? 0 : 1) + (parallel_pack_ ? 2 : 1),
std::memory_order_relaxed);
}
}
// Allocate memory for packed rhs/lhs matrices.
size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1);
size_t lhs_size =
divup<size_t>(bm_ * bk_ * sizeof(LhsScalar), align) * align;
size_t rhs_size =
divup<size_t>(bn_ * bk_ * sizeof(RhsScalar), align) * align;
packed_mem_ = static_cast<char*>(internal::aligned_malloc(
(nm0_ * lhs_size + nn0_ * rhs_size) * std::min<size_t>(nk_, P - 1)));
char* mem = static_cast<char*>(packed_mem_);
for (Index x = 0; x < numext::mini<size_t>(nk_, P - 1); x++) {
packed_lhs_[x].resize(nm0_);
for (Index m = 0; m < nm0_; m++) {
packed_lhs_[x][m] = reinterpret_cast<LhsScalar*>(mem);
mem += lhs_size;
}
packed_rhs_[x].resize(nn0_);
for (Index n = 0; n < nn0_; n++) {
packed_rhs_[x][n] = reinterpret_cast<RhsScalar*>(mem);
mem += rhs_size;
}
}
}
~Context() {
for (Index x = 0; x < P; x++) {
for (Index m = 0; m < nm_; m++) delete[] state_kernel_[x][m];
delete[] state_kernel_[x];
}
internal::aligned_free(packed_mem_);
}
void run() {
// Kick off packing of the first slice.
signal_switch(0, 1);
// Wait for overall completion.
// TODO(dvyukov): this wait can lead to deadlock.
// If nthreads contractions are concurrently submitted from worker
// threads, this wait will block all worker threads and the system will
// deadlock.
done_.Wait();
}
private:
Notification done_;
const Device& device_;
LhsMapper& lhs_;
RhsMapper& rhs_;
Scalar* const buffer_;
OutputMapper output_;
const int num_threads_;
const bool shard_by_col_;
const bool parallel_pack_;
// Matrix sizes.
const Index m_;
const Index n_;
const Index k_;
// Block sizes.
const Index bm_;
const Index bn_;
const Index bk_;
// Number of tasks.
const Index nm_;
const Index nn_;
const Index nk_;
// Task grain sizes (number of kernels executed per task).
const Index gm_;
const Index gn_;
// Number of blocks (this is different from ni_/nn_ because of task size
// coarsening).
const Index nm0_;
const Index nn0_;
// Parallelization strategy.
//
// Blocks related to the same k block can run in parallel because they write
// to different output blocks. So we parallelize within k slices, this
// gives us parallelism level of m x n. Before we can start any kernels
// related to k-th slice, we need to issue m lhs packing tasks and n rhs
// packing tasks.
//
// However, there is a bottleneck when we are finishing kernels for k-th
// slice (at the very end there is only 1 runnable kernel). To mitigate this
// bottleneck we allow kernels from k-th and k+1-th slices to run in
// parallel. Note that (m, n, k) and (m, n, k+1) kernels write to the same
// output block, so they must not run in parallel.
//
// This gives us the following dependency graph.
// On each k slice we have m x n kernel tasks, m lhs paking tasks and n rhs
// packing tasks.
// Kernel (m, n, k) can start when:
// - kernel (m, n, k-1) has finished
// - lhs packing (m, k) has finished
// - rhs packing (n, k) has finished
// Lhs/rhs packing can start when:
// - all k-1 packing has finished (artificially imposed to limit amount of
// parallel packing)
//
// On top of that we limit runnable tasks to two consecutive k slices.
// This is done to limit amount of memory we need for packed lhs/rhs
// (for each k slice we need m*bk + n*bk memory in packed_lhs_/packed_rhs_).
//
// state_switch_ tracks when we are ready to switch to the next k slice.
// state_kernel_[m][n] tracks when we are ready to kick off kernel (m, n).
// These variable are rolling over 3 consecutive k slices: first two we are
// actively executing + one to track completion of kernels in the second
// slice.
static const Index P = 3;
void* packed_mem_;
std::vector<LhsScalar*> packed_lhs_[P - 1];
std::vector<RhsScalar*> packed_rhs_[P - 1];
std::atomic<uint8_t>** state_kernel_[P];
// state_switch_ is frequently modified by worker threads, while other
// fields are read-only after constructor. Let's move it to a separate cache
// line to reduce cache-coherency traffic.
char pad_[128];
std::atomic<Index> state_packing_ready_[P];
std::atomic<Index> state_switch_[P];
void pack_lhs(Index m, Index k) {
const Index mend = m * gm_ + gm(m);
for (Index m1 = m * gm_; m1 < mend; m1++)
LhsPacker()(packed_lhs_[k % (P - 1)][m1],
lhs_.getSubMapper(m1 * bm_, k * bk_), bk(k), bm(m1));
if (!parallel_pack_ && shard_by_col_) {
signal_packing(k);
} else {
signal_switch(k + 1);
for (Index n = nn_ - 1; n >= 0; n--) signal_kernel(m, n, k, n == 0);
}
}
void pack_rhs(Index n, Index k) {
const Index nend = n * gn_ + gn(n);
for (Index n1 = n * gn_; n1 < nend; n1++) {
if (k == 0) {
// Zero the output memory in parallel.
// On 10000x2x10000 mm zeroing can easily take half of time.
// Zero (bn x m) row. Safe to do here because all kernels that will
// write to this memory depend on completion of this task.
// Note: don't call device_.memset() here. device_.memset() blocks on
// thread pool worker thread, which can lead to underutilization and
// deadlocks.
memset(buffer_ + n1 * bn_ * m_, 0, bn(n1) * m_ * sizeof(Scalar));
}
RhsPacker()(packed_rhs_[k % (P - 1)][n1],
rhs_.getSubMapper(k * bk_, n1 * bn_), bk(k), bn(n1));
}
if (parallel_pack_ || shard_by_col_) {
signal_switch(k + 1);
for (Index m = nm_ - 1; m >= 0; m--) signal_kernel(m, n, k, m == 0);
} else {
signal_packing(k);
}
}
void kernel(Index m, Index n, Index k) {
// Note: order of iteration matters here. Iteration over m is innermost
// because we want to reuse the same packed rhs in consequetive tasks
// (rhs fits into L2$ while lhs only into L3$).
const Index nend = n * gn_ + gn(n);
const Index mend = m * gm_ + gm(m);
if (shard_by_col_) {
for (Index n1 = n * gn_; n1 < nend; n1++) {
for (Index m1 = m * gm_; m1 < mend; m1++)
GebpKernel()(output_.getSubMapper(m1 * bm_, n1 * bn_),
packed_lhs_[k % (P - 1)][m1],
packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1),
Scalar(1), -1, -1, 0, 0);
}
} else {
for (Index m1 = m * gm_; m1 < mend; m1++)
for (Index n1 = n * gn_; n1 < nend; n1++) {
GebpKernel()(output_.getSubMapper(m1 * bm_, n1 * bn_),
packed_lhs_[k % (P - 1)][m1],
packed_rhs_[k % (P - 1)][n1], bm(m1), bk(k), bn(n1),
Scalar(1), -1, -1, 0, 0);
}
}
signal_kernel(m, n, k + 1, false);
signal_switch(k + 2);
}
void signal_packing(Index k) {
eigen_assert(!parallel_pack_);
Index s = state_packing_ready_[k % P].fetch_sub(1);
eigen_assert(s > 0);
if (s != 1) return;
state_packing_ready_[k % P] = shard_by_col_ ? nm_ : nn_;
enqueue_packing(k, shard_by_col_);
}
void signal_kernel(Index m, Index n, Index k, bool sync) {
std::atomic<uint8_t>* state = &state_kernel_[k % P][m][n];
Index s = state->load();
eigen_assert(s > 0);
if (s != 1 && state->fetch_sub(1) != 1) return;
state->store(parallel_pack_ ? 3 : 2, std::memory_order_relaxed);
if (sync)
kernel(m, n, k);
else
device_.enqueueNoNotification([=]() { kernel(m, n, k); });
}
void signal_switch(Index k, Index v = 1) {
Index s = state_switch_[k % P].fetch_sub(v);
eigen_assert(s >= v);
if (s != v) return;
// Ready to switch to the next k slice.
// Reset counter for the next iteration.
state_switch_[k % P] =
(parallel_pack_ ? nm_ + nn_ : (shard_by_col_ ? nn_ : nm_)) +
nm_ * nn_;
if (k < nk_) {
// It is important to copy out nm_ and nn_, because once we kick off
// the last packing operation this and device_ can be destroyed.
Index nm = nm_;
Index nn = nn_;
// Issue lhs/rhs packing. Their completion will in turn kick off
// kernels.
if (parallel_pack_) {
enqueue_packing(k, !shard_by_col_);
enqueue_packing(k, shard_by_col_);
} else if (shard_by_col_) {
enqueue_packing(k, false);
} else {
enqueue_packing(k, true);
}
// Termination handling.
// Because kernel completion signals k + 2 switch, we need to finish nk
// + 2 slices without issuing any tasks on nk + 1 slice. So here we
// pretend that all nk + 1 packing tasks just finish instantly; so that
// nk + 2 switch only waits for completion of nk kernels.
} else if (k == nk_) {
signal_switch(k + 1,
parallel_pack_ ? nm_ + nn_ : (shard_by_col_ ? nn_ : nm_));
} else {
done_.Notify();
}
}
// Enqueue all rhs/lhs packing for k-th slice.
void enqueue_packing(Index k, bool rhs) {
enqueue_packing_helper(0, rhs ? nn_ : nm_, k, rhs);
}
void enqueue_packing_helper(Index start, Index end, Index k, bool rhs) {
if (end - start == 1) {
if (rhs)
pack_rhs(start, k);
else
pack_lhs(start, k);
} else {
Index mid = (start + end) / 2;
device_.enqueueNoNotification(
[=]() { enqueue_packing_helper(mid, end, k, rhs); });
device_.enqueueNoNotification(
[=]() { enqueue_packing_helper(start, mid, k, rhs); });
}
}
// Block sizes with accounting for potentially incomplete last block.
Index bm(Index m) const { return m + 1 < nm0_ ? bm_ : m_ + bm_ - bm_ * nm0_; }
Index bn(Index n) const { return n + 1 < nn0_ ? bn_ : n_ + bn_ - bn_ * nn0_; }
Index bk(Index k) const { return k + 1 < nk_ ? bk_ : k_ + bk_ - bk_ * nk_; }
// Task grain sizes accounting for potentially incomplete last task.
Index gm(Index m) const { return m + 1 < nm_ ? gm_ : nm0_ + gm_ - gm_ * nm_; }
Index gn(Index n) const { return n + 1 < nn_ ? gn_ : nn0_ + gn_ - gn_ * nn_; }
Context(const Context&) = delete;
void operator=(const Context&) = delete;
};
// Decide whether we want to shard m x n contraction by columns or by rows.
static bool shardByCol(Index m, Index n, int num_threads) {
// Note: we are comparing both n and m against Traits::nr, it is not
// a mistake. We are trying to figure out how both n and m will fit into
// the main sharding dimension.
// Sharding by column is the default
// ... unless there is enough data for vectorization over rows
if (m / num_threads >= Traits::nr &&
// and not enough data for vectorization over columns
(n / num_threads < Traits::nr ||
// ... or barely enough data for vectorization over columns,
// but it is not evenly dividable across threads
(n / num_threads < 4 * Traits::nr &&
(n % (num_threads * Traits::nr)) != 0 &&
// ... and it is evenly dividable across threads for rows
((m % (num_threads * Traits::nr)) == 0 ||
// .. or it is not evenly dividable for both dimensions but
// there is much more data over rows so that corner effects are
// mitigated.
(m / n >= 6)))))
return false;
// Wait, or if matrices are just substantially prolonged over the other
// dimension.
if (n / num_threads < 16 * Traits::nr && m > n * 32) return false;
return true;
}
Index coarsenM(Index m, Index n, Index bm, Index bn, Index bk, Index gn,
int num_threads, bool shard_by_col) const {
Index gm = 1;
Index gm1 = 1;
Index nm0 = divup(m, bm);
Index nm1 = nm0;
for (;;) {
// Find the next candidate for m grain size. It needs to result in
// different number of blocks. E.g. if we have 10 kernels, we want to try
// 5 and 10, but not 6, 7, 8 and 9.
while (gm1 <= nm0 && nm1 == divup(nm0, gm1)) gm1++;
if (gm1 > nm0) break;
// Check the candidate.
int res = checkGrain(m, n, bm, bn, bk, gm1, gn, gm, gn, num_threads,
shard_by_col);
if (res < 0) break;
nm1 = divup(nm0, gm1);
if (res == 0) continue;
// Commit new grain size.
gm = gm1;
}
return gm;
}
Index coarsenN(Index m, Index n, Index bm, Index bn, Index bk, Index gm,
int num_threads, bool shard_by_col) const {
Index gn = 1;
Index gn1 = 1;
Index nn0 = divup(n, bn);
Index nn1 = nn0;
for (;;) {
while (gn1 <= nn0 && nn1 == divup(nn0, gn1)) gn1++;
if (gn1 > nn0) break;
int res = checkGrain(m, n, bm, bn, bk, gm, gn1, gm, gn, num_threads,
shard_by_col);
if (res < 0) break;
nn1 = divup(nn0, gn1);
if (res == 0) continue;
gn = gn1;
}
return gn;
}
// checkGrain checks whether grain (gm, gn) is suitable and is better than
// (oldgm, oldgn).
int checkGrain(Index m, Index n, Index bm, Index bn, Index bk, Index gm,
Index gn, Index oldgm, Index oldgn, int num_threads,
bool shard_by_col) const {
const TensorOpCost cost =
contractionCost(bm * gm, bn * gn, bm, bn, bk, shard_by_col, true);
double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(
static_cast<double>(bm) * gm * bn * gn, cost);
// If the task is too small, then we agree on it regardless of anything
// else. Otherwise synchronization overheads will dominate.
if (taskSize < 1) return 1;
// If it is too large, then we reject it and all larger tasks.
if (taskSize > 2) return -1;
// Now we are in presumably good task size range.
// The main deciding factor here is parallelism. Consider that we have 12
// kernels and 4 threads. Grains of 2, 3 and 4 all yield good task sizes.
// But 2/4 yield 6/3 tasks, which gives us parallelism of 0.75 (at most 3/4
// of cores will be busy). While grain size 3 gives us 4 tasks, which gives
// us parallelism of 1 (we can load all cores).
Index nm0 = divup(m, bm);
Index nn0 = divup(n, bn);
Index new_tasks = divup(nm0, gm) * divup(nn0, gn);
double new_parallelism = static_cast<double>(new_tasks) /
(divup<int>(new_tasks, num_threads) * num_threads);
Index old_tasks = divup(nm0, oldgm) * divup(nn0, oldgn);
double old_parallelism = static_cast<double>(old_tasks) /
(divup<int>(old_tasks, num_threads) * num_threads);
if (new_parallelism > old_parallelism || new_parallelism == 1) return 1;
return 0;
}
#else // EIGEN_USE_SIMPLE_THREAD_POOL
template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous, bool rhs_inner_dim_reordered, int Alignment> template <bool lhs_inner_dim_contiguous, bool rhs_inner_dim_contiguous, bool rhs_inner_dim_reordered, int Alignment>
void evalProduct(Scalar* buffer) const { void evalProduct(Scalar* buffer) const {
if (this->m_j_size == 1) { if (this->m_j_size == 1) {
@ -384,6 +1007,47 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
} }
} }
} }
#endif // EIGEN_USE_SIMPLE_THREAD_POOL
TensorOpCost contractionCost(Index m, Index n, Index bm, Index bn, Index bk,
bool shard_by_col, bool prepacked) const {
const int packed_size = std::min<int>(PacketType<LhsScalar, Device>::size,
PacketType<RhsScalar, Device>::size);
const int output_packet_size = internal::unpacket_traits<PacketReturnType>::size;
const double kd = static_cast<double>(bk);
// Peak VFMA bandwidth is 0.5. However if we have not enough data for
// vectorization bandwidth drops. The 4.0 and 2.0 bandwidth is determined
// experimentally.
double computeBandwidth = bk == 1 ? 4.0 :
(shard_by_col ? bn : bm) < Traits::nr ||
(shard_by_col ? bm : bn) < Traits::mr ? 2.0 : 0.5;
#ifndef EIGEN_VECTORIZE_FMA
// Bandwidth of all of VFMA/MULPS/ADDPS is 0.5 on latest Intel processors.
// However for MULPS/ADDPS we have dependent sequence of 2 such instructions,
// so overall bandwidth is 1.0.
if (computeBandwidth == 0.5) computeBandwidth = 1.0;
#endif
// Computations.
TensorOpCost cost = TensorOpCost(0, 0, kd * computeBandwidth, true, packed_size);
// Output stores.
cost += TensorOpCost(0, sizeof(CoeffReturnType), 0, true, output_packet_size);
if (prepacked) {
// Packing and kernels are executed in different tasks. When we calculate
// task grain size we look only at kernel cost assuming that kernel
// is more expensive than packing.
return cost;
}
// Lhs/rhs loads + computations.
TensorOpCost lhsCost = this->m_leftImpl.costPerCoeff(true) * (kd / n);
TensorOpCost rhsCost = this->m_rightImpl.costPerCoeff(true) * (kd / m);
// Lhs packing memory cost does not contribute considerably to overall
// execution time because lhs is prefetched early and accessed sequentially.
if (shard_by_col)
lhsCost.dropMemoryCost();
else
rhsCost.dropMemoryCost();
return cost + lhsCost + rhsCost;
}
}; };
} // end namespace Eigen } // end namespace Eigen

View File

@ -10,9 +10,8 @@
#ifndef EIGEN_CXX11_TENSOR_TENSOR_COST_MODEL_H #ifndef EIGEN_CXX11_TENSOR_TENSOR_COST_MODEL_H
#define EIGEN_CXX11_TENSOR_TENSOR_COST_MODEL_H #define EIGEN_CXX11_TENSOR_TENSOR_COST_MODEL_H
//#if !defined(EIGEN_USE_GPU) // Turn on the cost model by default
//#define EIGEN_USE_COST_MODEL #define EIGEN_USE_COST_MODEL
//#endif
namespace Eigen { namespace Eigen {

View File

@ -14,7 +14,7 @@ namespace Eigen {
// Use the SimpleThreadPool by default. We'll switch to the new non blocking // Use the SimpleThreadPool by default. We'll switch to the new non blocking
// thread pool later. // thread pool later.
#ifdef EIGEN_USE_NONBLOCKING_THREAD_POOL #ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>; template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
typedef NonBlockingThreadPool ThreadPool; typedef NonBlockingThreadPool ThreadPool;
#else #else

View File

@ -177,7 +177,7 @@ static __global__ void FullReductionKernelHalfFloat(Reducer reducer, const Self
} }
half2 accum = reducer.template initializePacket<half2>(); half2 accum = reducer.template initializePacket<half2>();
Index max_iter = numext::mini<Index>((num_coeffs - first_index) / 2, NumPerThread*BlockSize / 2); const Index max_iter = numext::mini<Index>((num_coeffs - first_index) / 2, NumPerThread*BlockSize / 2);
for (Index i = 0; i < max_iter; i += BlockSize) { for (Index i = 0; i < max_iter; i += BlockSize) {
const Index index = first_index + 2*i; const Index index = first_index + 2*i;
eigen_assert(index + 1 < num_coeffs); eigen_assert(index + 1 < num_coeffs);
@ -333,7 +333,7 @@ __global__ void InnerReductionKernel(Reducer reducer, const Self input, Index nu
for (Index j = 0; j < NumPerThread; j += unroll_times) { for (Index j = 0; j < NumPerThread; j += unroll_times) {
const Index last_col = col_begin + blockDim.x * (j + unroll_times - 1); const Index last_col = col_begin + blockDim.x * (j + unroll_times - 1);
if (last_col >= num_coeffs_to_reduce) { if (last_col >= num_coeffs_to_reduce) {
for (Index col = col_begin + blockDim.x * j; col < num_coeffs_to_reduce; col +=blockDim.x) { for (Index col = col_begin + blockDim.x * j; col < num_coeffs_to_reduce; col += blockDim.x) {
const float val = input.m_impl.coeff(row * num_coeffs_to_reduce + col); const float val = input.m_impl.coeff(row * num_coeffs_to_reduce + col);
reducer.reduce(val, &reduced_val); reducer.reduce(val, &reduced_val);
} }
@ -357,11 +357,97 @@ __global__ void InnerReductionKernel(Reducer reducer, const Self input, Index nu
atomicReduce(&(output[row]), reduced_val, reducer); atomicReduce(&(output[row]), reduced_val, reducer);
} }
} }
__syncthreads();
} }
} }
#ifdef EIGEN_HAS_CUDA_FP16
/*
template <int NumPerThread, typename Self,
typename Reducer, typename Index>
__global__ void InnerReductionKernelHalfFloat(Reducer reducer, const Self input, Index num_coeffs_to_reduce, Index num_preserved_coeffs,
half* output, half2* scratch) {
eigen_assert(blockDim.y == 1);
eigen_assert(blockDim.z == 1);
eigen_assert(gridDim.y == 1);
eigen_assert(gridDim.z == 1);
const int unroll_times = 16;
eigen_assert(NumPerThread % unroll_times == 0);
eigen_assert(unroll_times % 2 == 0);
const Index input_col_blocks = divup<Index>(num_coeffs_to_reduce, blockDim.x * NumPerThread);
const Index num_input_blocks = input_col_blocks * num_preserved_coeffs;
const Index num_threads = blockDim.x * gridDim.x;
const Index thread_id = blockIdx.x * blockDim.x + threadIdx.x;
// Initialize the output values if they weren't initialized by the ReductionInitKernel
if (gridDim.x == 1) {
Index i = thread_id;
for (; i < num_preserved_coeffs; i += 2*num_threads) {
((half2*)output)[i] = reducer.initializePacket();
}
if (i + 1 < num_preserved_coeffs) {
output[i] = reducer.initialize();
}
__syncthreads();
}
for (Index i = blockIdx.x; i < num_input_blocks; i += gridDim.x) {
const Index row = i / input_col_blocks;
if (row + 1 < num_preserved_coeffs) {
const Index col_block = i % input_col_blocks;
const Index col_begin = col_block * blockDim.x * NumPerThread + threadIdx.x;
half2 reduced_val1 = reducer.initializePacket();
half2 reduced_val2 = reducer.initializePacket();
for (Index j = 0; j < NumPerThread; j += unroll_times) {
const Index last_col = col_begin + blockDim.x * (j + unroll_times - 1);
if (last_col >= num_coeffs_to_reduce) {
Index col = col_begin + blockDim.x * j;
for (; col + 1 < num_coeffs_to_reduce; col += blockDim.x) {
const half2 val = input.m_impl.packet(row * num_coeffs_to_reduce + col);
reducer.reduce(val, &reduced_val);
// do the same for reduce val2 here
}
if (col < num_coeffs_to_reduce) {
// Peel;
const half last = input.m_impl.coeff(row * num_coeffs_to_reduce + col+1);
const half2 val = __halves2half2(last, reducer.initialize());
reducer.reducePacket(val, &reduced_val);
}
break;
} else {
// Faster version of the loop with no branches after unrolling.
#pragma unroll
for (int k = 0; k < unroll_times; ++k) {
const Index col = col_begin + blockDim.x * (j + k);
reducer.reduce(input.m_impl.packet(row * num_coeffs_to_reduce + col), &reduced_val);
}
}
}
#pragma unroll
for (int offset = warpSize/2; offset > 0; offset /= 2) {
reducer.reducePacket(__shfl_down(reduced_val, offset, warpSize), &reduced_val);
}
if ((threadIdx.x & (warpSize - 1)) == 0) {
if (row + 1 < num_preserved_coeffs) {
atomicReduce(&(output[row]), reduced_val, reducer);
}
else {
atomicReduce(scratch, reduced_val, reducer);
}
}
}
}
}
*/
#endif
template <typename Self, typename Op> template <typename Self, typename Op>
struct InnerReducer<Self, Op, GpuDevice> { struct InnerReducer<Self, Op, GpuDevice> {
// Unfortunately nvidia doesn't support well exotic types such as complex, // Unfortunately nvidia doesn't support well exotic types such as complex,