Parallel Algorithms
carb.tasking includes several parallel algorithms that make it easier to exploit available parallelism. Parallelism has an inherent cost (dispatching parallel tasks, waking threads, and so on), so adding it does not always improve performance. The break-even point varies per system, but a rule of thumb is that at least 1,000 instructions should be executed for these algorithms to produce a benefit. Profiling is recommended.
Other parallel algorithms can be created from the tools provided by carb.tasking, but the following are natively supported.
Parallel-For / applyRange
The carb.tasking plugin has means to process a loop in a parallel fashion: carb::tasking::ITasking::applyRange() or carb::tasking::ITasking::parallelFor().
These functions call an invocable object (e.g. a lambda or task function) repeatedly, and likely in parallel, once for every index within a range of [0..range) or [begin..end) with an optional step value.
The function does not return until the invocable has been completed for every index in the given range.
std::atomic_int calls{ 0 };
// for (int i = 100; i != 600; i += 5)
tasking->parallelFor(100, 600, 5, [&calls] (int val) {
    assert(val >= 100);
    assert(val < 600);
    assert((val % 5) == 0); // Should be multiple of 5.
    calls++;
});
CHECK_EQ(calls, 100);
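The expected count of 100 in the example above follows from mapping the strided range onto a dense index space. The sketch below is not carb.tasking code: `stridedCount` and `stridedValue` are hypothetical helpers, assuming a positive step and `begin <= end`, that show how many calls such a loop produces and which value each call receives.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: number of values visited by
//   for (int i = begin; i < end; i += step)
// assuming step > 0 and begin <= end.
constexpr std::size_t stridedCount(int begin, int end, int step)
{
    return static_cast<std::size_t>((end - begin + step - 1) / step);
}

// Hypothetical helper: the value handed to the invocable for dense index `index`.
constexpr int stridedValue(int begin, int step, std::size_t index)
{
    return begin + static_cast<int>(index) * step;
}

// The range used in the example above: 100, 105, ..., 595.
static_assert(stridedCount(100, 600, 5) == 100, "one call per multiple of 5 in [100, 600)");
static_assert(stridedValue(100, 5, 99) == 595, "the last value stays below 600");
```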
Warning
The carb::tasking::ITasking::applyRange() family of functions creates tasks that do not necessarily run in separate fibers the way most other carb.tasking tasks would. Therefore, they cannot be suspended and do not have an identifying TaskContext.
An alternative to carb::tasking::ITasking::applyRange() that is slightly more complicated to use but can result in slightly better performance is carb::tasking::ITasking::applyRangeBatch().
The invocable object is called less frequently but with a contiguous range of items to process, which reduces call overhead.
carb::tasking::ITasking::applyRange() is an adapter for carb::tasking::ITasking::applyRangeBatch() internally.
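The relationship between the per-index form and the batch form can be sketched with plain C++. Everything below is a hypothetical, serial stand-in (`sketchApplyRangeBatch`, `sketchApplyRange`, and `demoCallCounts` are illustrative names, not the carb.tasking implementation): the batch form hands the callable a contiguous `[begin, end)` range, and the per-index form is a thin wrapper that owns the inner loop.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical serial stand-in for a batch API: calls f(begin, end) over
// [0, range) in contiguous chunks of at most `batch` items.
template <class BatchFn>
void sketchApplyRangeBatch(std::size_t range, std::size_t batch, BatchFn&& f)
{
    assert(batch > 0);
    for (std::size_t begin = 0; begin < range; begin += batch)
        f(begin, std::min(begin + batch, range));
}

// Per-index form built on top of the batch form: the adapter owns the inner loop.
template <class IndexFn>
void sketchApplyRange(std::size_t range, std::size_t batch, IndexFn&& f)
{
    sketchApplyRangeBatch(range, batch, [&f](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i != end; ++i)
            f(i);
    });
}

// Usage: double every element through both forms and count the invocations.
// Returns { batch-callable calls, per-index-callable calls }.
inline std::vector<std::size_t> demoCallCounts(std::vector<int>& data, std::size_t batch)
{
    std::size_t batchCalls = 0, indexCalls = 0;
    sketchApplyRangeBatch(data.size(), batch, [&](std::size_t b, std::size_t e) {
        ++batchCalls;
        for (std::size_t i = b; i != e; ++i)
            data[i] *= 2;
    });
    sketchApplyRange(data.size(), batch, [&](std::size_t i) {
        ++indexCalls;
        data[i] *= 2;
    });
    return { batchCalls, indexCalls };
}
```

For 10 items and a batch size of 4, the batch callable runs 3 times while the per-index callable runs 10 times, which is the call overhead the batch form avoids.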
Batch size heuristic
carb::tasking::ITasking::applyRange() and carb::tasking::ITasking::applyRangeBatch() perform operations in parallel while knowing very little about the operations themselves.
These functions must perform well across the extremes, from a small range of long-running operations to a large range of very short operations. Unfortunately, carb::tasking::ITasking::applyRange() has no statistical information about the callable it is given.
carb::tasking::ITasking::applyRange() works by bifurcating the range to a certain initial depth, slicing off half of the remaining work for another
task to accomplish at each deeper step. The calling thread will participate, even if it is not a carb.tasking worker
thread. If the system is busy, no threads will steal the other work tasks, so the calling thread will eventually tackle
them as well. However, if the system has bandwidth, the tasks will be stolen by worker threads, which will indicate to
the carb::tasking::ITasking::applyRange() system that further bifurcation is warranted.
carb::tasking::ITasking::applyRangeBatch() may also take a batchHint parameter that the system will use as a recommended batch size.
The system may choose a larger or slightly smaller batch size. Batch sizes are not required to be uniform across multiple
invocations of the function passed to carb::tasking::ITasking::applyRangeBatch().
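The bifurcation described above can be approximated with standard C++ threads. This is a simplified sketch, not the carb.tasking implementation: `sketchBifurcate` and `demoSum` are hypothetical names, the split depth is fixed rather than driven by work stealing, and `std::async` stands in for the task system. It does show two properties from the text: the calling thread keeps working on its own half, and the callable receives batches of non-uniform size.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical sketch: split [begin, end) in half up to `depth` times, handing
// the upper half of each split to another thread while the caller keeps the
// lower half. Ranges no larger than `batchHint` are not split further.
template <class BatchFn>
void sketchBifurcate(std::size_t begin, std::size_t end, std::size_t batchHint, unsigned depth, BatchFn& f)
{
    std::vector<std::future<void>> splitOff;
    while (depth > 0 && end - begin > batchHint)
    {
        const std::size_t mid = begin + (end - begin) / 2;
        --depth;
        // Slice off half of the remaining work for another task.
        splitOff.push_back(std::async(std::launch::async, [mid, end, batchHint, depth, &f] {
            sketchBifurcate(mid, end, batchHint, depth, f);
        }));
        end = mid;
    }
    // The calling thread participates by processing what is left.
    if (begin != end)
        f(begin, end);
    // Synchronous semantics: do not return until every split-off half is done.
    for (auto& task : splitOff)
        task.get();
}

// Usage: sum 0..n-1. The callable must accept any batch size and be thread-safe.
inline std::size_t demoSum(std::size_t n, std::size_t batchHint)
{
    std::atomic<std::size_t> total{ 0 };
    auto body = [&total](std::size_t b, std::size_t e) {
        std::size_t local = 0;
        for (std::size_t i = b; i != e; ++i)
            local += i;
        total += local;
    };
    sketchBifurcate(0, n, batchHint, 4, body);
    return total.load();
}
```

Accumulating into a local variable and publishing once per batch keeps contention on the shared atomic low, which is the main reason to prefer the batch form for very short operations.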
Recursive applyRange
carb::tasking::ITasking::applyRange() (and carb::tasking::ITasking::applyRangeBatch()) self-limit in terms of recursion. Since these calls are synchronous (in that they must
finish before returning to the caller), they are the tasking system’s highest priority. Consider this example: parallel
processing of a 100x100 two-dimensional array is desired. A carb::tasking::ITasking::applyRange() call is made for the X dimension, and within
the lambda, another carb::tasking::ITasking::applyRange() call is made for the Y dimension. This means that potentially 101 carb::tasking::ITasking::applyRange() calls
will be made that must all complete before the first carb::tasking::ITasking::applyRange() call can return.
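The figure of 101 calls (one outer call plus one inner call per X index) can be checked with a serial stand-in. `CountingRange` and `nestedCalls` below are hypothetical names used only for illustration. The stand-in runs everything on the calling thread and merely counts how often it is entered.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical serial stand-in that only counts how often it is entered.
struct CountingRange
{
    std::size_t calls = 0;

    template <class Fn>
    void applyRange(std::size_t range, Fn&& f)
    {
        ++calls;
        for (std::size_t i = 0; i != range; ++i)
            f(i);
    }
};

// Usage: visit every cell of a width x height grid with nested calls.
// Returns the number of applyRange calls; cellsVisited receives the cell count.
inline std::size_t nestedCalls(std::size_t width, std::size_t height, std::size_t& cellsVisited)
{
    CountingRange r;
    cellsVisited = 0;
    r.applyRange(width, [&](std::size_t /*x*/) {
        // One inner call per X index; each must finish before the outer call can.
        r.applyRange(height, [&](std::size_t /*y*/) { ++cellsVisited; });
    });
    return r.calls;
}
```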
Each carb::tasking::ITasking::applyRange() call will divide its workload as described above which potentially leads to a large number of tasks
being created, but this task allocation is highly optimized and has very low contention. Furthermore, worker threads
spawn these tasks into a thread-specific task queue and will either execute them directly when finished with their primary
task, or rely on other worker threads stealing the tasks to assist the system in reducing the workload.
When tasks are stolen, this indicates to the carb::tasking::ITasking::applyRange() system that bandwidth is available and further subdivision of
the range will occur to split off more tasks to hungry worker threads. If tasks are not being stolen, the calling thread
instead will process the larger ranges in chunks as this is the fastest way to resolve the range. In this manner the system
self-adapts to busyness.
Historically, recursive calls to carb::tasking::ITasking::applyRange() were detected and the number of tasks that they spawned was artificially
limited, which could lead to a degenerate case where parallelism was reduced over fears of task overload. Another
historical method was to create a FIFO queue of in-progress carb::tasking::ITasking::applyRange() calls, but this creates a significant point of
contention.
Parallel-Sort
The include/carb/tasking/ParallelSort.h file includes utility functions to provide a non-stable parallel sort:
carb::tasking::parallelSort(). The sort algorithm automatically checks for a minimum threshold of 512 entries
before dispatching a parallel task; below that threshold the sort happens serially.
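The threshold idea can be illustrated with a small divide-and-merge sort built on the standard library. This is only a sketch under stated assumptions, not how carb::tasking::parallelSort() is implemented: `sketchParallelSort` and `kSerialThreshold` are hypothetical names, `std::async` stands in for the task system, and the split depth is capped at a fixed value.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>
#include <iterator>
#include <vector>

// Assumption for this sketch: reuse the 512-entry threshold quoted above.
constexpr std::ptrdiff_t kSerialThreshold = 512;

// Hypothetical non-stable parallel sort for random-access iterators.
template <class It>
void sketchParallelSort(It first, It last, unsigned depth = 3)
{
    const auto count = std::distance(first, last);
    if (count < kSerialThreshold || depth == 0)
    {
        // Too little work to amortize the dispatch cost: sort serially.
        std::sort(first, last);
        return;
    }
    const It mid = first + count / 2;
    // Sort the upper half on another thread while this thread sorts the lower half.
    auto upper = std::async(std::launch::async, [mid, last, depth] {
        sketchParallelSort(mid, last, depth - 1);
    });
    sketchParallelSort(first, mid, depth - 1);
    upper.get();
    // Both halves are sorted; merge them in place.
    std::inplace_merge(first, mid, last);
}
```

A call such as `sketchParallelSort(values.begin(), values.end())` on a `std::vector<int>` sorts small inputs entirely on the calling thread and only spawns extra threads for larger ones.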
Parallel-Reduce
carb::tasking::ITasking::parallelReduce() is a parallel implementation of a map/reduce algorithm,
or perhaps more aptly described as split-apply-combine. In this algorithm, a range is provided that creates an index
space [0..range). The Body of the algorithm must be applied over each index in the space. If parallelism is available
then portions of the index space are split off, applied in parallel, and then combined back with other results.
The documentation for carb::tasking::ITasking::parallelReduce() describes the required makeup of the Body template class.
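To make the split-apply-combine order concrete, here is a minimal serial sketch. It does not use carb.tasking: `SketchSplit` is a hypothetical stand-in for the split tag type, `SumBody` is an illustrative Body, and `sketchReduce` is a hypothetical driver that performs the split, apply, and combine steps in the order a scheduler might, but on a single thread.

```cpp
#include <cassert>
#include <cstddef>

struct SketchSplit {}; // hypothetical stand-in for a split tag type

// Illustrative Body: sums the indices it is applied to.
struct SumBody
{
    unsigned long long m_sum = 0;

    SumBody() = default;
    // Splitting constructor: starts a fresh accumulator.
    SumBody(SumBody&, SketchSplit) noexcept {}
    // Apply over [begin, end); may be called more than once on the same body.
    void operator()(std::size_t begin, std::size_t end)
    {
        for (std::size_t i = begin; i != end; ++i)
            m_sum += i;
    }
    // Combine another body's result into *this.
    void reduce(const SumBody& other) { m_sum += other.m_sum; }
};

// Hypothetical serial driver showing the order of split, apply and combine.
template <class Body>
void sketchReduce(std::size_t begin, std::size_t end, std::size_t grain, Body& body)
{
    assert(grain > 0);
    if (end - begin <= grain)
    {
        body(begin, end); // apply
        return;
    }
    const std::size_t mid = begin + (end - begin) / 2;
    Body right(body, SketchSplit{});       // split
    sketchReduce(begin, mid, grain, body); // apply the lower half
    sketchReduce(mid, end, grain, right);  // apply the upper half (a scheduler may run this elsewhere)
    body.reduce(right);                    // combine
}
```

Because each split-off body accumulates privately and results are merged only in `reduce`, the Body needs no locking of its own.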
An example of computing Pi in parallel:
inline double PiKernel(size_t i, double step)
{
    double dx = (double(i) + double(0.5)) * step;
    return 4.0 / (1.0 + dx * dx);
}
inline double PiSlice(size_t slice, size_t end, double step)
{
    // Requires slice < end: the do/while body always runs at least once.
    double pi = 0.;
    do
    {
        pi += PiKernel(slice++, step);
    } while (slice < end);
    return pi;
}
struct PiReduce
{
    double m_pi = 0.;
    double m_step;
    constexpr PiReduce(double step) noexcept : m_step(step)
    {
    }
    // Splitting constructor
    PiReduce(PiReduce& other, carb::tasking::Split_t) noexcept : m_pi(0.), m_step(other.m_step)
    {
    }
    // Apply
    void operator()(size_t begin, size_t end)
    {
        // May be called multiple times, so accumulate all results
        m_pi += PiSlice(begin, end, m_step);
    }
    // Combine into *this
    void reduce(const PiReduce& other)
    {
        m_pi += other.m_pi;
    }
};
inline double computePiParallel(carb::tasking::ITasking* tasking, size_t intervals = 1'000'000'000)
{
    auto step = 1.0 / intervals;
    PiReduce body(step);
    tasking->parallelReduce(intervals, 0, body);
    body.m_pi *= step;
    return body.m_pi;
}
inline double computePiSerial(size_t intervals = 1'000'000'000, size_t chunk_size = 4096)
{
    double pi = 0.;
    double step = 1.0 / intervals;
    size_t tail = intervals % chunk_size;
    size_t lastIndex = intervals - tail;
    for (size_t slice = 0; slice != lastIndex; slice += chunk_size)
        pi += PiSlice(slice, slice + chunk_size, step);
    if (tail)
        pi += PiSlice(lastIndex, lastIndex + tail, step);
    pi *= step;
    return pi;
}
On an AMD Threadripper PRO 5975W running Windows 11, the serial computation takes 1,173 ms and the parallel computation above completes in 43 ms: a 96% reduction in run time, or roughly a 27x speedup.