Anatomy of a Task
While a task itself is a simple construct, there is a need to handle various aspects of a task: when it can start, knowing when it has finished and retrieving any return values from it.
Let’s take a look at the most basic way to queue a task: carb::tasking::ITasking::addTask()
/**
* Runs the given function-like object as a task.
*
* @param priority The priority of the task to execute.
* @param trackers (optional) Zero or more counters that are incremented before addTask() returns and are
* decremented upon task completion. These counters can be used to determine task completion.
* @param f A C++ "Callable" object (i.e. functor, lambda, [member] function ptr) that optionally returns a value
* @param args Arguments to pass to @p f
* @return A Future based on the return type of @p f
*/
template <class Callable, class... Args>
auto addTask(Priority priority, Trackers&& trackers, Callable&& f, Args&&... args);
Priority
Every task has an associated priority of the value carb::tasking::Priority
. As expected, higher priority tasks
execute before lower priority tasks. As carb.tasking.plugin is a cooperative (as opposed to preemptive) multi-tasking
scheduler, the higher priority tasks will complete (or yield) before lower priority tasks will begin executing.
A special eMain
priority exists to specify that a task should only execute on the “main” thread.
Trackers
Trackers are a powerful feature for tasks. The carb::tasking::Trackers
structure accepts zero or more objects
of type carb::tasking::Tracker
which in turn accept multiple types. These types function as input/output
parameters to the tasking system. Each provided Tracker will be “entered” before the addTask
function returns (and before
the task can possibly start), and each provided Tracker will be “exited” when the provided Callable
has completed. Successfully
cancelling a task causes all Trackers to be “exited.”
Note
Trackers are exited after the provided Callable
has completed, but slightly before the TaskContext
for the task becomes signaled (and before any returned Future
becomes signaled). See Task Completion and Guarantees.
No Trackers
Either nullptr
or an empty initializer list {}
can be specified to indicate that no Tracker objects are desired.
“Enter” action: None.
“Exit” action: None.
TaskGroup Tracker
The carb::tasking::TaskGroup
object is conceptually an inverse semaphore: It is signaled when empty and will
block when tasks enter it.
“Enter” action: The carb::tasking::TaskGroup::enter()
function is called, incrementing the number of tasks by one.
“Exit” action: The carb::tasking::TaskGroup::leave()
function is called, decrementing the number of tasks by one. If the number of tasks in the task group becomes zero, the TaskGroup
becomes signaled.
TaskGroup
can be passed either via reference or pointer. If a null pointer is passed it will be ignored, which makes
an effective means of supporting optional TaskGroup
objects.
Warning
It is the caller’s responsibility to ensure that the TaskGroup
object is valid until carb::tasking::TaskGroup::try_wait()
would return true
.
TaskContext Tracker
Either a pointer or a reference to a carb::tasking::TaskContext
can be passed as a Tracker. If the pointer is
nullptr
it will be ignored.
“Enter” action: The pointer (if non-nullptr
) or reference will receive the associated TaskContext
for the task.
“Exit” action: None.
Future<void> Tracker
Regardless of the return value of the invocable object passed to addTask
, a pointer or a reference to a carb::tasking::Future<void>
can be populated as a Tracker. The Future<void>
is a specialization of carb::tasking::Future
that is simple wrapper around a TaskContext
.
Like other Tracker types, a nullptr
will be ignored to facilitate optional types.
“Enter” action: The pointer (if non-nullptr
) or reference will receive the associated Future<void>
for the task.
“Exit” action: None.
Counter Tracker
Despite being deprecated and to promote backwards-compatibility, objects convertible to pointers of carb::tasking::Counter
are accepted as Tracker objects. Similarly to other Tracker types, nullptr
is ignored.
“Enter” action: Calls carb::tasking::ITasking::fetchAddCounter()
.
“Exit” action: Calls carb::tasking::ITasking::fetchSubCounter()
.
Warning
It is the caller’s responsibility to ensure that the Counter
object is valid until carb::tasking::ITasking::try_wait()
on the Counter
would return true
.
Invocable Objects
The Callable
and optional Args
parameters work together to provide std::invoke-like
execution support. Nearly anything that is callable via std::invoke
is supported.
Adding a task implicitly creates a shared state as for carb::tasking::Promise
. The carb::tasking::Future
from the Promise
is returned to the caller, and the return value of Callable
is passed similarly to carb::tasking::Promise::set_value()
.
Therefore, when the Callable
returns a value, the Future
becomes ready and the return value can be retrieved.
If any Args
are provided, they are captured in a std::tuple
using std::decay
and passed as distinct arguments to Callable
. The
Args
are forwarded using std::forward
to be captured and passed to Callable
with std::move
.
If a task is canceled or throws an uncaught exception, the captured Args
will be destroyed.
Warning
It is the caller’s responsibility to ensure that the invocable object, lambda captures, and passed arguments remain
valid until the Future<>
returned from carb::tasking::ITasking::addTask()
becomes signaled. For instance,
passing a member-function pointer to addTask
and a this
pointer requires that the caller ensure that the this
pointer remains valid, otherwise undefined behavior will occur. In general it is better to pass temporaries to addTask
and/or use std::shared_ptr
/ std::weak_ptr
so that tasks may be abandoned without repercussions (Best Practices).
Lambda
One of the most common methods involves lambdas which can capture any arguments. Lambdas are powerful but can make debugging difficult as they are often inlined and do not have a human readable name. Part of the power of lambdas is that asynchronous code is easily visible next to synchronous code.
Warning
If parameters are captured by reference (&
), or pointers are captured, it is the caller’s responsibility to ensure that the captured item exists for the duration of the task.
auto future = tasking->addTask(Priority::eDefault, {}, [my, captured, &variables] {
// Do asynchronous work
// ...
return result;
});
// Do synchronous work
// ...
std::cout << future.get(); // await result and print
Variables that are not passed in a lambda can be passed as arguments. This is especially helpful for loops.
TaskGroup tg;
for (int i = 0; i != kMaxTasks; ++i)
{
tasking->addTask(Priority::eDefault, tg, [] (int taskNumber) {
// Do something asynchronous based on taskNumber
// ...
}, i); // Note the passing of i as an additional parameter
}
tg.wait();
Function and Pointer-to-Function
C-style pointers-to-functions can be passed to addTask
as well:
void fooAsync()
{
// Do something async
// ...
}
tasking->addTask(Priority::eDefault, {}, fooAsync);
Additional parameters passed to addTask
will be passed as parameters to the function:
void fooAsync(std::unique_ptr<int> i)
{
// Do something async with i
// ...
}
tasking->addTask(Priority::eDefault, {}, fooAsync, std::make_unique(5));
Member Functions
It is also possible to pass member functions to addTask
and pass the this
parameter as the first additional parameter:
Warning
It is the caller’s responsibility to ensure that the this
parameter remains valid until the task has completed.
class Foo
{
void myMemberFunc()
{
printf("Foo ptr: %p\n", this);
}
};
Foo fooOnStack;
auto future = tasking->addTask(Priority::eDefault, {}, &Foo::myMemberFunc, &fooOnStack);
future.wait();
Functors
Objects with operator()
are also supported:
struct FiveAdder
{
int operator() (int val)
{
return val + 5;
}
};
auto future = tasking->addTask(Priority::eDefault, {}, FiveAdder(), 5);
CHECK_EQ(future.get(), 10);
Execution
Notifying threads of work
When addTask
is called, the task goes into a queue within the scheduler. Threads belonging to the carb.tasking system
add work to their own queue, and external threads will contribute to a global queue within the scheduler. If the system is idle,
a thread will be woken, that will then wake other threads causing a chain reaction.
When worker threads wake, they search for something to do. The priority order of a worker thread’s jobs are:
The thread’s pinned tasks
The thread’s local task queue (youngest tasks)
The system task queue
Stolen work from another thread’s local task queue (oldest tasks)
Threads remain active looking for work to do as long as the system says there might be work.
Note
A task can begin executing even before addTask
returns!
Assigning a fiber
Before a task can begin executing, it must be assigned a special stack called a fiber. A fiber is like a thread that the scheduler can choose to run at any point (as opposed to a thread which the OS will schedule and run). Typically a task is assigned a fiber immediately before it begins executing, but certain operations will assign a fiber immediately even if the task has not started. (Main Article)
Fibers are a limited resource within the system. The absolute maximum fibers is specified by carb::tasking::kMaxFibers
and the maximum is configured by carb::tasking::ITasking::changeParameters
. If the system runs out of fibers,
tasks cannot execute until existing tasks complete and relinquish their fibers. For this reason, the system provides some
support for adding sub-tasks that have a dependency rather than requiring code that immediately waits for a resource upon
starting. This is accomplished through the carb::tasking::ITasking::addSubTask()
function and allows fiber allocation
to be deferred until the dependency condition is fulfilled.
Invoking the Callable
Once a fiber is assigned, the thread will switch to the fiber and invoke the Callable
passed to addTask
. As this
happens within the context of the fiber, this is known as “task context.” By having an underlying fiber, calls back into
the scheduler to wait won’t block the thread, but will “put the fiber to sleep” and run a different task (or wait for a new
task) until the fiber is woken again. This allows tasks to call “await”-style functions without blocking worker threads.
See Best Practices for more info.
Notifying Trackers
When the Callable
returns, any carb::tasking::Future
returned from addTask
will receive the return
value from the Callable
and further attempts to wait()
on the Future
will return immediately.
Note that multiple Tracker
objects are notified in a non-deterministic order and gaps of time exist where one Tracker
may have been notified that the task completed, but the Future
or another Tracker
may not yet have received notification.
Therefore, the following example is bad practice:
TaskGroup tg1;
auto tg2 = std::make_unique<TaskGroup>();
tasking->addTask(Priority::eDefault, { tg, tg2 }, [] { /* do something async */ });
tg1.wait();
tg2.reset(); // TaskGroup was destroyed without checking to make sure it was signaled yet! Do tg2->wait() first.
Returning Values
The addTask
returns a carb::tasking::Future
object that is based on the return type of Callable
.
This value can be retrieved via the carb::tasking::Future::get()
function, which will wait until the
task completes and the value becomes available.
Task Completion and Guarantees
There are two primary phases to task completion that can be used to trigger subsequent tasks or can be waited on.
Phase 1: Tracker Exit
The Trackers that were passed to addTask
are “exited” once the following are guaranteed to have happened (note that the order is not guaranteed):
The
Callable
function has returned (any value returned has been set in theFuture<T>
, but theFuture<T>
is not yet signaled).Any and all parameters bound to
addTask
have been destructed.The
Callable
along with any and all captured variables have been destructed.Task-specific data has been destroyed.
At this point, Trackers that were tracking the task may be signaled, but there is no guarantee that the carb::tasking::TaskContext
or Future<T>
have been signaled.
Phase 2: Task Completion
Once Trackers have been exited, the task is marked as complete. This causes the carb::tasking::TaskContext
and Future<T>
to be signaled, and will release any threads or tasks waiting on TaskContext
or Future<T>
.
Waiting on the TaskContext
or the Future<T>
guarantees that the task is complete in all respects, that any and all
bound or captured variables passed to addTask
have been destructed, and that all Trackers have been exited.
Cancellation
In some cases it may be advantageous to cancel a task that hasn’t started yet. This can be attempted through the carb::tasking::ITasking::tryCancelTask
function.
Warning
A task cannot be canceled once the Callable
has been invoked. However, the Callable
could check a flag and return as quickly as possible.
The system guarantees that the Callable
will be invoked exactly once unless tryCancelTask
has been invoked and returns
true
, in which case Callable
will never be called.
Successfully cancelling a task also notifies all Tracker
objects that the task is finished, but Future
objects will
be in a state where carb::tasking::Future::isCancelled()
will return true and attempts to get()
the value will
call std::terminate()
.
When tryCancelTask
returns true
it is also guaranteed that Callable
and any captured Args
originally passed
to addTask
have been destroyed, and the associated TaskContext
is signaled.
TaskGroup tg;
// Add a task that waits 100ms before executing
auto fut = tasking->addTaskIn(100ms, Priority::eDefault, tg, [] {
REQUIRE_UNARY(false); // This should never get executed
return true;
});
// Cancel the task immediately
CHECK(tasking->tryCancelTask(*fut.task_if()));
// Note that the TaskGroup is now empty (signalled), the Future is signalled and `isCanceled()` returns true.
CHECK(tg.try_wait());
REQUIRE(fut.valid());
CHECK(fut.try_wait());
CHECK(fut.isCanceled());
// It shouldn't be executed; if it is, the REQUIRE(false) in the task will fail the test
std::this_thread::sleep_for(150ms);
Other Task Types
The system provides other functions that allow queuing tasks that have other dependencies without needing to allocate a fiber to the tasks. Using these functions can mitigate fiber resource issues.
Throttled Tasks
Throttled tasks use a carb::tasking::Semaphore
as a gate to ensure that only a certain number of tasks that
share the Semaphore
are executing at any given time. Tasks that cannot acquire the semaphore can wait generally without
assigning a fiber. Before the task can start, it must acquire the semaphore. When the task has completed or has been canceled
the semaphore will be released.
SemaphoreWrapper sema(1); // Starts with a value of 1, so only 1 task may execute concurrently.
TaskGroup tg;
for (int i = 0; i != 5; ++i)
{
tasking->addThrottledTask(sema, Priority::eDefault, {}, [] { /* do something async */ });
}
tg.wait(); // All tasks execute serially because the semaphore has a value of 1
Sub-tasks
Sub-tasks can only execute after a dependency becomes signaled. The carb::tasking::ITasking::addSubTask()
function
uses the carb::tasking::RequiredObject
helper function to declare zero or more dependencies for a task. Generally
the task is not assigned a fiber until all dependencies have become signaled.
The following are valid dependencies and can be given as a RequiredObject
:
nullptr
- always signaled.carb::tasking::TaskContext
- signaled when the associated task completes in all respects.carb::tasking::TaskGroup
- signaled when theTaskGroup
contains zero tasks.carb::tasking::Future
- signaled when the value is available to be read from theFuture
.carb::tasking::SharedFuture
- signaled when the value is available to be read from theSharedFuture
.carb::tasking::Counter*
- becomes signaled when theCounter
is at its target value.
auto dependency = tasking->addTask(Priority::eDefault, {}, [] { /* long running task */ });
auto future = tasking->addSubTask(dependency, Priority::eDefault, {}, [] { /* cleanup task */ });
future.wait();
// The dependent task must complete before the cleanup task is started. At this point both have completed.
The carb::tasking::Future::then()
function is provided as syntactic sugar around ITasking::addSubTask
.
There is a function that also combines the concepts of throttling and sub-tasks: carb::tasking::ITasking::addThrottledSubTask()
.
Timed Tasks
Another powerful feature involves running a task after a certain amount of time has elapsed, or at a specific time. There are
two functions provided for this: carb::tasking::ITasking::addTaskIn()
and carb::tasking::addTaskAt()
. In
both cases, the task will typically not be assigned a fiber until the time period has elapsed.
// Add an alarm that happens in an hour
using namespace std::chrono_literals;
tasking->addTaskIn(1h, Priority::eDefault, {}, [] { /* do something in an hour */ });