Add parallelization of TensorScanOp for types without packet ops.

Clean up the code a bit and do a few micro-optimizations to improve performance for small tensors.
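For reference, a minimal usage sketch of the kind of expression whose evaluation this change affects (not part of the commit; it assumes the unsupported Tensor module with EIGEN_USE_THREADS defined, and the tensor shape and thread count are arbitrary example values):

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <cstdint>

int main() {
  // A scalar type without packet ops now also takes the multi-threaded scan path.
  Eigen::Tensor<std::uint32_t, 2> input(4096, 4096);
  input.setConstant(1);

  // Thread pool backing the ThreadPoolDevice; 8 threads is an arbitrary choice.
  Eigen::ThreadPool pool(8);
  Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/8);

  // Cumulative sum along dimension 0; evaluation dispatches to the
  // ThreadPoolDevice ScanLauncher specialization changed below.
  Eigen::Tensor<std::uint32_t, 2> result(4096, 4096);
  result.device(device) = input.cumsum(0);
  return 0;
}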

Benchmark numbers for Tensor<uint32_t>:

name                                                       old time/op             new time/op             delta
BM_cumSumRowReduction_1T/8   [using 1 threads]             76.5ns ± 0%             61.3ns ± 4%    -19.80%          (p=0.008 n=5+5)
BM_cumSumRowReduction_1T/64  [using 1 threads]             2.47µs ± 1%             2.40µs ± 1%     -2.77%          (p=0.008 n=5+5)
BM_cumSumRowReduction_1T/256 [using 1 threads]             39.8µs ± 0%             39.6µs ± 0%     -0.60%          (p=0.008 n=5+5)
BM_cumSumRowReduction_1T/4k  [using 1 threads]             13.9ms ± 0%             13.4ms ± 1%     -4.19%          (p=0.008 n=5+5)
BM_cumSumRowReduction_2T/8   [using 2 threads]             76.8ns ± 0%             59.1ns ± 0%    -23.09%          (p=0.016 n=5+4)
BM_cumSumRowReduction_2T/64  [using 2 threads]             2.47µs ± 1%             2.41µs ± 1%     -2.53%          (p=0.008 n=5+5)
BM_cumSumRowReduction_2T/256 [using 2 threads]             39.8µs ± 0%             34.7µs ± 6%    -12.74%          (p=0.008 n=5+5)
BM_cumSumRowReduction_2T/4k  [using 2 threads]             13.8ms ± 1%              7.2ms ± 6%    -47.74%          (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/8   [using 8 threads]             76.4ns ± 0%             61.8ns ± 3%    -19.02%          (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/64  [using 8 threads]             2.47µs ± 1%             2.40µs ± 1%     -2.84%          (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/256 [using 8 threads]             39.8µs ± 0%             28.3µs ±11%    -28.75%          (p=0.008 n=5+5)
BM_cumSumRowReduction_8T/4k  [using 8 threads]             13.8ms ± 0%              2.7ms ± 5%    -80.39%          (p=0.008 n=5+5)
BM_cumSumColReduction_1T/8   [using 1 threads]             59.1ns ± 0%             80.3ns ± 0%    +35.94%          (p=0.029 n=4+4)
BM_cumSumColReduction_1T/64  [using 1 threads]             3.06µs ± 0%             3.08µs ± 1%       ~             (p=0.114 n=4+4)
BM_cumSumColReduction_1T/256 [using 1 threads]              175µs ± 0%              176µs ± 0%       ~             (p=0.190 n=4+5)
BM_cumSumColReduction_1T/4k  [using 1 threads]              824ms ± 1%              844ms ± 1%     +2.37%          (p=0.008 n=5+5)
BM_cumSumColReduction_2T/8   [using 2 threads]             59.0ns ± 0%             90.7ns ± 0%    +53.74%          (p=0.029 n=4+4)
BM_cumSumColReduction_2T/64  [using 2 threads]             3.06µs ± 0%             3.10µs ± 0%     +1.08%          (p=0.016 n=4+5)
BM_cumSumColReduction_2T/256 [using 2 threads]              176µs ± 0%              189µs ±18%       ~             (p=0.151 n=5+5)
BM_cumSumColReduction_2T/4k  [using 2 threads]              836ms ± 2%              611ms ±14%    -26.92%          (p=0.008 n=5+5)
BM_cumSumColReduction_8T/8   [using 8 threads]             59.3ns ± 2%             90.6ns ± 0%    +52.79%          (p=0.008 n=5+5)
BM_cumSumColReduction_8T/64  [using 8 threads]             3.07µs ± 0%             3.10µs ± 0%     +0.99%          (p=0.016 n=5+4)
BM_cumSumColReduction_8T/256 [using 8 threads]              176µs ± 0%               80µs ±19%    -54.51%          (p=0.008 n=5+5)
BM_cumSumColReduction_8T/4k  [using 8 threads]              827ms ± 2%              180ms ±14%    -78.24%          (p=0.008 n=5+5)
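The shard sizes passed to parallelFor in the new code are rounded up by the AdjustBlockSize helper (added in the diff below) so that two neighboring shards never write to the same cacheline of the output. A standalone sketch of that rounding, with std::max standing in for numext::maxi and a local divup standing in for Eigen's internal helper:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>

using Index = std::ptrdiff_t;

// Stand-in for Eigen's internal divup (ceiling division).
static Index divup(Index x, Index y) { return (x + y - 1) / y; }

// Mirrors the AdjustBlockSize helper added in this commit: round a requested
// block size up to a whole number of 128-byte groups of items.
static Index AdjustBlockSize(Index item_size, Index block_size) {
  const Index kBlockAlignment = 128;  // twice the cache line size
  const Index items_per_cacheline =
      std::max<Index>(1, kBlockAlignment / item_size);
  return items_per_cacheline * divup(block_size, items_per_cacheline);
}

int main() {
  // For 4-byte uint32_t items, 128 / 4 = 32 items per aligned group, so a
  // requested block of 100 items is rounded up to 32 * divup(100, 32) = 128.
  std::cout << AdjustBlockSize(sizeof(std::uint32_t), 100) << "\n";  // prints 128
  return 0;
}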
Rasmus Munk Larsen 2020-05-06 14:48:37 -07:00
parent 0e59f786e1
commit 2fd8a5a08f

@@ -77,9 +77,12 @@ protected:
   const bool m_exclusive;
 };
+namespace internal {
 template <typename Self>
-inline void ReduceScalar(Self& self, Index offset,
-                         typename Self::CoeffReturnType* data) {
+EIGEN_STRONG_INLINE void ReduceScalar(Self& self, Index offset,
+                                      typename Self::CoeffReturnType* data) {
   // Compute the scan along the axis, starting at the given offset
   typename Self::CoeffReturnType accum = self.accumulator().initialize();
   if (self.stride() == 1) {
@@ -112,7 +115,8 @@ inline void ReduceScalar(Self& self, Index offset,
 }
 template <typename Self>
-inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnType* data) {
+EIGEN_STRONG_INLINE void ReducePacket(Self& self, Index offset,
+                                      typename Self::CoeffReturnType* data) {
   using Scalar = typename Self::CoeffReturnType;
   using Packet = typename Self::PacketReturnType;
   // Compute the scan along the axis, starting at the calculated offset
@@ -146,9 +150,10 @@ inline void ReducePacket(Self& self, Index offset, typename Self::CoeffReturnTyp
   }
 }
-template <typename Self, bool Vectorized>
+template <typename Self, bool Vectorize, bool Parallel>
 struct ReduceBlock {
-  void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
     for (Index idx2 = 0; idx2 < self.stride(); idx2++) {
       // Calculate the starting offset for the scan
       Index offset = idx1 + idx2;
@@ -159,8 +164,9 @@ struct ReduceBlock {
 // Specialization for vectorized reduction.
 template <typename Self>
-struct ReduceBlock<Self, true> {
-  void operator()(Self& self, Index idx1, typename Self::CoeffReturnType* data) {
+struct ReduceBlock<Self, /*Vectorize=*/true, /*Parallel=*/false> {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
     using Packet = typename Self::PacketReturnType;
     const int PacketSize = internal::unpacket_traits<Packet>::size;
     Index idx2 = 0;
@@ -177,109 +183,144 @@ struct ReduceBlock<Self, true> {
   }
 };
-// CPU implementation of scan
-template <typename Self, typename Reducer, typename Device, bool Vectorized =
-              TensorEvaluator<typename Self::ChildTypeNoConst, Device>::PacketAccess &&
-              internal::reducer_traits<Reducer, Device>::PacketAccess>
+// Single-threaded CPU implementation of scan
+template <typename Self, typename Reducer, typename Device,
+          bool Vectorize =
+              (TensorEvaluator<typename Self::ChildTypeNoConst, Device>::PacketAccess &&
+               internal::reducer_traits<Reducer, Device>::PacketAccess)>
 struct ScanLauncher {
-  void operator()(Self& self, typename Self::CoeffReturnType *data) {
+  void operator()(Self& self, typename Self::CoeffReturnType* data) {
     Index total_size = internal::array_prod(self.dimensions());
     // We fix the index along the scan axis to 0 and perform a
     // scan per remaining entry. The iteration is split into two nested
-    // loops to avoid an integer division by keeping track of each idx1 and idx2.
+    // loops to avoid an integer division by keeping track of each idx1 and
+    // idx2.
     for (Index idx1 = 0; idx1 < total_size; idx1 += self.stride() * self.size()) {
-      ReduceBlock<Self, Vectorized> block_reducer;
+      ReduceBlock<Self, Vectorize, /*Parallel=*/false> block_reducer;
       block_reducer(self, idx1, data);
     }
   }
 };
 #ifdef EIGEN_USE_THREADS
-// Specialization for multi-threaded, vectorized execution.
-template <typename Self, typename Reducer>
-struct ScanLauncher<Self, Reducer, ThreadPoolDevice, true> {
+// Adjust block_size to avoid false sharing of cachelines among
+// threads. Currently set to twice the cache line size on Intel and ARM
+// processors.
+EIGEN_STRONG_INLINE Index AdjustBlockSize(Index item_size, Index block_size) {
+  EIGEN_CONSTEXPR Index kBlockAlignment = 128;
+  const Index items_per_cacheline =
+      numext::maxi<Index>(1, kBlockAlignment / item_size);
+  return items_per_cacheline * divup(block_size, items_per_cacheline);
+}
+template <typename Self>
+struct ReduceBlock<Self, /*Vectorize=*/true, /*Parallel=*/true> {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
+    using Scalar = typename Self::CoeffReturnType;
+    using Packet = typename Self::PacketReturnType;
+    const int PacketSize = internal::unpacket_traits<Packet>::size;
+    Index num_scalars = self.stride();
+    Index num_packets = 0;
+    if (self.stride() >= PacketSize) {
+      num_packets = self.stride() / PacketSize;
+      self.device().parallelFor(
+          num_packets,
+          TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
+                       16 * PacketSize * self.size(), true, PacketSize),
+          // Make the shard size large enough that two neighboring threads
+          // won't write to the same cacheline of `data`.
+          [=](Index blk_size) {
+            return AdjustBlockSize(PacketSize * sizeof(Scalar), blk_size);
+          },
+          [&](Index first, Index last) {
+            for (Index packet = first; packet < last; ++packet) {
+              const Index idx2 = packet * PacketSize;
+              ReducePacket(self, idx1 + idx2, data);
+            }
+          });
+      num_scalars -= num_packets * PacketSize;
+    }
+    self.device().parallelFor(
+        num_scalars, TensorOpCost(self.size(), self.size(), 16 * self.size()),
+        // Make the shard size large enough that two neighboring threads
+        // won't write to the same cacheline of `data`.
+        [=](Index blk_size) {
+          return AdjustBlockSize(sizeof(Scalar), blk_size);
+        },
+        [&](Index first, Index last) {
+          for (Index scalar = first; scalar < last; ++scalar) {
+            const Index idx2 = num_packets * PacketSize + scalar;
+            ReduceScalar(self, idx1 + idx2, data);
+          }
+        });
+  }
+};
+template <typename Self>
+struct ReduceBlock<Self, /*Vectorize=*/false, /*Parallel=*/true> {
+  EIGEN_STRONG_INLINE void operator()(Self& self, Index idx1,
+                                      typename Self::CoeffReturnType* data) {
+    using Scalar = typename Self::CoeffReturnType;
+    self.device().parallelFor(
+        self.stride(), TensorOpCost(self.size(), self.size(), 16 * self.size()),
+        // Make the shard size large enough that two neighboring threads
+        // won't write to the same cacheline of `data`.
+        [=](Index blk_size) {
+          return AdjustBlockSize(sizeof(Scalar), blk_size);
+        },
+        [&](Index first, Index last) {
+          for (Index idx2 = first; idx2 < last; ++idx2) {
+            ReduceScalar(self, idx1 + idx2, data);
+          }
+        });
+  }
+};
+// Specialization for multi-threaded execution.
+template <typename Self, typename Reducer, bool Vectorize>
+struct ScanLauncher<Self, Reducer, ThreadPoolDevice, Vectorize> {
   void operator()(Self& self, typename Self::CoeffReturnType* data) {
     using Scalar = typename Self::CoeffReturnType;
     using Packet = typename Self::PacketReturnType;
     const int PacketSize = internal::unpacket_traits<Packet>::size;
     const Index total_size = internal::array_prod(self.dimensions());
     const Index inner_block_size = self.stride() * self.size();
-    const Index num_outer_blocks = total_size / inner_block_size;
-    // Block alignment used to avoid false sharing of cachelines among threads.
-    // Currently set to twice the cache line size on Intel and ARM processors.
-    EIGEN_CONSTEXPR Index kBlockAlignment = 128;
+    bool parallelize_by_outer_blocks = (total_size >= (self.stride() * inner_block_size));
-    if ((num_outer_blocks >= self.stride() && total_size <= 4096) ||
-        (num_outer_blocks < self.stride() && self.stride() < PacketSize)) {
-      ScanLauncher<Self, Reducer, DefaultDevice, true> launcher;
+    if ((parallelize_by_outer_blocks && total_size <= 4096) ||
+        (!parallelize_by_outer_blocks && self.stride() < PacketSize)) {
+      ScanLauncher<Self, Reducer, DefaultDevice, Vectorize> launcher;
       launcher(self, data);
       return;
     }
-    if (num_outer_blocks >= self.stride()) {
+    if (parallelize_by_outer_blocks) {
       // Parallelize over outer blocks.
+      const Index num_outer_blocks = total_size / inner_block_size;
       self.device().parallelFor(
           num_outer_blocks,
           TensorOpCost(inner_block_size, inner_block_size,
-                       16 * PacketSize * inner_block_size, true, PacketSize),
-          // Make the shard size large enough that two neighboring threads won't
-          // write to the same cacheline of `data`.
+                       16 * PacketSize * inner_block_size, Vectorize,
+                       PacketSize),
           [=](Index blk_size) {
-            const Index inner_blocks_cacheline =
-                numext::maxi<Index>(1, kBlockAlignment / (inner_block_size * sizeof(Scalar)));
-            return inner_blocks_cacheline *
-                   divup(blk_size, inner_blocks_cacheline);
+            return AdjustBlockSize(inner_block_size * sizeof(Scalar), blk_size);
           },
           [&](Index first, Index last) {
-            for (Index idx = first; idx < last; ++idx) {
-              ReduceBlock<Self, true> block_reducer;
-              block_reducer(self, idx * inner_block_size, data);
+            for (Index idx1 = first; idx1 < last; ++idx1) {
+              ReduceBlock<Self, Vectorize, /*Parallelize=*/false> block_reducer;
+              block_reducer(self, idx1 * inner_block_size, data);
             }
           });
     } else {
-      // Parallelize over packets/scalars of the dimensions when the reduction
+      // Parallelize over inner packets/scalars dimensions when the reduction
       // axis is not an inner dimension.
-      const Index num_packets = self.stride() / PacketSize;
+      ReduceBlock<Self, Vectorize, /*Parallelize=*/true> block_reducer;
       for (Index idx1 = 0; idx1 < total_size;
            idx1 += self.stride() * self.size()) {
-        self.device().parallelFor(
-            num_packets,
-            TensorOpCost(PacketSize * self.size(), PacketSize * self.size(),
-                         16 * PacketSize * self.size(), true, PacketSize),
-            // Make the shard size large enough that two neighboring threads
-            // won't write to the same cacheline of `data`.
-            [=](Index blk_size) {
-              const Index packets_per_cacheline =
-                  numext::maxi<Index>(1, kBlockAlignment / (PacketSize * sizeof(Scalar)));
-              return packets_per_cacheline *
-                  divup(blk_size, packets_per_cacheline);
-            },
-            [&](Index first, Index last) {
-              for (Index packet = first; packet < last; ++packet) {
-                const Index idx2 = packet * PacketSize;
-                ReducePacket(self, idx1 + idx2, data);
-              }
-            });
-        const Index num_scalars = self.stride() - num_packets * PacketSize;
-        self.device().parallelFor(
-            num_scalars,
-            TensorOpCost(self.size(), self.size(), 16 * self.size()),
-            // Make the shard size large enough that two neighboring threads
-            // won't write to the same cacheline of `data`.
-            [=](Index blk_size) {
-              const Index scalars_per_cacheline =
-                  numext::maxi<Index>(1, kBlockAlignment / sizeof(Scalar));
-              return scalars_per_cacheline *
-                  divup(blk_size, scalars_per_cacheline);
-            },
-            [&](Index first, Index last) {
-              for (Index scalar = first; scalar < last; ++scalar) {
-                const Index idx2 = num_packets * PacketSize + scalar;
-                ReduceScalar(self, idx1 + idx2, data);
-              }
-            });
+        block_reducer(self, idx1, data);
       }
     }
   }
@@ -328,6 +369,8 @@ struct ScanLauncher<Self, Reducer, GpuDevice, false> {
 };
 #endif // EIGEN_USE_GPU && (EIGEN_GPUCC)
+} // namespace internal
 // Eval as rvalue
 template <typename Op, typename ArgType, typename Device>
 struct TensorEvaluator<const TensorScanOp<Op, ArgType>, Device> {
@@ -424,7 +467,7 @@ struct TensorEvaluator<const TensorScanOp<Op, ArgType>, Device> {
   EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType data) {
     m_impl.evalSubExprsIfNeeded(NULL);
-    ScanLauncher<Self, Op, Device> launcher;
+    internal::ScanLauncher<Self, Op, Device> launcher;
     if (data) {
       launcher(*this, data);
       return false;