Anatomy of a Task

While a task itself is a simple construct, several aspects of it must be managed: when it can start, how to know when it has finished, and how to retrieve any return value from it.

Let’s take a look at the most basic way to queue a task: carb::tasking::ITasking::addTask()

/**
 * Runs the given function-like object as a task.
 *
 * @param priority  The priority of the task to execute.
 * @param trackers (optional) Zero or more counters that are incremented before addTask() returns and are
 * decremented upon task completion. These counters can be used to determine task completion.
 * @param f A C++ "Callable" object (i.e. functor, lambda, [member] function ptr) that optionally returns a value
 * @param args Arguments to pass to @p f
 * @return A Future based on the return type of @p f
 */
template <class Callable, class... Args>
auto addTask(Priority priority, Trackers&& trackers, Callable&& f, Args&&... args);

Priority

Every task has an associated priority of type carb::tasking::Priority. As expected, higher priority tasks execute before lower priority tasks. Because carb.tasking.plugin is a cooperative (as opposed to preemptive) multi-tasking scheduler, a running task is never interrupted: higher priority tasks will complete (or yield) before lower priority tasks begin executing.

A special eMain priority exists to specify that a task should only execute on the “main” thread.

Trackers

Trackers are a powerful feature for tasks. The carb::tasking::Trackers structure accepts zero or more objects of type carb::tasking::Tracker which in turn accept multiple types. These types function as input/output parameters to the tasking system. Each provided Tracker will be “entered” before the addTask function returns (and before the task can possibly start), and each provided Tracker will be “exited” when the provided Callable has completed. Successfully cancelling a task causes all Trackers to be “exited.”

Note

Trackers are exited after the provided Callable has completed, but slightly before the TaskContext for the task becomes signaled (and before any returned Future becomes signaled). See Task Completion and Guarantees.

No Trackers

Either nullptr or an empty initializer list {} can be specified to indicate that no Tracker objects are desired.

“Enter” action: None.

“Exit” action: None.

TaskGroup Tracker

The carb::tasking::TaskGroup object is conceptually an inverse semaphore: It is signaled when empty and will block when tasks enter it.

“Enter” action: The carb::tasking::TaskGroup::enter() function is called, incrementing the number of tasks by one.

“Exit” action: The carb::tasking::TaskGroup::leave() function is called, decrementing the number of tasks by one. If the number of tasks in the task group becomes zero, the TaskGroup becomes signaled.

TaskGroup can be passed either via reference or pointer. If a null pointer is passed it will be ignored, which makes an effective means of supporting optional TaskGroup objects.

Warning

It is the caller’s responsibility to ensure that the TaskGroup object is valid until carb::tasking::TaskGroup::try_wait() would return true.
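The "inverse semaphore" behavior described above can be modeled with standard C++ primitives. The following is a minimal sketch and not the carb.tasking implementation: `ToyTaskGroup` and `toyTaskGroupDemo` are hypothetical names, and a real TaskGroup cooperates with the scheduler rather than blocking on a condition variable.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the idea behind TaskGroup; not the carb.tasking type.
class ToyTaskGroup
{
public:
    // "Enter" action: one more task is tracked, so the group is no longer signaled.
    void enter()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        ++m_count;
    }

    // "Exit" action: one fewer task; waiters are released when the count reaches zero.
    void leave()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(m_count > 0);
        if (--m_count == 0)
            m_signaled.notify_all();
    }

    // Non-blocking check: true when the group is empty (signaled).
    bool try_wait() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count == 0;
    }

    // Blocks the calling thread until the group is empty.
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_signaled.wait(lock, [this] { return m_count == 0; });
    }

private:
    mutable std::mutex m_mutex;
    std::condition_variable m_signaled;
    std::size_t m_count = 0;
};

// Enters the group once per worker, lets each worker thread leave, then waits.
// Returns true if the group was signaled when empty and unsignaled while busy.
inline bool toyTaskGroupDemo(int workers)
{
    ToyTaskGroup group;
    const bool signaledWhenEmpty = group.try_wait();

    for (int i = 0; i < workers; ++i)
        group.enter(); // entered before any "task" can start

    const bool unsignaledWhileBusy = (workers == 0) || !group.try_wait();

    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i)
        threads.emplace_back([&group] { group.leave(); });

    group.wait();
    for (auto& t : threads)
        t.join();

    // The group object outlives every thread that touches it, mirroring the warning above.
    return signaledWhenEmpty && unsignaledWhileBusy && group.try_wait();
}
```

Note that the group is destroyed only after it has become signaled and every thread has been joined, which is the lifetime rule the warning describes.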

TaskContext Tracker

Either a pointer or a reference to a carb::tasking::TaskContext can be passed as a Tracker. If the pointer is nullptr it will be ignored.

“Enter” action: The pointer (if non-nullptr) or reference will receive the associated TaskContext for the task.

“Exit” action: None.

Future<void> Tracker

Regardless of the return type of the invocable object passed to addTask, a pointer or a reference to a carb::tasking::Future<void> can be passed as a Tracker. Future<void> is a specialization of carb::tasking::Future that is a simple wrapper around a TaskContext. As with other Tracker types, a nullptr is ignored to facilitate optional use.

“Enter” action: The pointer (if non-nullptr) or reference will receive the associated Future<void> for the task.

“Exit” action: None.

Counter Tracker

Although Counter is deprecated, objects convertible to pointers to carb::tasking::Counter are still accepted as Tracker objects for backwards compatibility. As with other Tracker types, nullptr is ignored.

“Enter” action: Calls carb::tasking::ITasking::fetchAddCounter().

“Exit” action: Calls carb::tasking::ITasking::fetchSubCounter().

Warning

It is the caller’s responsibility to ensure that the Counter object is valid until carb::tasking::ITasking::try_wait() on the Counter would return true.

Invocable Objects

The Callable and optional Args parameters work together to provide std::invoke-like execution support. Nearly anything that is callable via std::invoke is supported.

Adding a task implicitly creates a shared state as for carb::tasking::Promise. The carb::tasking::Future from the Promise is returned to the caller, and the return value of Callable is passed similarly to carb::tasking::Promise::set_value(). Therefore, when the Callable returns a value, the Future becomes ready and the return value can be retrieved.
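The shared-state relationship can be illustrated with the standard library's `std::promise` and `std::future`, which follow the same producer/consumer pattern. This is only an analogy under stated assumptions: `toyRunAndGet` is a hypothetical helper, it handles non-void return types only, and it uses a plain thread where carb.tasking would use its scheduler.

```cpp
#include <future>
#include <thread>

// Hypothetical helper: runs `f` on another thread and returns its result through a
// promise/future pair. Works for callables that return a non-void, movable value.
template <class Callable>
auto toyRunAndGet(Callable f) -> decltype(f())
{
    using Result = decltype(f());

    std::promise<Result> promise;                      // producer side of the shared state
    std::future<Result> future = promise.get_future(); // consumer side given to the caller

    // The "task": the callable's return value is stored via set_value().
    std::thread worker([&promise, &f] { promise.set_value(f()); });

    Result value = future.get(); // blocks until the value has been set
    worker.join();
    return value;
}
```

For example, `toyRunAndGet([] { return 6 * 7; })` yields 42 once the worker has stored the value.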

If any Args are provided, they are captured in a std::tuple using std::decay and passed as distinct arguments to Callable. The Args are forwarded using std::forward to be captured and passed to Callable with std::move.

If a task is canceled or throws an uncaught exception, the captured Args will be destroyed.
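The capture-and-forward behavior can be sketched in standard C++17 as follows. The names `toyBind` and `toyConsume` are hypothetical and the code is an approximation of the description above, not the library's internals.

```cpp
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>

// Decay strips references and cv-qualifiers, so arguments are stored by value.
static_assert(std::is_same<std::decay_t<const int&>, int>::value, "decay yields a value type");

// Hypothetical helper: captures `args` in a tuple of decayed types and returns a
// one-shot callable that moves the stored values into `f` when invoked.
template <class Callable, class... Args>
auto toyBind(Callable&& f, Args&&... args)
{
    std::tuple<std::decay_t<Args>...> stored(std::forward<Args>(args)...);
    return [fn = std::forward<Callable>(f), stored = std::move(stored)]() mutable {
        // The stored copies are handed to the callable as rvalues.
        return std::apply(std::move(fn), std::move(stored));
    };
}

// Example callable that takes ownership of a move-only argument.
inline int toyConsume(std::unique_ptr<int> p, int offset)
{
    return *p + offset;
}
```

Because the stored values are moved out, the returned callable is meant to be invoked once, and move-only arguments such as `std::unique_ptr` can be passed through.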

Warning

It is the caller’s responsibility to ensure that the invocable object, lambda captures, and passed arguments remain valid until the Future<> returned from carb::tasking::ITasking::addTask() becomes signaled. For instance, passing a member-function pointer and a this pointer to addTask requires the caller to ensure that the this pointer remains valid; otherwise undefined behavior will occur. In general it is better to pass temporaries to addTask and/or use std::shared_ptr / std::weak_ptr so that tasks may be abandoned without repercussions (see Best Practices).
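One way to make a task safe to abandon is to capture a `std::weak_ptr` and check it when the task runs. The sketch below uses only the standard library; `ToyWorker` and `toyMakeGuardedTask` are hypothetical names, and the returned callable stands in for whatever would be handed to addTask.

```cpp
#include <functional>
#include <memory>

// Hypothetical object that a task wants to operate on.
struct ToyWorker
{
    int runs = 0;
    void step() { ++runs; }
};

// Builds a callable that touches the worker only if it is still alive when invoked.
// Returns true from the callable if work was done, false if the owner has gone away.
inline std::function<bool()> toyMakeGuardedTask(const std::shared_ptr<ToyWorker>& worker)
{
    std::weak_ptr<ToyWorker> weak = worker; // does not extend the worker's lifetime
    return [weak] {
        if (auto strong = weak.lock())
        {
            strong->step(); // `strong` keeps the worker alive for the duration of the call
            return true;
        }
        return false;
    };
}
```

If the owner releases its `std::shared_ptr` before the task runs, the task becomes a no-op instead of dereferencing a dangling pointer.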

Lambda

One of the most common methods involves lambdas, which can capture any arguments. Lambdas are powerful but can make debugging difficult as they are often inlined and do not have a human-readable name. Part of the power of lambdas is that asynchronous code is easily visible next to synchronous code.

Warning

If parameters are captured by reference (&), or pointers are captured, it is the caller’s responsibility to ensure that the captured item exists for the duration of the task.

auto future = tasking->addTask(Priority::eDefault, {}, [my, captured, &variables] {
    // Do asynchronous work
    // ...
    return result;
});

// Do synchronous work
// ...

std::cout << future.get(); // await result and print

Variables that are not captured by a lambda can be passed as arguments instead. This is especially helpful for loops, where each task needs its own copy of the loop variable.

TaskGroup tg;
for (int i = 0; i != kMaxTasks; ++i)
{
    tasking->addTask(Priority::eDefault, tg, [] (int taskNumber) {
        // Do something asynchronous based on taskNumber
        // ...
    }, i); // Note the passing of i as an additional parameter
}
tg.wait();

Function and Pointer-to-Function

C-style pointers-to-functions can be passed to addTask as well:

void fooAsync()
{
    // Do something async
    // ...
}

tasking->addTask(Priority::eDefault, {}, fooAsync);

Additional parameters passed to addTask will be passed as parameters to the function:

void fooAsync(std::unique_ptr<int> i)
{
    // Do something async with i
    // ...
}

tasking->addTask(Priority::eDefault, {}, fooAsync, std::make_unique<int>(5));

Member Functions

It is also possible to pass member functions to addTask and pass the this parameter as the first additional parameter:

Warning

It is the caller’s responsibility to ensure that the this parameter remains valid until the task has completed.

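class Foo
{
public: // must be accessible so that &Foo::myMemberFunc can be taken outside the class
    void myMemberFunc()
    {
        printf("Foo ptr: %p\n", static_cast<void*>(this));
    }
};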

Foo fooOnStack;

auto future = tasking->addTask(Priority::eDefault, {}, &Foo::myMemberFunc, &fooOnStack);
future.wait();

Functors

Objects with operator() are also supported:

struct FiveAdder
{
    int operator() (int val)
    {
        return val + 5;
    }
};

auto future = tasking->addTask(Priority::eDefault, {}, FiveAdder(), 5);
CHECK_EQ(future.get(), 10);

Execution

Notifying threads of work

When addTask is called, the task goes into a queue within the scheduler. Threads belonging to the carb.tasking system add work to their own local queue, while external threads contribute to a global queue within the scheduler. If the system is idle, a thread will be woken, which will in turn wake other threads, causing a chain reaction.

When worker threads wake, they search for something to do. A worker thread looks for work in the following priority order:

  1. The thread’s pinned tasks

  2. The thread’s local task queue (youngest tasks)

  3. The system task queue

  4. Stolen work from another thread’s local task queue (oldest tasks)

Threads remain active looking for work to do as long as the system says there might be work.
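The lookup order above can be sketched as a single-threaded model. All names here (`ToyWorkerQueues`, `toyNextTask`) are hypothetical, task names stand in for real tasks, and the FIFO order chosen for the pinned and system queues is an assumption; only the local-queue (youngest first) and stealing (oldest first) orders come from the list above.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical per-worker state: tasks are represented by their names.
struct ToyWorkerQueues
{
    std::deque<std::string> pinned; // tasks that may run only on this thread
    std::deque<std::string> local;  // front = oldest, back = youngest
};

// Returns the next task for worker `self`, or an empty string if nothing is available.
inline std::string toyNextTask(std::vector<ToyWorkerQueues>& workers,
                               std::deque<std::string>& systemQueue,
                               std::size_t self)
{
    ToyWorkerQueues& me = workers[self];

    // 1. The thread's pinned tasks (FIFO order is an assumption of this sketch).
    if (!me.pinned.empty())
    {
        std::string task = me.pinned.front();
        me.pinned.pop_front();
        return task;
    }
    // 2. The thread's local queue, youngest task first.
    if (!me.local.empty())
    {
        std::string task = me.local.back();
        me.local.pop_back();
        return task;
    }
    // 3. The system-wide queue (FIFO order is an assumption of this sketch).
    if (!systemQueue.empty())
    {
        std::string task = systemQueue.front();
        systemQueue.pop_front();
        return task;
    }
    // 4. Steal the oldest task from another worker's local queue.
    for (std::size_t i = 0; i < workers.size(); ++i)
    {
        if (i != self && !workers[i].local.empty())
        {
            std::string task = workers[i].local.front();
            workers[i].local.pop_front();
            return task;
        }
    }
    return std::string();
}
```

Taking the youngest local task while stealing the oldest is a common work-stealing arrangement: the owner tends to work on recently queued tasks, and thieves take from the opposite end of the queue.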

Note

A task can begin executing even before addTask returns!

Assigning a fiber

Before a task can begin executing, it must be assigned a special stack called a fiber. A fiber is like a thread that the scheduler can choose to run at any point (as opposed to a thread which the OS will schedule and run). Typically a task is assigned a fiber immediately before it begins executing, but certain operations will assign a fiber immediately even if the task has not started. (Main Article)

Fibers are a limited resource within the system. The absolute upper limit on the number of fibers is specified by carb::tasking::kMaxFibers, and the maximum actually in effect is configured through carb::tasking::ITasking::changeParameters. If the system runs out of fibers, tasks cannot execute until existing tasks complete and relinquish their fibers. For this reason, the system provides some support for adding sub-tasks that have a dependency rather than requiring code that immediately waits for a resource upon starting. This is accomplished through the carb::tasking::ITasking::addSubTask() function and allows fiber allocation to be deferred until the dependency condition is fulfilled.

Invoking the Callable

Once a fiber is assigned, the thread will switch to the fiber and invoke the Callable passed to addTask. As this happens within the context of the fiber, this is known as “task context.” By having an underlying fiber, calls back into the scheduler to wait won’t block the thread, but will “put the fiber to sleep” and run a different task (or wait for a new task) until the fiber is woken again. This allows tasks to call “await”-style functions without blocking worker threads. See Best Practices for more info.

Notifying Trackers

When the Callable returns, any carb::tasking::Future returned from addTask will receive the return value from the Callable and further attempts to wait() on the Future will return immediately.

Note that multiple Tracker objects are notified in a non-deterministic order and gaps of time exist where one Tracker may have been notified that the task completed, but the Future or another Tracker may not yet have received notification. Therefore, the following example is bad practice:

TaskGroup tg1;
auto tg2 = std::make_unique<TaskGroup>();

tasking->addTask(Priority::eDefault, { tg1, tg2.get() }, [] { /* do something async */ });

tg1.wait();
tg2.reset(); // TaskGroup was destroyed without checking to make sure it was signaled yet! Do tg2->wait() first.

Returning Values

addTask returns a carb::tasking::Future object that is based on the return type of Callable. The returned value can be retrieved via the carb::tasking::Future::get() function, which will wait until the task completes and the value becomes available.

Task Completion and Guarantees

There are two primary phases to task completion that can be used to trigger subsequent tasks or can be waited on.

Phase 1: Tracker Exit

The Trackers that were passed to addTask are “exited” once the following are guaranteed to have happened (note that the order is not guaranteed):

  • The Callable function has returned (any value returned has been set in the Future<T>, but the Future<T> is not yet signaled).

  • Any and all parameters bound to addTask have been destructed.

  • The Callable along with any and all captured variables have been destructed.

  • Task-specific data has been destroyed.

At this point, Trackers that were tracking the task may be signaled, but there is no guarantee that the carb::tasking::TaskContext or Future<T> have been signaled.

Phase 2: Task Completion

Once Trackers have been exited, the task is marked as complete. This causes the carb::tasking::TaskContext and Future<T> to be signaled, and will release any threads or tasks waiting on TaskContext or Future<T>.

Waiting on the TaskContext or the Future<T> guarantees that the task is complete in all respects, that any and all bound or captured variables passed to addTask have been destructed, and that all Trackers have been exited.

Cancellation

In some cases it may be advantageous to cancel a task that hasn’t started yet. This can be attempted through the carb::tasking::ITasking::tryCancelTask function.

Warning

A task cannot be canceled once the Callable has been invoked. However, the Callable could check a flag and return as quickly as possible.
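The flag-checking approach mentioned in the warning can be sketched with `std::atomic<bool>`. The function name `toyCancellableWork` and the idea of counting "steps" are illustrative assumptions; the point is only that the running callable polls a flag and returns early.

```cpp
#include <atomic>

// Performs up to `maxSteps` units of work, polling `stopRequested` before each one.
// Returns the number of steps completed before finishing or being asked to stop.
inline int toyCancellableWork(const std::atomic<bool>& stopRequested, int maxSteps)
{
    int done = 0;
    while (done < maxSteps && !stopRequested.load(std::memory_order_acquire))
    {
        // ... one unit of real work would go here ...
        ++done;
    }
    return done;
}
```

The code that wants to stop the task sets the flag with `store(true)`; the task notices it at the next check, so the granularity of the checks determines how quickly it stops.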

The system guarantees that the Callable will be invoked exactly once unless tryCancelTask has been invoked and returns true, in which case Callable will never be called.

Successfully canceling a task also notifies all Tracker objects that the task is finished, but Future objects will be in a state where carb::tasking::Future::isCanceled() will return true and attempts to get() the value will call std::terminate().

When tryCancelTask returns true it is also guaranteed that Callable and any captured Args originally passed to addTask have been destroyed, and the associated TaskContext is signaled.

using namespace std::chrono_literals; // for the 100ms and 150ms literals

TaskGroup tg;
// Add a task that waits 100ms before executing
auto fut = tasking->addTaskIn(100ms, Priority::eDefault, tg, [] {
    REQUIRE_UNARY(false); // This should never get executed
    return true;
});
// Cancel the task immediately
CHECK(tasking->tryCancelTask(*fut.task_if()));
// Note that the TaskGroup is now empty (signaled), the Future is signaled and `isCanceled()` returns true.
CHECK(tg.try_wait());
REQUIRE(fut.valid());
CHECK(fut.try_wait());
CHECK(fut.isCanceled());

// It shouldn't be executed; if it is, the REQUIRE_UNARY(false) in the task will fail the test
std::this_thread::sleep_for(150ms);
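The "invoked exactly once, or never if cancellation succeeds" guarantee can be modeled as a race decided by a single atomic state transition. This is a conceptual sketch, not how carb.tasking is implemented; `ToyTask` and its members are hypothetical.

```cpp
#include <atomic>
#include <functional>
#include <utility>

// Hypothetical task wrapper: either run() or tryCancel() wins, never both.
class ToyTask
{
public:
    explicit ToyTask(std::function<void()> fn) : m_fn(std::move(fn)) {}

    // Called by the "scheduler". Returns true if the callable was invoked by this call.
    bool run()
    {
        State expected = State::Pending;
        if (!m_state.compare_exchange_strong(expected, State::Running))
            return false; // already canceled, running, or done
        m_fn();
        m_fn = nullptr; // destroy the callable and its captures
        m_state.store(State::Done);
        return true;
    }

    // Returns true only if the task had not started; the callable will then never run.
    bool tryCancel()
    {
        State expected = State::Pending;
        if (!m_state.compare_exchange_strong(expected, State::Canceled))
            return false; // too late: the callable has been (or is being) invoked
        m_fn = nullptr;   // destroy the callable and its captures
        return true;
    }

    bool isCanceled() const { return m_state.load() == State::Canceled; }

private:
    enum class State { Pending, Running, Done, Canceled };
    std::atomic<State> m_state{ State::Pending };
    std::function<void()> m_fn;
};
```

Because both paths start with a compare-and-exchange from the pending state, only one of them can succeed, which matches the behavior described for tryCancelTask.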

Other Task Types

The system provides other functions that allow queuing tasks that have other dependencies without needing to allocate a fiber to the tasks. Using these functions can mitigate fiber resource issues.

Throttled Tasks

Throttled tasks use a carb::tasking::Semaphore as a gate to ensure that only a certain number of tasks that share the Semaphore are executing at any given time. Before a task can start, it must acquire the semaphore; tasks that cannot yet acquire it can generally wait without being assigned a fiber. When the task has completed or has been canceled, the semaphore is released.

SemaphoreWrapper sema(1); // Starts with a value of 1, so only 1 task may execute concurrently.

TaskGroup tg;
for (int i = 0; i != 5; ++i)
{
    tasking->addThrottledTask(sema, Priority::eDefault, tg, [] { /* do something async */ }); // track with tg so that tg.wait() works
}
tg.wait(); // All tasks execute serially because the semaphore has a value of 1
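The gating effect can be demonstrated with a small standard-library model. `ToySemaphore` and `toyPeakConcurrency` are hypothetical names, and unlike carb.tasking (where a waiting task generally needs no fiber) each waiting task in this sketch occupies a blocked thread.

```cpp
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical counting semaphore built from a mutex and a condition variable.
class ToySemaphore
{
public:
    explicit ToySemaphore(int count) : m_count(count) {}

    void acquire()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_available.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    void release()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_available.notify_one();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_available;
    int m_count;
};

// Runs `taskCount` threads gated by a semaphore with `limit` slots and returns the
// highest number of tasks that were inside the gated region at the same time.
inline int toyPeakConcurrency(int limit, int taskCount)
{
    ToySemaphore gate(limit);
    std::mutex statsMutex;
    int running = 0;
    int peak = 0;

    std::vector<std::thread> threads;
    for (int i = 0; i < taskCount; ++i)
    {
        threads.emplace_back([&] {
            gate.acquire(); // must hold a slot before "starting"
            {
                std::lock_guard<std::mutex> lock(statsMutex);
                peak = std::max(peak, ++running);
            }
            std::this_thread::yield(); // stand-in for the task's work
            {
                std::lock_guard<std::mutex> lock(statsMutex);
                --running;
            }
            gate.release(); // slot is returned when the task completes
        });
    }
    for (auto& t : threads)
        t.join();
    return peak;
}
```

With a limit of 1 the peak is always 1, which corresponds to the serial execution in the example above.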

Sub-tasks

Sub-tasks can only execute after a dependency becomes signaled. The carb::tasking::ITasking::addSubTask() function uses the carb::tasking::RequiredObject helper type to declare zero or more dependencies for a task. Generally the task is not assigned a fiber until all dependencies have become signaled.

For example, the Future returned by addTask is a valid dependency and can be given as a RequiredObject:

auto dependency = tasking->addTask(Priority::eDefault, {}, [] { /* long running task */ });
auto future = tasking->addSubTask(dependency, Priority::eDefault, {}, [] { /* cleanup task */ });

future.wait();
// The dependency task must complete before the cleanup task is started. At this point both have completed.

The carb::tasking::Future::then() function is provided as syntactic sugar around ITasking::addSubTask.

There is a function that also combines the concepts of throttling and sub-tasks: carb::tasking::ITasking::addThrottledSubTask().
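The idea of deferring a sub-task until its dependency is signaled, without parking a thread or fiber in the meantime, can be modeled as a list of pending continuations. `ToySignal` is a hypothetical, single-threaded illustration and says nothing about how addSubTask is actually implemented.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical dependency object: callables registered before the signal are stored
// (not run, not waiting on any thread) and are released when signal() is called.
// Not thread-safe; intended only to show the ordering.
class ToySignal
{
public:
    // Runs `fn` immediately if already signaled; otherwise defers it.
    void whenSignaled(std::function<void()> fn)
    {
        if (m_signaled)
            fn();
        else
            m_pending.push_back(std::move(fn));
    }

    // Marks the dependency as complete and runs everything that was waiting on it.
    void signal()
    {
        m_signaled = true;
        std::vector<std::function<void()>> pending = std::move(m_pending);
        m_pending.clear();
        for (auto& fn : pending)
            fn();
    }

    bool signaled() const { return m_signaled; }

private:
    bool m_signaled = false;
    std::vector<std::function<void()>> m_pending;
};
```

A deferred callable costs only the memory needed to store it, which is why queuing work this way avoids tying up execution resources while the dependency is still running.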

Timed Tasks

Another powerful feature involves running a task after a certain amount of time has elapsed, or at a specific time. There are two functions provided for this: carb::tasking::ITasking::addTaskIn() and carb::tasking::ITasking::addTaskAt(). In both cases, the task will typically not be assigned a fiber until the time period has elapsed.

// Add an alarm that happens in an hour
using namespace std::chrono_literals;
tasking->addTaskIn(1h, Priority::eDefault, {}, [] { /* do something in an hour */ });
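The relationship between "run in a duration" and "run at a time point" can be sketched with a small deadline queue: a relative delay is simply converted to an absolute deadline. `ToyTimerQueue` is hypothetical, takes the current time as a parameter so that it can be driven deterministically, and is not how carb.tasking schedules timed tasks.

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical timer queue: tasks are kept in deadline order and run once they are due.
class ToyTimerQueue
{
public:
    using Clock = std::chrono::steady_clock;

    // Schedules `fn` for an absolute time.
    void addAt(Clock::time_point when, std::function<void()> fn)
    {
        m_heap.push(Entry{ when, m_nextId++, std::move(fn) });
    }

    // Schedules `fn` relative to `now`; equivalent to addAt(now + delay, fn).
    void addIn(Clock::duration delay, Clock::time_point now, std::function<void()> fn)
    {
        addAt(now + delay, std::move(fn));
    }

    // Runs every task whose deadline is at or before `now`; returns how many ran.
    int runDue(Clock::time_point now)
    {
        int ran = 0;
        while (!m_heap.empty() && m_heap.top().when <= now)
        {
            std::function<void()> fn = m_heap.top().fn; // copy: top() is const
            m_heap.pop();
            fn();
            ++ran;
        }
        return ran;
    }

private:
    struct Entry
    {
        Clock::time_point when;
        unsigned long long id; // insertion order breaks ties between equal deadlines
        std::function<void()> fn;
    };
    struct Later
    {
        bool operator()(const Entry& a, const Entry& b) const
        {
            return a.when != b.when ? a.when > b.when : a.id > b.id;
        }
    };
    std::priority_queue<Entry, std::vector<Entry>, Later> m_heap;
    unsigned long long m_nextId = 0;
};
```

Until a deadline passes, a scheduled entry is just data in the queue, which parallels the statement that a timed task is typically not assigned a fiber until its time has elapsed.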