Vectorize and parallelize TensorScanOp.

TensorScanOp is used in TensorFlow for a number of operations, such as cumulative logexp reduction and cumulative sum and product reductions.

The benchmarks numbers below are for cumulative row- and column reductions of NxN matrices.

name                                                         old time/op             new time/op     delta
BM_cumSumRowReduction_1T/4    [using 1 threads ]             25.1ns ± 1%             35.2ns ± 1%    +40.45%
BM_cumSumRowReduction_1T/8    [using 1 threads ]             73.4ns ± 0%             82.7ns ± 3%    +12.74%
BM_cumSumRowReduction_1T/32   [using 1 threads ]              988ns ± 0%              832ns ± 0%    -15.77%
BM_cumSumRowReduction_1T/64   [using 1 threads ]             4.07µs ± 2%             3.47µs ± 0%    -14.70%
BM_cumSumRowReduction_1T/128  [using 1 threads ]             18.0µs ± 0%             16.8µs ± 0%     -6.58%
BM_cumSumRowReduction_1T/512  [using 1 threads ]              287µs ± 0%              281µs ± 0%     -2.22%
BM_cumSumRowReduction_1T/2k   [using 1 threads ]             4.78ms ± 1%             4.78ms ± 2%       ~
BM_cumSumRowReduction_1T/10k  [using 1 threads ]              117ms ± 1%              117ms ± 1%       ~
BM_cumSumRowReduction_8T/4    [using 8 threads ]             25.0ns ± 0%             35.2ns ± 0%    +40.82%
BM_cumSumRowReduction_8T/8    [using 8 threads ]             77.2ns ±16%             81.3ns ± 0%       ~
BM_cumSumRowReduction_8T/32   [using 8 threads ]              988ns ± 0%              833ns ± 0%    -15.67%
BM_cumSumRowReduction_8T/64   [using 8 threads ]             4.08µs ± 2%             3.47µs ± 0%    -14.95%
BM_cumSumRowReduction_8T/128  [using 8 threads ]             18.0µs ± 0%             17.3µs ±10%       ~
BM_cumSumRowReduction_8T/512  [using 8 threads ]              287µs ± 0%               58µs ± 6%    -79.92%
BM_cumSumRowReduction_8T/2k   [using 8 threads ]             4.79ms ± 1%             0.64ms ± 1%    -86.58%
BM_cumSumRowReduction_8T/10k  [using 8 threads ]              117ms ± 1%               18ms ± 6%    -84.50%

BM_cumSumColReduction_1T/4    [using 1 threads ]             23.9ns ± 0%             33.4ns ± 1%    +39.68%
BM_cumSumColReduction_1T/8    [using 1 threads ]             71.6ns ± 1%             49.1ns ± 3%    -31.40%
BM_cumSumColReduction_1T/32   [using 1 threads ]              973ns ± 0%              165ns ± 2%    -83.10%
BM_cumSumColReduction_1T/64   [using 1 threads ]             4.06µs ± 1%             0.57µs ± 1%    -85.94%
BM_cumSumColReduction_1T/128  [using 1 threads ]             33.4µs ± 1%              4.1µs ± 1%    -87.67%
BM_cumSumColReduction_1T/512  [using 1 threads ]             1.72ms ± 4%             0.21ms ± 5%    -87.91%
BM_cumSumColReduction_1T/2k   [using 1 threads ]              119ms ±53%               11ms ±35%    -90.42%
BM_cumSumColReduction_1T/10k  [using 1 threads ]              1.59s ±67%              0.35s ±49%    -77.96%
BM_cumSumColReduction_8T/4    [using 8 threads ]             23.8ns ± 0%             33.3ns ± 0%    +40.06%
BM_cumSumColReduction_8T/8    [using 8 threads ]             71.6ns ± 1%             49.2ns ± 5%    -31.33%
BM_cumSumColReduction_8T/32   [using 8 threads ]             1.01µs ±12%             0.17µs ± 3%    -82.93%
BM_cumSumColReduction_8T/64   [using 8 threads ]             4.15µs ± 4%             0.58µs ± 1%    -86.09%
BM_cumSumColReduction_8T/128  [using 8 threads ]             33.5µs ± 0%              4.1µs ± 4%    -87.65%
BM_cumSumColReduction_8T/512  [using 8 threads ]             1.71ms ± 3%             0.06ms ±16%    -96.21%
BM_cumSumColReduction_8T/2k   [using 8 threads ]             97.1ms ±14%              3.0ms ±23%    -96.88%
BM_cumSumColReduction_8T/10k  [using 8 threads ]              1.97s ± 8%              0.06s ± 2%    -96.74%
This commit is contained in:
Rasmus Munk Larsen 2020-05-05 00:19:43 +00:00
parent a74a278abd
commit 7b76c85daf
2 changed files with 255 additions and 80 deletions

View File

@ -190,8 +190,10 @@ struct ThreadPoolDevice {
void parallelFor(Index n, const TensorOpCost& cost, void parallelFor(Index n, const TensorOpCost& cost,
std::function<Index(Index)> block_align, std::function<Index(Index)> block_align,
std::function<void(Index, Index)> f) const { std::function<void(Index, Index)> f) const {
if (EIGEN_PREDICT_FALSE(n <= 0)){
return;
// Compute small problems directly in the caller thread. // Compute small problems directly in the caller thread.
if (n <= 1 || numThreads() == 1 || } else if (n == 1 || numThreads() == 1 ||
CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) { CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
f(0, n); f(0, n);
return; return;

View File

@ -77,8 +77,256 @@ protected:
const bool m_exclusive; const bool m_exclusive;
}; };
template <typename Self, typename Reducer, typename Device> template <typename Self>
struct ScanLauncher; inline void ReduceScalar(Self& self, Index offset,
typename Self::CoeffReturnType* data) {
// Compute the scan along the axis, starting at the given offset
typename Self::CoeffReturnType accum = self.accumulator().initialize();
if (self.stride() == 1) {
if (self.exclusive()) {
for (Index curr = offset; curr < offset + self.size(); ++curr) {
data[curr] = self.accumulator().finalize(accum);
self.accumulator().reduce(self.inner().coeff(curr), &accum);
}
} else {
for (Index curr = offset; curr < offset + self.size(); ++curr) {
self.accumulator().reduce(self.inner().coeff(curr), &accum);
data[curr] = self.accumulator().finalize(accum);
}
}
} else {
if (self.exclusive()) {
for (Index idx3 = 0; idx3 < self.size(); idx3++) {
Index curr = offset + idx3 * self.stride();
data[curr] = self.accumulator().finalize(accum);
self.accumulator().reduce(self.inner().coeff(curr), &accum);
}
} else {
for (Index idx3 = 0; idx3 < self.size(); idx3++) {
Index curr = offset + idx3 * self.stride();
self.accumulator().reduce(self.inner().coeff(curr), &accum);
data[curr] = self.accumulator().finalize(accum);
}
}
}
}
template <typename Self>
inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnType* data) {
using Scalar = typename Self::CoeffReturnType;
using Packet = typename Self::PacketReturnType;
// Compute the scan along the axis, starting at the calculated offset
Packet accum = self.accumulator().template initializePacket<Packet>();
if (self.stride() == 1) {
if (self.exclusive()) {
for (Index curr = offset; curr < offset + self.size(); ++curr) {
internal::pstoreu<Scalar, Packet>(data + curr, self.accumulator().finalizePacket(accum));
self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
}
} else {
for (Index curr = offset; curr < offset + self.size(); ++curr) {
self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
internal::pstoreu<Scalar, Packet>(data + curr, self.accumulator().finalizePacket(accum));
}
}
} else {
if (self.exclusive()) {
for (Index idx3 = 0; idx3 < self.size(); idx3++) {
const Index curr = offset + idx3 * self.stride();
internal::pstoreu<Scalar, Packet>(data + curr, self.accumulator().finalizePacket(accum));
self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
}
} else {
for (Index idx3 = 0; idx3 < self.size(); idx3++) {
const Index curr = offset + idx3 * self.stride();
self.accumulator().reducePacket(self.inner().template packet<Unaligned>(curr), &accum);
internal::pstoreu<Scalar, Packet>(data + curr, self.accumulator().finalizePacket(accum));
}
}
}
}
template <typename Self, bool Vectorized>
struct ReduceBlock {
void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
for (Index idx2 = 0; idx2 < self.stride(); idx2++) {
// Calculate the starting offset for the scan
Index offset = idx1 + idx2;
ReduceScalar(self, offset, data);
}
}
};
// Specialization for vectorized reduction.
template <typename Self>
struct ReduceBlock<Self, true> {
void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
using Packet = typename Self::PacketReturnType;
const int PacketSize = internal::unpacket_traits<Packet>::size;
Index idx2 = 0;
for (; idx2 + PacketSize <= self.stride(); idx2 += PacketSize) {
// Calculate the starting offset for the packet scan
Index offset = idx1 + idx2;
ReducePacket(self, offset, data);
}
for (; idx2 < self.stride(); idx2++) {
// Calculate the starting offset for the scan
Index offset = idx1 + idx2;
ReduceScalar(self, offset, data);
}
}
};
// CPU implementation of scan
template <typename Self, typename Reducer, typename Device, bool Vectorized =
TensorEvaluator<typename Self::ChildTypeNoConst, Device>::PacketAccess &&
internal::reducer_traits<Reducer, Device>::PacketAccess>
struct ScanLauncher {
void operator()(Self& self, typename Self::CoeffReturnType *data) {
Index total_size = internal::array_prod(self.dimensions());
// We fix the index along the scan axis to 0 and perform a
// scan per remaining entry. The iteration is split into two nested
// loops to avoid an integer division by keeping track of each idx1 and idx2.
for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
ReduceBlock<Self, Vectorized> block_reducer;
block_reducer(self, idx1, data);
}
}
};
#ifdef EIGEN_USE_THREADS
// Specialization for multi-threaded, vectorized execution.
template <typename Self, typename Reducer>
struct ScanLauncher<Self, Reducer, ThreadPoolDevice, true> {
void operator()(Self& self, typename Self::CoeffReturnType* data) {
using Scalar = typename Self::CoeffReturnType;
using Packet = typename Self::PacketReturnType;
const int PacketSize = internal::unpacket_traits<Packet>::size;
const Index total_size = internal::array_prod(self.dimensions());
const Index inner_block_size = self.stride() * self.size();
const Index num_outer_blocks = total_size / inner_block_size;
// Block alignment used to avoid false sharing of cachelines among threads.
// Currently set to twice the cache line size on Intel and ARM processors.
EIGEN_CONSTEXPR Index kBlockAlignment = 128;
if ((num_outer_blocks >= self.stride() && total_size <= 4096) ||
(num_outer_blocks < self.stride() && self.stride() < PacketSize)) {
ScanLauncher<Self, Reducer, DefaultDevice, true> launcher;
launcher(self, data);
return;
}
if (num_outer_blocks >= self.stride()) {
// Parallelize over outer blocks.
self.device().parallelFor(
num_outer_blocks,
TensorOpCost(inner_block_size, inner_block_size,
16 * PacketSize * inner_block_size, true, PacketSize),
// Make the shard size large enough that two neighboring threads won't
// write to the same cacheline of `data`.
[=](Index blk_size) {
const Index inner_blocks_cacheline =
numext::maxi<Index>(1, kBlockAlignment / (inner_block_size * sizeof(Scalar)));
return inner_blocks_cacheline *
divup(blk_size, inner_blocks_cacheline);
},
[&](Index first, Index last) {
for (Index idx = first; idx < last; ++idx) {
ReduceBlock<Self, true> block_reducer;
block_reducer(self, idx * inner_block_size, data);
}
});
} else {
// Parallelize over packets/scalars of the dimensions when the reduction
// axis is not an inner dimension.
const Index num_packets = self.stride() / PacketSize;
for (Index idx1 = 0; idx1 < total_size;
idx1 += self.stride() * self.size()) {
self.device().parallelFor(
num_packets,
TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
16 * PacketSize * self.size(), true, PacketSize),
// Make the shard size large enough that two neighboring threads
// won't write to the same cacheline of `data`.
[=](Index blk_size) {
const Index packets_per_cacheline =
numext::maxi<Index>(1, kBlockAlignment / (PacketSize * sizeof(Scalar)));
return packets_per_cacheline *
divup(blk_size, packets_per_cacheline);
},
[&](Index first, Index last) {
for (Index packet = first; packet < last; ++packet) {
const Index idx2 = packet * PacketSize;
ReducePacket(self, idx1 + idx2, data);
}
});
const Index num_scalars = self.stride() - num_packets * PacketSize;
self.device().parallelFor(
num_scalars,
TensorOpCost(self.size(), self.size(), 16 * self.size()),
// Make the shard size large enough that two neighboring threads
// won't write to the same cacheline of `data`.
[=](Index blk_size) {
const Index scalars_per_cacheline =
numext::maxi<Index>(1, kBlockAlignment / sizeof(Scalar));
return scalars_per_cacheline *
divup(blk_size, scalars_per_cacheline);
},
[&](Index first, Index last) {
for (Index scalar = first; scalar < last; ++scalar) {
const Index idx2 = num_packets * PacketSize + scalar;
ReduceScalar(self, idx1 + idx2, data);
}
});
}
}
}
};
#endif // EIGEN_USE_THREADS
#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
// GPU implementation of scan
// TODO(ibab) This placeholder implementation performs multiple scans in
// parallel, but it would be better to use a parallel scan algorithm and
// optimize memory access.
template <typename Self, typename Reducer>
__global__ void ScanKernel(Self self, Index total_size, typename Self::CoeffReturnType* data) {
// Compute offset as in the CPU version
Index val = threadIdx.x + blockIdx.x * blockDim.x;
Index offset = (val / self.stride()) * self.stride() * self.size() + val % self.stride();
if (offset + (self.size() - 1) * self.stride() < total_size) {
// Compute the scan along the axis, starting at the calculated offset
typename Self::CoeffReturnType accum = self.accumulator().initialize();
for (Index idx = 0; idx < self.size(); idx++) {
Index curr = offset + idx * self.stride();
if (self.exclusive()) {
data[curr] = self.accumulator().finalize(accum);
self.accumulator().reduce(self.inner().coeff(curr), &accum);
} else {
self.accumulator().reduce(self.inner().coeff(curr), &accum);
data[curr] = self.accumulator().finalize(accum);
}
}
}
__syncthreads();
}
template <typename Self, typename Reducer>
struct ScanLauncher<Self, Reducer, GpuDevice, false> {
void operator()(const Self& self, typename Self::CoeffReturnType* data) {
Index total_size = internal::array_prod(self.dimensions());
Index num_blocks = (total_size / self.size() + 63) / 64;
Index block_size = 64;
LAUNCH_GPU_KERNEL((ScanKernel<Self, Reducer>), num_blocks, block_size, 0, self.device(), self, total_size, data);
}
};
#endif // EIGEN_USE_GPU && (EIGEN_GPUCC)
// Eval as rvalue // Eval as rvalue
template <typename Op, typename ArgType, typename Device> template <typename Op, typename ArgType, typename Device>
@ -86,6 +334,7 @@ struct TensorEvaluator<const TensorScanOp<Op, ArgType>, Device> {
typedef TensorScanOp<Op, ArgType> XprType; typedef TensorScanOp<Op, ArgType> XprType;
typedef typename XprType::Index Index; typedef typename XprType::Index Index;
typedef const ArgType ChildTypeNoConst;
typedef const ArgType ChildType; typedef const ArgType ChildType;
static const int NumDims = internal::array_size<typename TensorEvaluator<ArgType, Device>::Dimensions>::value; static const int NumDims = internal::array_size<typename TensorEvaluator<ArgType, Device>::Dimensions>::value;
typedef DSizes<Index, NumDims> Dimensions; typedef DSizes<Index, NumDims> Dimensions;
@ -232,82 +481,6 @@ protected:
EvaluatorPointerType m_output; EvaluatorPointerType m_output;
}; };
// CPU implementation of scan
// TODO(ibab) This single-threaded implementation should be parallelized,
// at least by running multiple scans at the same time.
template <typename Self, typename Reducer, typename Device>
struct ScanLauncher {
void operator()(Self& self, typename Self::CoeffReturnType *data) {
Index total_size = internal::array_prod(self.dimensions());
// We fix the index along the scan axis to 0 and perform a
// scan per remaining entry. The iteration is split into two nested
// loops to avoid an integer division by keeping track of each idx1 and idx2.
for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
for (Index idx2 = 0; idx2 < self.stride(); idx2++) {
// Calculate the starting offset for the scan
Index offset = idx1 + idx2;
// Compute the scan along the axis, starting at the calculated offset
typename Self::CoeffReturnType accum = self.accumulator().initialize();
for (Index idx3 = 0; idx3 < self.size(); idx3++) {
Index curr = offset + idx3 * self.stride();
if (self.exclusive()) {
data[curr] = self.accumulator().finalize(accum);
self.accumulator().reduce(self.inner().coeff(curr), &accum);
} else {
self.accumulator().reduce(self.inner().coeff(curr), &accum);
data[curr] = self.accumulator().finalize(accum);
}
}
}
}
}
};
#if defined(EIGEN_USE_GPU) && (defined(EIGEN_GPUCC))
// GPU implementation of scan
// TODO(ibab) This placeholder implementation performs multiple scans in
// parallel, but it would be better to use a parallel scan algorithm and
// optimize memory access.
template <typename Self, typename Reducer>
__global__ void ScanKernel(Self self, Index total_size, typename Self::CoeffReturnType* data) {
// Compute offset as in the CPU version
Index val = threadIdx.x + blockIdx.x * blockDim.x;
Index offset = (val / self.stride()) * self.stride() * self.size() + val % self.stride();
if (offset + (self.size() - 1) * self.stride() < total_size) {
// Compute the scan along the axis, starting at the calculated offset
typename Self::CoeffReturnType accum = self.accumulator().initialize();
for (Index idx = 0; idx < self.size(); idx++) {
Index curr = offset + idx * self.stride();
if (self.exclusive()) {
data[curr] = self.accumulator().finalize(accum);
self.accumulator().reduce(self.inner().coeff(curr), &accum);
} else {
self.accumulator().reduce(self.inner().coeff(curr), &accum);
data[curr] = self.accumulator().finalize(accum);
}
}
}
__syncthreads();
}
template <typename Self, typename Reducer>
struct ScanLauncher<Self, Reducer, GpuDevice> {
void operator()(const Self& self, typename Self::CoeffReturnType* data) {
Index total_size = internal::array_prod(self.dimensions());
Index num_blocks = (total_size / self.size() + 63) / 64;
Index block_size = 64;
LAUNCH_GPU_KERNEL((ScanKernel<Self, Reducer>), num_blocks, block_size, 0, self.device(), self, total_size, data);
}
};
#endif // EIGEN_USE_GPU && (EIGEN_GPUCC)
} // end namespace Eigen } // end namespace Eigen
#endif // EIGEN_CXX11_TENSOR_TENSOR_SCAN_H #endif // EIGEN_CXX11_TENSOR_TENSOR_SCAN_H