#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Runs an arbitrary function and then calls Notify() on the passed-in Notification.
template <typename Function, typename... Args>
struct FunctionWrapperWithNotification {
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args>
struct FunctionWrapperWithBarrier {
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

// An abstract interface to a device-specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};
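
// Illustrative sketch (not from the Eigen sources): a minimal Allocator
// implementation that simply forwards to Eigen's aligned heap routines. The
// class name is hypothetical; a real allocator might pool or pin its memory.
//
//   class AlignedHeapAllocator : public Allocator {
//    public:
//     void* allocate(size_t num_bytes) const override {
//       return internal::aligned_malloc(num_bytes);
//     }
//     void deallocate(void* buffer) const override {
//       internal::aligned_free(buffer);
//     }
//   };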

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }
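
  // Illustrative usage (not part of this header): back the device by an
  // Eigen::ThreadPool. The device does not take ownership of the pool or of
  // the optional allocator; the variable names below are hypothetical.
  //
  //   Eigen::ThreadPool tp(4);                 // 4 worker threads
  //   Eigen::ThreadPoolDevice device(&tp, 4);  // num_cores feeds the cost model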

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
                      : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
    deallocate(buffer);
  }

  template <typename Type>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // Running the copy on more than a few threads mostly wastes CPU cycles on
    // memory-bandwidth contention, so at most 4 block copies are scheduled.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<int>(num_threads - 1));
      // Launch the last num_threads - 1 blocks on worker threads.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Copy the first block on the calling thread, then wait for the rest.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
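
  // Illustrative sketch (not part of this header): copies larger than
  // kMinBlockSize are split into at most four blocks executed on the pool;
  // smaller ones fall back to a plain ::memcpy. Buffer names are hypothetical.
  //
  //   std::vector<char> src(1 << 22), dst(1 << 22);
  //   device.memcpy(dst.data(), src.data(), src.size());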

  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  // Number of threads available in the underlying thread pool. This can differ
  // from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const {
    return pool_->NumThreads();
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The L3 cache is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU.
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::move(f), args...));
    return n;
  }
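
  // Illustrative sketch (not part of this header): enqueue() heap-allocates a
  // Notification that is notified when the task finishes; in this sketch the
  // caller waits on it and then releases it itself.
  //
  //   Eigen::Notification* n = device.enqueue([] { /* do work */ });
  //   wait_until_ready(n);  // blocks until the task has called n->Notify()
  //   delete n;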

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::move(f), args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    if (sizeof...(args) > 0) {
      pool_->Schedule(std::bind(std::move(f), args...));
    } else {
      pool_->Schedule(std::move(f));
    }
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 when
  // called from one of the threads in pool_, and -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // parallelFor executes f with [0, n) arguments in parallel and waits for
  // completion. f accepts a half-open interval [firstIdx, lastIdx). The block
  // size is chosen based on the iteration cost and the resulting parallel
  // efficiency; if block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
    } else if (n == 1 || numThreads() == 1 ||
               CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      // Compute small problems directly in the caller thread.
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively split [0, n) into halves until we reach block.size; each
    // leaf runs f and notifies the barrier.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Schedule the second half on another thread and keep the first half.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less, execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree (and one block) on
      // the calling thread.
      handleRange(0, n);
    } else {
      // Schedule the root on the pool so that work runs on at most
      // numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }
    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
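
  // Illustrative sketch (not part of this header): filling a buffer in
  // parallel. The TensorOpCost describes the per-element cost (bytes loaded,
  // bytes stored, compute cycles) and drives the block-size choice; `out` is
  // a hypothetical buffer.
  //
  //   std::vector<float> out(1 << 20);
  //   device.parallelFor(static_cast<Eigen::Index>(out.size()),
  //                      Eigen::TensorOpCost(0, sizeof(float), 1),
  //                      [&out](Eigen::Index first, Eigen::Index last) {
  //                        for (Eigen::Index i = first; i < last; ++i) out[i] = 1.0f;
  //                      });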

  // parallelForAsync executes f with [0, n) arguments in parallel without
  // waiting for completion. When the last block finishes, the 'done' callback
  // is invoked.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    // Compute small problems directly in the caller thread.
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively split [0, n) into halves until we reach block.size; each
    // leaf runs ctx->f and decrements the pending-block count.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Schedule the second half on another thread and keep the first half.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less, execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete the async context (which runs `done`) if this was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree (and one block) on
      // the calling thread.
      ctx->handle_range(0, n);
    } else {
      // Schedule the root on the pool so that work runs on at most
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }
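
  // Illustrative sketch (not part of this header): the asynchronous variant
  // returns immediately; completion is signalled through the `done` callback.
  // Here a Barrier is used to join with the caller; `n`, `cost`, `kernel` and
  // `all_done` are hypothetical names.
  //
  //   Eigen::Barrier all_done(1);
  //   device.parallelForAsync(n, cost, kernel,
  //                           [&all_done]() { all_done.Notify(); });
  //   // ... do other work ...
  //   all_done.Wait();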

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // For parallelForAsync the passed-in closures must be kept on the heap and
  // deleted only after the `done` callback has finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

  // Calculates the block size based on (1) the iteration cost and (2) the
  // parallel efficiency. We want blocks that are not too small (to amortize
  // parallelization overhead) and not too large (to limit tail effects and
  // load imbalance), and we want the number of blocks to divide evenly across
  // the threads.
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;
    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               block_size_f));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as the fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());

    // Now try to increase the block size up to max_block_size as long as this
    // does not decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides n into a smaller number of
      // blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached the maximum block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }
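
  // Worked example (hypothetical numbers, not from the Eigen sources): with
  // n = 1000, 4 threads and a cost for which taskSize(1, cost) == 0.01 (so
  // block_size_f == 100), the initial block_size is
  // min(1000, max(divup(1000, 16), 100)) = 100, i.e. 10 blocks, with
  // efficiency 10 / (divup(10, 4) * 4) = 10/12 ~= 0.83. Coarsening to 9 blocks
  // (block_size 112) drops efficiency to 9/12 = 0.75 and is rejected, while
  // 8 blocks (block_size 125) reaches 8/8 = 1.0 and is taken, so the function
  // returns {125, 8}.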

  ThreadPoolInterface* pool_;  // Not owned.
  int num_threads_;
  Allocator* allocator_;  // Not owned.
};

}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H