Synchronization

Each task is assigned a fiber for its entire lifetime, and that fiber can be scheduled to run on any thread in the thread pool. When a task requests to sleep or block until woken, the fiber is immediately unscheduled and the thread picks a new fiber to run. Later, when the fiber is woken, any idle thread can resume the task. This means that standard synchronization primitives like std::mutex will not function properly. carb.tasking therefore implements its own primitives that are fiber-aware; that is, they notify the scheduler that the thread can switch to a different task.

Caution

Using a standard synchronization primitive can bottleneck the system as the thread will not be aware that it can run another task.

Danger

Causing a fiber to sleep or wait while holding any std mutex lock will cause difficult-to-diagnose errors! The carb.tasking plugin supports a Debug Setting to detect this situation.

Note

Only the synchronization primitives in the carb::tasking namespace are fiber-aware. Other Carbonite synchronization primitives are not fiber-aware, as they are designed to stand alone without the foundation of the Carbonite Framework.

Parallel-For / applyRange

The carb.tasking plugin provides two ways to process a loop in parallel: carb::tasking::ITasking::applyRange() and carb::tasking::ITasking::parallelFor().

These functions call an invocable object (such as a lambda or task function) repeatedly, and likely in parallel, for every index within a range of [0..range) or [begin..end) with an optional step value. Neither function returns until the invocable has completed for every index in the given range.

std::atomic_int calls{ 0 };
// for (int i = 100; i != 600; i += 5)
tasking->parallelFor(100, 600, 5, [&calls] (int val) {
    assert(val >= 100);
    assert(val < 600);
    assert((val % 5) == 0); // Should be multiple of 5.
    calls++;
});
CHECK_EQ(calls, 100);

Warning

A minimum amount of work is required for carb::tasking::ITasking::applyRange() to be an improvement over serial processing. This amount varies per system, but as a rule of thumb at least 1,000 instructions should be executed for carb::tasking::ITasking::applyRange() to produce a benefit. Profiling is highly recommended.

Warning

The carb::tasking::ITasking::applyRange() family of functions creates tasks that do not necessarily run in separate fibers the way most other carb.tasking tasks would. Therefore, these tasks cannot be suspended and do not have an identifying TaskContext.

An alternative to carb::tasking::ITasking::applyRange() that is slightly more complicated to use but can result in slightly better performance is carb::tasking::ITasking::applyRangeBatch(). The invocable object is called less frequently but with a contiguous range of items to process, which reduces call overhead. carb::tasking::ITasking::applyRange() is an adapter for carb::tasking::ITasking::applyRangeBatch() internally.
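The relationship between the per-index and batch forms can be sketched in standard C++. This is a serial illustration only, not the carb.tasking implementation: the names applyRangeBatchSketch, applyRangeSketch and countCalls are hypothetical, and the fixed batchSize parameter stands in for whatever chunking the real scheduler chooses.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical serial stand-in for a batch-style loop: calls
// fn(begin, end) once per contiguous chunk of [0, range).
template <class BatchFn>
void applyRangeBatchSketch(std::size_t range, std::size_t batchSize, BatchFn&& fn)
{
    assert(batchSize > 0); // a zero-sized batch would never make progress
    for (std::size_t begin = 0; begin < range; begin += batchSize)
        fn(begin, std::min(begin + batchSize, range));
}

// The per-index form written as an adapter over the batch form.
template <class IndexFn>
void applyRangeSketch(std::size_t range, std::size_t batchSize, IndexFn&& fn)
{
    applyRangeBatchSketch(range, batchSize, [&fn](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i < end; ++i)
            fn(i);
    });
}

// Counts how often the user's callable is invoked in each form.
struct CallCounts
{
    std::size_t perIndex;
    std::size_t perBatch;
};

CallCounts countCalls(std::size_t range, std::size_t batchSize)
{
    CallCounts counts{ 0, 0 };
    applyRangeSketch(range, batchSize, [&counts](std::size_t) { ++counts.perIndex; });
    applyRangeBatchSketch(range, batchSize, [&counts](std::size_t, std::size_t) { ++counts.perBatch; });
    return counts;
}
```

For a range of 1,000 and a batch size of 64, the per-index callable runs 1,000 times while the batch callable runs only 16 times, which is the call overhead the batch form saves.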

Batch size heuristic

carb::tasking::ITasking::applyRange() and carb::tasking::ITasking::applyRangeBatch() perform operations in parallel while knowing very little about the operations themselves. These functions must perform well across extremes, from a small range of long-running operations to a large range of very short operations. Unfortunately, carb::tasking::ITasking::applyRange() has no statistical information about the Callable operation.

carb::tasking::ITasking::applyRange() works by bifurcating the range to a certain initial depth, slicing off half the remaining work for another task to accomplish at each deeper step. The calling thread will participate, even if it is not a carb.tasking worker thread. If the system is busy, no threads will steal the other work tasks, so the calling thread will eventually tackle them as well. However, if the system has bandwidth, the tasks will be stolen by worker threads, which will indicate to the carb::tasking::ITasking::applyRange() system that further bifurcation is warranted.
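The initial splitting described above might look roughly like the following sketch. It is single-threaded and models only the busy-system case in which nothing is stolen, so the caller ends up processing every piece. The names Range, bifurcate and runUnstolen, and the explicit depth parameter, are assumptions made for illustration; the adaptive re-splitting that happens when work is stolen is not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

using Range = std::pair<std::size_t, std::size_t>; // half-open [first, second)

// Splits [begin, end) to `depth` levels. At each level the upper half is
// queued as separate work and the lower half is split further.
std::deque<Range> bifurcate(std::size_t begin, std::size_t end, unsigned depth)
{
    assert(begin <= end);
    std::deque<Range> pieces;
    while (depth-- > 0 && end - begin > 1)
    {
        const std::size_t mid = begin + (end - begin) / 2;
        pieces.push_back(Range(mid, end)); // sliced off for another task
        end = mid;                         // keep bisecting what remains
    }
    pieces.push_front(Range(begin, end)); // the piece the caller runs first
    return pieces;
}

// Busy-system case: no piece is stolen, so the calling thread processes
// all of them itself. Returns the number of pieces that were created.
template <class Fn>
std::size_t runUnstolen(std::size_t range, unsigned depth, Fn&& fn)
{
    const std::deque<Range> pieces = bifurcate(0, range, depth);
    for (const Range& piece : pieces)
        for (std::size_t i = piece.first; i < piece.second; ++i)
            fn(i);
    return pieces.size();
}
```

With a range of 1,024 and a depth of 3, the sketch produces four pieces: [0, 128) for the caller, plus [512, 1024), [256, 512) and [128, 256) queued for anyone with spare capacity.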

Recursive applyRange

carb::tasking::ITasking::applyRange() (and carb::tasking::ITasking::applyRangeBatch()) self-limit in terms of recursion. Since these calls are synchronous (in that they must finish before returning to the caller), they are the tasking system’s highest priority. Consider this example: parallel processing of a 100x100 two-dimensional array is desired. A carb::tasking::ITasking::applyRange() call is made for the X dimension, and within the lambda, another carb::tasking::ITasking::applyRange() call is made for the Y dimension. This means that potentially 101 carb::tasking::ITasking::applyRange() calls will be made (one outer call plus 100 inner calls) that must all complete before the first carb::tasking::ITasking::applyRange() call can return.
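The count of 101 calls can be checked with a small serial stand-in. LoopSketch and processGrid below are hypothetical names, and the loop runs sequentially; the sketch only counts how many blocking loop calls a nested traversal makes, it does not model scheduling.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical serial stand-in for a blocking parallel loop over [0, range).
// It records how many times a loop call is entered.
struct LoopSketch
{
    std::size_t loopCalls = 0;

    template <class Fn>
    void applyRange(std::size_t range, Fn&& fn)
    {
        ++loopCalls;
        for (std::size_t i = 0; i < range; ++i)
            fn(i);
    }
};

struct GridResult
{
    std::size_t loopCalls;    // number of applyRange-style calls made
    std::size_t cellsVisited; // number of (x, y) cells processed
};

// One outer loop over X, and one inner loop over Y per X index.
GridResult processGrid(std::size_t width, std::size_t height)
{
    LoopSketch loops;
    std::size_t cells = 0;
    loops.applyRange(width, [&](std::size_t /*x*/) {
        loops.applyRange(height, [&](std::size_t /*y*/) { ++cells; });
    });
    assert(cells == width * height);
    return GridResult{ loops.loopCalls, cells };
}
```

For a 100x100 grid this reports 101 loop calls covering 10,000 cells.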

Each carb::tasking::ITasking::applyRange() call will divide its workload as described above which potentially leads to a large number of tasks being created, but this task allocation is highly optimized and has very low contention. Furthermore, worker threads spawn these tasks into a thread-specific task queue and will either execute them directly when finished with their primary task, or rely on other worker threads stealing the tasks to assist the system in reducing the workload.

When tasks are stolen, this indicates to the carb::tasking::ITasking::applyRange() system that bandwidth is available and further subdivision of the range will occur to split off more tasks to hungry worker threads. If tasks are not being stolen, the calling thread instead will process the larger ranges in chunks as this is the fastest way to resolve the range. In this manner the system self-adapts to busyness.

Historically, recursive calls to carb::tasking::ITasking::applyRange() were detected and the number of tasks that they spawned was artificially limited, which could lead to a degenerate case where parallelism was reduced over fears of task overload. Another historical method was to create a FIFO queue of in-progress carb::tasking::ITasking::applyRange() calls, but this creates a significant point of contention.

Synchronous Tasks

In some cases, it is necessary to wait for a task to complete. All of the carb::tasking::ITasking::addTask() functions return a carb::tasking::Future object that can be used to wait for a task to complete (and even receive a return value from a task). Alternately, a carb::tasking::SemaphoreWrapper or carb::tasking::ConditionVariableWrapper could be employed to wait until a specific condition is met.

There is also a function carb::tasking::ITasking::awaitSyncTask() which will take std::invoke-style parameters and execute them synchronously.
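"std::invoke-style parameters" means a callable followed by its arguments, where the callable may also be a pointer to a member function or data member. The sketch below shows that calling convention using only the standard library. runSyncSketch, Account and add are hypothetical names, and the helper simply runs the callable on the calling thread; it does not reproduce how carb.tasking schedules the work.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper: accepts a callable plus arguments the way
// std::invoke does and returns the result directly to the caller.
template <class Fn, class... Args>
decltype(auto) runSyncSketch(Fn&& fn, Args&&... args)
{
    return std::invoke(std::forward<Fn>(fn), std::forward<Args>(args)...);
}

struct Account
{
    int balance = 0;

    int deposit(int amount)
    {
        assert(amount > 0);
        balance += amount;
        return balance;
    }
};

int add(int a, int b)
{
    return a + b;
}
```

A free function is passed as runSyncSketch(add, 2, 3), a member function as runSyncSketch(&Account::deposit, &account, 40), and a data member as runSyncSketch(&Account::balance, account).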

TaskGroup

It can be advantageous to know when a group of tasks is pending or has completed. This is an oft-used shutdown paradigm so that a system waits for all tasks to complete before proceeding with shutdown. This can be accomplished with a very simple class called carb::tasking::TaskGroup. This class acts like a reverse semaphore: when a task is tracked by the TaskGroup, the TaskGroup is unsignaled. Only when the TaskGroup is empty (all tasks have completed) does it become signaled.
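The reverse-semaphore behavior can be illustrated with a pending-task count. TaskGroupSketch, its member names and observeTrackedTask are assumptions for illustration rather than the carb::tasking::TaskGroup interface, and the sketch polls instead of blocking.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical reverse semaphore: signaled only while nothing is tracked.
class TaskGroupSketch
{
public:
    void enter()
    {
        m_pending.fetch_add(1, std::memory_order_acq_rel);
    }

    void leave()
    {
        const std::size_t previous = m_pending.fetch_sub(1, std::memory_order_acq_rel);
        assert(previous > 0); // leave() without a matching enter()
        (void)previous;
    }

    bool isSignaled() const
    {
        return m_pending.load(std::memory_order_acquire) == 0;
    }

    // Runs fn while it is tracked by the group, even if fn throws.
    template <class Fn>
    void track(Fn&& fn)
    {
        enter();
        struct Leave
        {
            TaskGroupSketch& group;
            ~Leave() { group.leave(); }
        } guard{ *this };
        fn();
    }

private:
    std::atomic<std::size_t> m_pending{ 0 };
};

// Records the signaled state before, during and after one tracked task.
struct Observed
{
    bool before;
    bool during;
    bool after;
};

Observed observeTrackedTask()
{
    TaskGroupSketch group;
    Observed seen{};
    seen.before = group.isSignaled();
    group.track([&] { seen.during = group.isSignaled(); });
    seen.after = group.isSignaled();
    return seen;
}
```

A shutdown path built on this idea would stop accepting new work and then wait until the group reports signaled.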

Waiting

The tasking library has a generic wait function: carb::tasking::ITasking::wait() (and variants carb::tasking::ITasking::try_wait(), carb::tasking::ITasking::try_wait_for() and carb::tasking::ITasking::try_wait_until()) which can be used to wait on any tasking element that corresponds to the carb::tasking::RequiredObject named requirement. This includes carb::tasking::Future, carb::tasking::Counter, and carb::tasking::TaskGroup.

Multiple elements corresponding to the carb::tasking::RequiredObject named requirement can be grouped together using carb::tasking::Any and carb::tasking::All groupers, and the groupers can be nested to form complex wait mechanisms.
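Nesting of groupers can be thought of as composing readiness checks. The sketch below models that composition with plain predicates. Ready, allOf, anyOf and nestedReady are hypothetical names, and nothing here blocks or interacts with a scheduler.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// A waitable element is reduced to a question: "is it ready yet?"
using Ready = std::function<bool()>;

// Ready only when every part is ready (an empty group counts as ready).
Ready allOf(std::vector<Ready> parts)
{
    return [parts = std::move(parts)] {
        for (const Ready& part : parts)
        {
            assert(part != nullptr);
            if (!part())
                return false;
        }
        return true;
    };
}

// Ready as soon as any part is ready (an empty group is never ready).
Ready anyOf(std::vector<Ready> parts)
{
    return [parts = std::move(parts)] {
        for (const Ready& part : parts)
        {
            assert(part != nullptr);
            if (part())
                return true;
        }
        return false;
    };
}

// Evaluates the nested condition any(a, all(b, c)) for the given flags.
bool nestedReady(bool a, bool b, bool c)
{
    const Ready ra = [a] { return a; };
    const Ready rb = [b] { return b; };
    const Ready rc = [c] { return c; };
    return anyOf({ ra, allOf({ rb, rc }) })();
}
```

Here nestedReady(false, true, true) is ready because the inner group is complete, while nestedReady(false, true, false) is not.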

Throttling

In some cases it may be desirable to limit how many concurrent tasks can execute. This can be accomplished by using a Semaphore. Create the semaphore with a count equal to the maximum desired concurrency. The semaphore can then be passed to carb::tasking::ITasking::addThrottledTask() which will acquire() the semaphore before executing the task and release() the semaphore upon task completion.

This effectively limits the number of concurrent tasks to the initial count of the Semaphore, assuming the tasks have been queued with carb::tasking::ITasking::addThrottledTask() and the same Semaphore object.
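The acquire-before, release-after pattern can be sketched as follows. SemaphoreSketch, tryRunThrottled and maxNesting are hypothetical names. Unlike the throttled task described above, which waits for the semaphore, this sketch refuses to run when no permit is available so that it stays single-threaded and deterministic.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical counting semaphore with a non-blocking acquire.
class SemaphoreSketch
{
public:
    explicit SemaphoreSketch(std::size_t count) : m_max(count), m_count(count)
    {
    }

    bool try_acquire()
    {
        std::size_t current = m_count.load(std::memory_order_acquire);
        while (current != 0)
        {
            // On failure, `current` is reloaded and the loop retries.
            if (m_count.compare_exchange_weak(current, current - 1, std::memory_order_acq_rel))
                return true;
        }
        return false;
    }

    void release()
    {
        const std::size_t previous = m_count.fetch_add(1, std::memory_order_release);
        assert(previous < m_max); // release() without a matching acquire
        (void)previous;
    }

private:
    const std::size_t m_max;
    std::atomic<std::size_t> m_count;
};

// Runs fn only if a permit is available. The permit is held for the
// duration of fn and released afterwards, even if fn throws.
template <class Fn>
bool tryRunThrottled(SemaphoreSketch& semaphore, Fn&& fn)
{
    if (!semaphore.try_acquire())
        return false;
    struct Release
    {
        SemaphoreSketch& semaphore;
        ~Release() { semaphore.release(); }
    } guard{ semaphore };
    fn();
    return true;
}

// Nests up to `attempts` throttled calls inside one another and reports
// how many were running at once before the semaphore refused the next.
std::size_t maxNesting(SemaphoreSketch& semaphore, std::size_t attempts)
{
    std::size_t depth = 0;
    if (attempts > 0)
        tryRunThrottled(semaphore, [&] { depth = 1 + maxNesting(semaphore, attempts - 1); });
    return depth;
}
```

With a semaphore created with a count of 2, five nested attempts reach a depth of only 2, and the permits are available again once the calls unwind.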

The “Main” Priority

In some cases it may be necessary for certain tasks to execute only in the context of a single consistent thread, typically the main thread (or initial thread for the application). For this reason, a special Priority value exists: carb::tasking::Priority::eMain. Any task queued with this Priority will only execute when the “main” thread calls carb::tasking::ITasking::executeMainTasks().

The first thread that calls carb::tasking::ITasking::executeMainTasks() indicates which thread will be the “main” thread; any attempt to call the function on a different thread will call std::terminate(). The only way to change the “main” thread is to unload and reload carb.tasking.plugin.

These main-priority tasks are also assigned a fiber for their duration, and can still yield to sleep or block on I/O. When resumed, they will only execute on the “main” thread. When carb::tasking::ITasking::executeMainTasks() is called, it runs each main-priority task until it finishes or yields. As such, it is designed to be called periodically by the main loop of the application.
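A minimal model of this behavior is a queue that is drained only by the thread that first called the drain function. MainQueueSketch and addMainTask are hypothetical names. The sketch has no fibers, so each task runs to completion instead of possibly yielding, and a task queued while draining runs on the next call.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <exception>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

class MainQueueSketch
{
public:
    // May be called from any thread.
    void addMainTask(std::function<void()> task)
    {
        assert(task != nullptr);
        std::lock_guard<std::mutex> lock(m_mutex);
        m_tasks.push_back(std::move(task));
    }

    // Runs the queued tasks on the calling thread and returns how many ran.
    // The first caller becomes the "main" thread; any other thread that
    // calls this afterwards terminates the program.
    std::size_t executeMainTasks()
    {
        std::deque<std::function<void()>> ready;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (!m_hasOwner)
            {
                m_owner = std::this_thread::get_id();
                m_hasOwner = true;
            }
            else if (m_owner != std::this_thread::get_id())
            {
                std::terminate();
            }
            ready.swap(m_tasks);
        }
        // Run outside the lock so tasks may queue further main tasks.
        for (std::function<void()>& task : ready)
            task();
        return ready.size();
    }

private:
    std::mutex m_mutex;
    std::deque<std::function<void()>> m_tasks;
    std::thread::id m_owner;
    bool m_hasOwner = false;
};
```

An application's main loop would call executeMainTasks() once per iteration, alongside its other per-frame work.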

Counters (Deprecated)

Caution

Counters are deprecated.

There exists a simple synchronization mechanism: carb::tasking::Counter which acts like a reverse-semaphore: it becomes “signaled” when the count reaches a target value (typically 0). Counter objects can be created with carb::tasking::ITasking::createCounter() or more preferably with carb::tasking::CounterWrapper.

Deprecation Warning

This construct is deprecated. Because it is a non-standard synchronization primitive, it is highly recommended that more standard constructs such as carb::tasking::Semaphore and carb::tasking::Future be used instead. Counters remain supported as of version 1.6 of carb.tasking.plugin because a large body of existing code uses them and some things are still most easily expressed with a Counter.

Signaling task completion

Counters can be used to signal task completion. All of the carb::tasking::ITasking::addTask() variant functions take a carb::tasking::Trackers group which can contain zero or more counters. Prior to addTask returning, all of the given Counter objects are incremented by one. When the task completes, all of the Counter objects are decremented by one. If the Counter has reached its target (typically zero), the Counter becomes signaled and any threads/tasks waiting on it may resume (see carb::tasking::ITasking::yieldUntilCounter()).

Manual Counter Manipulation

Although it is deprecated and should be avoided whenever possible, there exists functionality to atomically increment/decrement Counter objects and check whether they are signaled.

Sub-tasks

It may also be advantageous for a task to be queued but wait for a previous task to complete. One option is for the task function to begin by yielding until a Counter is signaled. However, this is inefficient in that it requires assigning a fiber to the task and starting the task only to immediately stop and wait. For this reason, carb::tasking::ITasking::addSubTask() exists, which takes zero or more Counter objects through a carb::tasking::RequiredObject helper struct. If more than one Counter object is required, they must be grouped with carb::tasking::All or carb::tasking::Any helper structs (which may also be nested). This is more efficient in that it allows the scheduler to understand that a task need not be assigned a fiber and started until one or more Counter objects have been signaled.
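The efficiency argument can be illustrated with a toy scheduler that does not start a task until its prerequisite is met. CounterSketch, DeferredSchedulerSketch, addSubTask (as used here) and pump are hypothetical names. The sketch is single-threaded and polls its pending list, which is an assumption made for clarity and not how carb.tasking tracks prerequisites.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical counter: signaled when it reaches its target of zero.
struct CounterSketch
{
    int value = 0;

    bool isSignaled() const
    {
        return value == 0;
    }
};

class DeferredSchedulerSketch
{
    struct Entry
    {
        std::function<bool()> required; // prerequisite check
        std::function<void()> task;     // work to start once it passes
    };

public:
    // Queues a task that must not start until `required` reports true.
    void addSubTask(std::function<bool()> required, std::function<void()> task)
    {
        assert(required != nullptr && task != nullptr);
        m_pending.push_back(Entry{ std::move(required), std::move(task) });
    }

    // Starts every pending task whose prerequisite is met and returns how
    // many were started. Unready tasks stay queued and consume nothing.
    std::size_t pump()
    {
        std::vector<Entry> ready;
        std::vector<Entry> waiting;
        for (Entry& entry : m_pending)
            (entry.required() ? ready : waiting).push_back(std::move(entry));
        m_pending = std::move(waiting);
        for (Entry& entry : ready)
            entry.task();
        return ready.size();
    }

    std::size_t pendingCount() const
    {
        return m_pending.size();
    }

private:
    std::vector<Entry> m_pending;
};
```

While the counter is non-zero the dependent task is only an entry in a list; it starts on the first pump() after the counter reaches zero.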