ApplyRange.h
Fully qualified name: carb/tasking/detail/ApplyRange.h
// Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include "../../Defines.h"
#if CARB_VERSION_ATLEAST(carb_tasking_ITasking, 2, 5)
namespace carb
{
namespace tasking
{
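// Tag type used to select the splitting constructors in this file: building a
// Range, partitioner or task from an existing one plus Split halves the
// original and hands the upper portion to the newly constructed object.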
class Split_t
{
};
constexpr static Split_t Split{};
// Types that remain opaque outside of carb.tasking
class ThreadLocalStorage;
struct TaskBundle;
namespace detail
{
struct ApplyRangeState;
// ApplyRange interface: the bridge from these header-only task types into the
// carb.tasking scheduler. The implementation lives inside carb.tasking and is
// used here to queue subtasks, look up the currently running TaskBundle, and
// detect whether a task was stolen by another worker.
class IApplyRange
{
public:
virtual uint32_t getThreadCount() const noexcept = 0;
virtual void beginApplyRange(ApplyRangeState& state, void* arg) const noexcept = 0;
virtual void finishApplyRange(ApplyRangeState& state,
OnTaskFn func,
void* (*resolve)(void*),
std::atomic_bool& wait) const noexcept = 0;
virtual void queueTask(OnTaskFn func, void* data) const noexcept = 0;
virtual TaskBundle* currentTask() const noexcept = 0;
virtual bool isStolen(TaskBundle* tb) const noexcept = 0;
};
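// Splitting budget: Depth_t bounds how many times a range may be recursively
// divided. kInitDepth is the initial budget, kDemandAddDepth is added whenever
// a peer task is observed to have been stolen (evidence that other workers are
// idle), and kInitialChunks scales the initial chunk count per worker thread.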
using Depth_t = uint8_t;
constexpr Depth_t kInitDepth = 5;
constexpr Depth_t kDemandAddDepth = 1;
constexpr size_t kInitialChunks = 2;
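// A splittable, half-open range [begin, end) with a minimum grain size of
// `step` elements. T only needs subtraction, addition and less-than; the
// splitting constructor truncates `other` to its lower half and takes the
// upper half for the newly constructed Range.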
template <class T>
class Range
{
public:
using const_iterator = T;
using value_type = T;
using size_type = size_t;
constexpr Range(T begin, T end, T step = 1) noexcept : m_end(end), m_begin(begin), m_step(step)
{
CARB_ASSERT(m_step > 0);
}
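// Initialization-order note: m_end is declared before m_begin, so other.end()
// is captured here before split() truncates `other` to its lower half.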
constexpr Range(Range& other, Split_t) noexcept : m_end(other.end()), m_begin(split(other)), m_step(other.m_step)
{
CARB_ASSERT(!(begin() < other.end()) && !(other.end() < begin()));
}
constexpr const_iterator begin() const noexcept
{
return m_begin;
}
constexpr const_iterator end() const noexcept
{
return m_end;
}
constexpr size_type size() const noexcept
{
return m_end - m_begin;
}
constexpr size_type step() const noexcept
{
return m_step;
}
constexpr bool empty() const noexcept
{
// Assume that T is only less-than-comparable
return !(m_begin < m_end);
}
constexpr bool canDivide() const noexcept
{
// Assume that T is only less-than-comparable
return step() < size();
}
private:
T m_end;
T m_begin;
T m_step;
static T split(Range& other)
{
CARB_ASSERT(other.canDivide());
auto mid = other.m_begin + (other.m_end - other.m_begin) / size_t(2);
other.m_end = mid;
return mid;
}
};
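// Reference-counted node in the dynamically grown task tree. Every
// divideWork() call links a fresh node beneath the current parent; collapse()
// walks back toward the root as tasks complete.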
struct Node
{
Node* m_parent = nullptr;
std::atomic_int m_refCount{ 0 };
Node() = default;
Node(Node* parent, int refCount) : m_parent(parent), m_refCount(refCount)
{
CARB_ASSERT(refCount > 0);
}
};
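// Root of the task tree. The caller of Run() ultimately blocks on m_wait (via
// IApplyRange::finishApplyRange) until collapse() reaches this node, clears
// the flag and issues a futex wakeup.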
struct WaitNode : Node
{
WaitNode() : Node{ nullptr, 1 }
{
}
std::atomic_bool m_wait{ true };
};
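// Interior node of the applyRange task tree. Records whether either child
// task was stolen by another thread; the partitioner treats a stolen peer as
// demand for more, finer-grained work.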
struct TreeNode : public Node
{
std::atomic_bool m_childStolen{ false };
TreeNode(Node* parent, int refCount) : Node(parent, refCount)
{
}
template <class Task>
static void markTaskStolen(Task& t)
{
static_cast<TreeNode*>(t.m_parent)->m_childStolen.store(true, std::memory_order_relaxed);
}
template <class Task>
static bool isPeerStolen(Task& t)
{
return static_cast<TreeNode*>(t.m_parent)->m_childStolen.load(std::memory_order_relaxed);
}
void reduce()
{
}
};
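// Interior node of the parallelReduce task tree. Additionally owns raw
// storage for a right-hand Body created by a split; reduce() folds that Body
// back into the left-hand one as the tree collapses.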
template <class Body>
struct ReductionTreeNode : public TreeNode
{
alignas(alignof(Body)) char m_space[sizeof(Body)];
Body& m_left;
bool m_hasRight{ false };
ReductionTreeNode(Node* parent, int refCount, Body& left) : TreeNode(parent, refCount), m_left(left)
{
}
~ReductionTreeNode()
{
if (m_hasRight)
reinterpret_cast<Body*>(m_space)->~Body();
}
void reduce()
{
if (m_hasRight)
m_left.reduce(*reinterpret_cast<Body*>(m_space));
}
};
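// Drops one reference on `n`. When a node's count hits zero it is reduced,
// destroyed and deallocated, and the walk continues at its parent; reaching
// the parentless root (the WaitNode) clears the wait flag and wakes the
// waiting thread.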
template <class TNode>
void collapse(Node* n)
{
for (;;)
{
CARB_ASSERT(n);
CARB_ASSERT(n->m_refCount.load(std::memory_order_relaxed) > 0);
if (--n->m_refCount > 0)
return;
Node* parent = n->m_parent;
if (!parent)
break;
auto self = static_cast<TNode*>(n);
self->reduce();
self->~TNode();
carb::deallocate(self);
n = parent;
}
// Done, wakeup the waiting thread
auto& wait = static_cast<WaitNode*>(n)->m_wait;
wait.store(false, std::memory_order_release);
carb::getCachedInterface<ITasking>()->futexWakeup(wait, 1);
}
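// Uninitialized, suitably aligned storage for N objects of type T; elements
// are constructed and destroyed manually by the user (see RangeQueue).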
template <class T, size_t N = 1>
class AlignedArray
{
public:
T* begin() const
{
auto ptr = reinterpret_cast<uintptr_t>(&m_aligned);
return reinterpret_cast<T*>(ptr);
}
T* end() const
{
return begin() + N;
}
private:
alignas(alignof(T)) std::uint8_t m_aligned[N * sizeof(T)];
};
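// Fixed-capacity ring buffer of sub-ranges. splitToFill() repeatedly halves
// the back element until the queue is full or the depth budget is exhausted,
// so the front holds the shallowest (largest) pieces and the back the deepest
// (smallest). Demand from stolen peers is served from the front; local
// execution consumes from the back.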
template <class T, Depth_t MaxCapacity>
class RangeQueue
{
public:
RangeQueue(const T& val) : m_head(0), m_tail(0), m_size(1)
{
m_depth[0] = 0;
new (static_cast<void*>(m_pool.begin())) T(val);
}
~RangeQueue()
{
while (!empty())
pop_back();
}
bool empty() const
{
return m_size == 0;
}
Depth_t size() const
{
return m_size;
}
void splitToFill(Depth_t maxDepth)
{
while (m_size < MaxCapacity && canDivide(maxDepth))
{
auto prev = m_head;
m_head = (m_head + 1) % MaxCapacity;
new (m_pool.begin() + m_head) T(std::move(m_pool.begin()[prev]));
m_pool.begin()[prev].~T();
new (m_pool.begin() + prev) T(m_pool.begin()[m_head], Split);
m_depth[m_head] = ++m_depth[prev];
++m_size;
}
}
void pop_back()
{
CARB_ASSERT(m_size > 0);
m_pool.begin()[m_head].~T();
--m_size;
m_head = (m_head + MaxCapacity - 1) % MaxCapacity;
}
void pop_front()
{
CARB_ASSERT(m_size > 0);
m_pool.begin()[m_tail].~T();
--m_size;
m_tail = (m_tail + 1) % MaxCapacity;
}
T& back()
{
CARB_ASSERT(m_size > 0);
return m_pool.begin()[m_head];
}
T& front()
{
CARB_ASSERT(m_size > 0);
return m_pool.begin()[m_tail];
}
Depth_t frontDepth() const
{
CARB_ASSERT(m_size > 0);
return m_depth[m_tail];
}
Depth_t backDepth() const
{
CARB_ASSERT(m_size > 0);
return m_depth[m_head];
}
bool canDivide(Depth_t maxDepth)
{
return backDepth() < maxDepth && back().canDivide();
}
private:
Depth_t m_head, m_tail, m_size;
Depth_t m_depth[MaxCapacity];
AlignedArray<T, MaxCapacity> m_pool;
};
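// Adaptive partitioner: starts by aiming for roughly kInitialChunks chunks
// per worker (m_divisor), then keeps subdividing only while depth budget
// remains or work stealing indicates that other workers are starved.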
class DefaultPartitioner
{
public:
static constexpr unsigned kRangeQueueSize = 8;
DefaultPartitioner(detail::IApplyRange& ar, size_t concurrency)
: m_ar(ar), m_divisor(concurrency), m_maxDepth(kInitDepth)
{
m_divisor *= kInitialChunks;
}
DefaultPartitioner(DefaultPartitioner& other, Split_t)
: m_ar(other.m_ar), m_divisor(split(other)), m_maxDepth(other.m_maxDepth)
{
}
template <class ApplyRangeType, class Range>
void execute(ApplyRangeType& ar, Range& range)
{
while (range.canDivide() && this->canDivide())
{
ar.divideWork();
}
this->doWork(ar, range);
}
bool canDivide()
{
if (m_divisor > 1)
return true;
if (m_divisor && m_maxDepth)
{
--m_maxDepth;
m_divisor = 0;
return true;
}
return false;
}
template <class Task>
bool hasDemand(Task& t)
{
if (TreeNode::isPeerStolen(t))
{
m_maxDepth += kDemandAddDepth;
return true;
}
return false;
}
template <class ApplyRangeType, class Range>
void doWork(ApplyRangeType& ar, Range& range)
{
if (!range.canDivide() || !this->maxDepth())
{
ar.runBody(range);
}
else
{
RangeQueue<Range, kRangeQueueSize> rangeQueue(range);
do
{
rangeQueue.splitToFill(this->maxDepth());
if (this->hasDemand(ar))
{
if (rangeQueue.size() > 1)
{
ar.divideWork(rangeQueue.front(), rangeQueue.frontDepth());
rangeQueue.pop_front();
continue;
}
if (rangeQueue.canDivide(this->maxDepth()))
continue;
}
ar.runBody(rangeQueue.back());
rangeQueue.pop_back();
} while (!rangeQueue.empty());
}
}
template <class Task>
bool evalStolen(Task& t, TaskBundle* tb)
{
if (!this->m_divisor)
{
this->m_divisor = 1;
if (t.m_parent->m_refCount >= 2 && m_ar.isStolen(tb))
{
TreeNode::markTaskStolen(t);
if (!m_maxDepth)
++m_maxDepth;
m_maxDepth += kDemandAddDepth;
return true;
}
}
return false;
}
Depth_t maxDepth() const
{
return m_maxDepth;
}
void rebaseDepth(Depth_t base)
{
CARB_ASSERT(base <= m_maxDepth);
auto temp = m_maxDepth - base;
m_maxDepth = Depth_t(temp); // GCC7 -Werror=conversion fix
}
private:
template <class Range, class Exec, class Partitioner>
friend struct ApplyRange;
static size_t split(DefaultPartitioner& other)
{
return other.m_divisor /= size_t(2);
}
detail::IApplyRange& m_ar;
size_t m_divisor;
Depth_t m_maxDepth;
};
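// Per-invocation state exchanged with IApplyRange::beginApplyRange() and
// finishApplyRange(); interpreted by the carb.tasking implementation.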
struct ApplyRangeState
{
bool isExternalThread;
bool recursive;
ThreadLocalStorage* tls;
void* arg;
};
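// Task type backing applyRange: recursively splits m_range according to the
// partitioner, queues the upper halves as heap-allocated subtasks, and runs
// what remains inline. The root task lives on the caller's stack (m_onStack);
// all others are destroyed and freed in finalize().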
template <class Range, class Exec, class Partitioner>
struct ApplyRange
{
detail::IApplyRange& m_ar;
Range m_range;
const Exec m_exec;
Node* m_parent;
ApplyRange* m_topmost;
Partitioner m_partition;
bool m_onStack{ false };
void execute()
{
m_partition.evalStolen(*this, m_ar.currentTask());
m_partition.execute(*this, m_range);
finalize();
}
static void execute(void* p)
{
static_cast<ApplyRange*>(p)->execute();
}
void finalize()
{
auto parent = m_parent;
auto onStack = m_onStack;
if (!onStack)
this->~ApplyRange();
collapse<TreeNode>(parent);
if (!onStack)
carb::deallocate(this);
}
ApplyRange(detail::IApplyRange& ar, const Range& range, const Exec& exec, Partitioner& partitioner)
: m_ar(ar),
m_range(range),
m_exec(exec),
m_parent(nullptr),
m_topmost(this),
m_partition(partitioner),
m_onStack(true)
{
}
ApplyRange(ApplyRange& parent, Split_t)
: m_ar(parent.m_ar),
m_range(parent.m_range, Split),
m_exec(parent.m_exec),
m_parent(nullptr),
m_topmost(parent.m_topmost),
m_partition(parent.m_partition, Split)
{
}
ApplyRange(ApplyRange& parent, const Range& range, Depth_t depth)
: m_ar(parent.m_ar),
m_range(range),
m_exec(parent.m_exec),
m_parent(nullptr),
m_topmost(parent.m_topmost),
m_partition(parent.m_partition, Split)
{
m_partition.rebaseDepth(depth);
}
static void Run(detail::IApplyRange& ar, const Range& range, const Exec& exec, Partitioner& partitioner)
{
CARB_ASSERT(!range.empty());
ApplyRange taskData{ ar, range, exec, partitioner };
CARB_ASSERT(taskData.m_onStack);
ApplyRangeState state = {};
ar.beginApplyRange(state, &taskData);
WaitNode waiter;
taskData.m_parent = &waiter;
// Run the first task locally
taskData.execute();
ar.finishApplyRange(state, &ApplyRange::execute,
[](void* arg) -> void* { return static_cast<ApplyRange*>(arg)->m_topmost; }, waiter.m_wait);
}
void runBody(Range& range)
{
CARB_ASSERT(!range.empty());
m_exec(range.begin(), range.end());
}
void divideWork()
{
divideWorkInternal(*this, Split);
}
void divideWork(const Range& range, Depth_t depth)
{
divideWorkInternal(*this, range, depth);
}
private:
template <class... Args>
void divideWorkInternal(Args&&... args)
{
// Both of these are allocated using the same size so that we get better/faster heap memory reuse
static_assert(sizeof(ApplyRange) <= (2 * CARB_CACHELINE_SIZE), "broken assumption");
auto taskData =
new (carb::allocate(2 * CARB_CACHELINE_SIZE, CARB_CACHELINE_SIZE)) ApplyRange(std::forward<Args>(args)...);
CARB_ASSERT(!taskData->m_onStack);
static_assert(sizeof(TreeNode) <= (2 * CARB_CACHELINE_SIZE), "broken assumption");
taskData->m_parent = m_parent =
new (carb::allocate(2 * CARB_CACHELINE_SIZE, CARB_CACHELINE_SIZE)) TreeNode(m_parent, 2);
m_ar.queueTask(execute, taskData);
}
};
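// Task type backing parallelReduce: splits like ApplyRange, but a right-hand
// task whose parent still has both children outstanding constructs a split
// Body in the parent ReductionTreeNode; partial results are folded back
// together as the tree collapses.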
template <class Range, class Body, class Partitioner>
struct ParallelReduce
{
detail::IApplyRange& m_ar;
Range m_range;
Body* m_body;
Node* m_parent;
ParallelReduce* m_topmost;
Partitioner m_partition;
bool m_onStack{ false };
bool m_right{ false };
using TreeNodeType = ReductionTreeNode<Body>;
void execute()
{
m_partition.evalStolen(*this, m_ar.currentTask());
CARB_ASSERT(m_parent);
if (m_right && m_parent->m_refCount.load(std::memory_order_acquire) == 2)
{
TreeNodeType* parent = static_cast<TreeNodeType*>(m_parent);
m_body = new (parent->m_space) Body(*m_body, Split);
parent->m_hasRight = true;
}
CARB_ASSERT(m_body);
m_partition.execute(*this, m_range);
finalize();
}
static void execute(void* p)
{
static_cast<ParallelReduce*>(p)->execute();
}
void finalize()
{
auto parent = m_parent;
auto onStack = m_onStack;
if (!onStack)
this->~ParallelReduce();
collapse<TreeNodeType>(parent);
if (!onStack)
carb::deallocate(this);
}
ParallelReduce(detail::IApplyRange& ar, const Range& range, Body& body, Partitioner& partitioner)
: m_ar(ar),
m_range(range),
m_body(&body),
m_parent(nullptr),
m_topmost(this),
m_partition(partitioner),
m_onStack(true)
{
}
ParallelReduce(ParallelReduce& parent, Split_t split)
: m_ar(parent.m_ar),
m_range(parent.m_range, split),
m_body(parent.m_body),
m_parent(nullptr),
m_topmost(parent.m_topmost),
m_partition(parent.m_partition, split),
m_right(true)
{
parent.m_right = false;
}
ParallelReduce(ParallelReduce& parent, const Range& range, Depth_t depth)
: m_ar(parent.m_ar),
m_range(range),
m_body(parent.m_body),
m_parent(nullptr),
m_topmost(parent.m_topmost),
m_partition(parent.m_partition, Split),
m_right(true)
{
m_partition.rebaseDepth(depth);
parent.m_right = false;
}
static void Run(detail::IApplyRange& ar, const Range& range, Body& body, Partitioner& partitioner)
{
CARB_ASSERT(!range.empty());
ParallelReduce taskData{ ar, range, body, partitioner };
CARB_ASSERT(taskData.m_onStack);
ApplyRangeState state = {};
ar.beginApplyRange(state, &taskData);
WaitNode waiter;
taskData.m_parent = &waiter;
// Run the first task locally
taskData.execute();
ar.finishApplyRange(state, &ParallelReduce::execute,
[](void* arg) -> void* { return static_cast<ParallelReduce*>(arg)->m_topmost; },
waiter.m_wait);
}
void runBody(Range& range)
{
CARB_ASSERT(!range.empty());
(*m_body)(range.begin(), range.end());
}
void divideWork()
{
divideWorkInternal(*this, Split);
}
void divideWork(const Range& range, Depth_t depth)
{
divideWorkInternal(*this, range, depth);
}
private:
template <class... Args>
void divideWorkInternal(Args&&... args)
{
// Both of these are allocated using the same size so that we get better/faster heap memory reuse
static_assert(sizeof(ParallelReduce) <= (2 * CARB_CACHELINE_SIZE), "broken assumption");
auto taskData = new (carb::allocate(2 * CARB_CACHELINE_SIZE, CARB_CACHELINE_SIZE))
ParallelReduce(std::forward<Args>(args)...);
CARB_ASSERT(!taskData->m_onStack);
static_assert(sizeof(TreeNodeType) <= (2 * CARB_CACHELINE_SIZE), "broken assumption");
taskData->m_parent = m_parent =
new (carb::allocate(2 * CARB_CACHELINE_SIZE, CARB_CACHELINE_SIZE)) TreeNodeType(m_parent, 2, *m_body);
m_ar.queueTask(execute, taskData);
}
};
} // namespace detail
} // namespace tasking
} // namespace carb
#endif
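For illustration, the splitting semantics above can be exercised in a minimal, hypothetical sketch (these types are internal details of carb.tasking and are normally driven through the ITasking interface rather than used standalone):
// Hypothetical illustration only; assumes this header is reachable on the
// include path and that CARB_VERSION_ATLEAST(carb_tasking_ITasking, 2, 5) holds.
void rangeSplitExample()
{
    using carb::tasking::detail::Range;
    Range<size_t> whole(0, 100, 8); // elements [0, 100) with grain size 8
    Range<size_t> upper(whole, carb::tasking::Split);
    // 'whole' now covers [0, 50) and 'upper' covers [50, 100); both halves
    // still report canDivide() == true because step() (8) < size() (50).
}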