Executor Creation#

This is a practitioner’s guide to using the Execution Framework. Before continuing, it is recommended you first review the Execution Framework Overview along with basic topics such as Graphs Concepts, Pass Concepts, and Execution Concepts.

Customizing execution can happen at many levels, let’s have a look at different examples.

Customizing Visit Strategy#

The default ExecutorFallback’s visit strategy and execution order is matching traversal over the entire graph, where each node gets computed only once when all upstream nodes complete computation. Without changing the traversal order, we can change the visit strategy to only compute when the underlying node requests to compute.

Listing 31 A custom visit strategy for visiting only nodes that requested compute.#
struct ExecutionVisitWithCacheCheck
{
    //! Called when the traversal wants to visit a node.  This method determines what to do with the node (e.g. schedule
    //! it, defer it, etc).
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        auto& nodeData = info.getNodeData();

        auto triggeringTaskStatus = info.currentTask.getExecutionStatus();
        if (triggeringTaskStatus == Status::eSuccess)
            nodeData.hasComputedUpstream = true; // we only set to true...doesn't matter which thread does it first
        else if (triggeringTaskStatus == Status::eDeferred)
            nodeData.hasDeferredUpstream = true; // we only set to true...doesn't matter which thread does it first

        std::size_t requiredCount = info.nextNode->getParents().size() - info.nextNode->getCycleParentCount();
        if ((requiredCount == 0) || (++nodeData.visitCount == requiredCount))
        {
            if (nodeData.hasDeferredUpstream)
                return Status::eDeferred;
            else
            {
                // spawning a task within executor doesn't change the upstream path. just reference the same one.
                ExecutionTask newTask(info.getContext(), info.nextNode, info.getUpstreamPath());
                if (nodeData.hasComputedUpstream ||
                    info.getContext()->getStateInfo(newTask)->needsCompute(info.getExecutionStamp()))
                    return info.schedule(std::move(newTask));
                else // continue downstream...there may be something dirty. Bypass scheduler to avoid unnecessary
                     // overhead
                    return info.continueExecute(newTask);
            }
        }

        return Status::eUnknown;
    }
};

In this modified version, we will only compute a node and propagate this to the downstream when compute was requested.

Customizing Preallocated Per-node Data#

Sometimes visit strategy must store more data per node to achieve the desired execution behavior. We will use an example from a pipeline graph that dynamically generates more work based on data and a static graph.

Listing 32 A custom data used by pipeline graph example.#
struct TestPipelineExecutionNodeData : public ExecutionNodeData
{
    DynamicNode* getNode(ExecutionTaskTag tag)
    {
        if (tag == ExecutionTask::kEmptyTag)
            return nullptr;

        auto findIt = generatedNodes.find(tag);
        return findIt != generatedNodes.end() ? &findIt->second : nullptr;
    }

    DynamicNode* createNode(ExecutionTask&& task)
    {
        if (!task.hasValidTag())
            return nullptr; // LCOV_EXCL_LINE

        auto findIt = generatedNodes.find(task.getTag());
        if (findIt != generatedNodes.end())
            return &findIt->second; // LCOV_EXCL_LINE

        auto added = generatedNodes.emplace(task.getTag(), std::move(task));
        return &added.first->second;
    }

    using DynamicNodes = std::map<ExecutionTaskTag, DynamicNode>;
    DynamicNodes generatedNodes;

    std::atomic<std::size_t> dynamicUpstreamCount{ 0 };
    std::atomic<std::size_t> dynamicVisitCount{ 0 };
};
Listing 33 A custom visit strategy for dynamically generating work.#
template <typename ExecutorInfo>
Status TestPipelineExecutionVisit::tryVisit(ExecutorInfo info) noexcept
{
    OMNI_GRAPH_EXEC_ASSERT(info.nextNode->getCycleParentCount() == 0);

    auto pipelineNodeDef = omni::graph::exec::unstable::cast<TestPipelineNodeDef>(info.nextNode->getNodeDef());
    if (!pipelineNodeDef)
        return Status::eFailure; // LCOV_EXCL_LINE

    auto executor = omni::graph::exec::unstable::cast<TestPipelineExecutor>(info.getExecutor());
    REQUIRE(executor);

    const ExecutionTask& currentTask = info.currentTask;
    auto& predData = info.getExecutor()->getNodeData(currentTask.getNode());
    auto& nodeData = info.getNodeData();

    std::size_t dynamicVisit = 0;
    if (!currentTask.hasValidTag()) // we enter a pre-visit that can statically generate work
    {
        nodeData.dynamicUpstreamCount += predData.generatedNodes.size();
        nodeData.visitCount++;
        dynamicVisit = nodeData.dynamicVisitCount;

        Status status = pipelineNodeDef->generate(
            currentTask, info.nextNode, TestPipelineNodeDef::VisitStep::ePreExecute, executor->getDynamicGraph());
        if (status == Status::eSuccess /*STATIC*/ && nodeData.visitCount >= info.nextNode->getParents().size())
        {
            ExecutionTask newTask(info.getContext(), info.nextNode, info.getUpstreamPath());
            (void)executor->continueExecute(newTask);
        }
    }
    else
    {
        dynamicVisit = ++nodeData.dynamicVisitCount;

        DynamicNode* predDynamicNode = predData.getNode(currentTask.getTag());
        predDynamicNode->done();

        pipelineNodeDef->generate(
            currentTask, info.nextNode, TestPipelineNodeDef::VisitStep::eExecute, executor->getDynamicGraph());
    }

    // this was the last dynamic call into the node
    if (nodeData.visitCount >= info.nextNode->getParents().size() && nodeData.dynamicUpstreamCount == dynamicVisit)
    {
        Status status = pipelineNodeDef->generate(
            currentTask, info.nextNode, TestPipelineNodeDef::VisitStep::ePostExecute, executor->getDynamicGraph());
        if (status == Status::eSuccess /*DYNAMIC*/)
        {
            ExecutionTask newTask(info.getContext(), info.nextNode, info.getUpstreamPath());
            (void)executor->continueExecute(newTask);
        }
    }

    // Kick dynamic work
    for (auto& pair : nodeData.generatedNodes)
    {
        DynamicNode& dynNode = pair.second;
        if (dynNode.trySchedule())
        {
            ExecutionTask newTask = dynNode.task();
            info.schedule(std::move(newTask));
        }
    }

    return Status::eUnknown;
}

Customizing Scheduler#

The default ExecutorFallback’s scheduler will run all the generated tasks serially on a calling thread. We can easily change that and request task dispatch from a custom scheduler.

Listing 34 A custom scheduler dispatch implementation to run all generated tasks concurrently.#
struct TestTbbScheduler
{
    tbb::task_group g;

    TestTbbScheduler(IExecutionContext* context)
    {
    }

    ~TestTbbScheduler() noexcept
    {
    }

    template <typename Fn>
    Status schedule(Fn&& task, SchedulingInfo)
    {
        g.run(
            [task = captureScheduleFunction(task), this]() mutable
            {
                Status ret = invokeScheduleFunction(task);
                Status current, newValue = Status::eUnknown;
                do // LCOV_EXCL_LINE
                {
                    current = this->m_status.load();
                    newValue = ret | current;
                } while (!this->m_status.compare_exchange_weak(current, newValue));
            });
        return Status::eSuccess;
    }
    Status getStatus()
    {
        g.wait();
        return m_status;
    }

private:
    std::atomic<Status> m_status{ Status::eUnknown };
};

Customizing Traversal#

In all examples above, the executor was iterating over all children of a node and was able to stop dispatching the node to compute. We can further customize the continuation loop over children of a node by overriding the Executor::continueExecute(const ExecutionTask&) method. This ultimately allows us to change entire traversal behavior. In this final example, we will push this to the end by also customizing IExecutor::execute() and delegating the entire execution to the implementation of NodeDef. We will use Behavior Tree to illustrate it all. Make sure to follow examples from Definition Creation to learn how NodeGraphDef were implemented.

Listing 35 A custom executor for behavior tree.#
using BaseExecutorClass = Executor<Node, BtVisit, BtNodeData, SerialScheduler, DefaultSchedulingStrategy>;
class BtExecutor : public BaseExecutorClass
{
public:
    //! Factory method
    static omni::core::ObjectPtr<BtExecutor> create(omni::core::ObjectParam<ITopology> toExecute,
                                                    const ExecutionTask& thisTask)
    {
        return omni::core::steal(new BtExecutor(toExecute.get(), thisTask));
    }

    //! Custom execute method to bypass continuation and start visitation directly.
    //! Propagate the behavior tree status to node instantiating NodeGraphDef this executor operate on. This enables
    //! composability of behavior trees.
    Status execute_abi() noexcept override
    {
        auto& instantiatingNodeState = BtNodeState::forceGet(m_task.getContext()->getStateInfo(m_path));
        instantiatingNodeState.computeStatus = BtNodeState::Status::eSuccess;

        for (auto child : m_task.getNode()->getChildren())
        {
            if (BtVisit::tryVisit(Info(this, m_task, child)) == Status::eFailure)
            {
                instantiatingNodeState.computeStatus = BtNodeState::Status::eFailure;
                break;
            }
        }

        return Status::eSuccess;
    }

    //! We don't leverage continuation called from within executed task. Entire traversal logic is handled before
    //! from within NodeDef execution method. See nodes implementing @p BtNodeDefBase.
    Status continueExecute_abi(ExecutionTask* currentTask) noexcept override
    {
        return currentTask->getExecutionStatus();
    }

protected:
    BtExecutor(ITopology* toExecute, const ExecutionTask& currentTask) noexcept
        : BaseExecutorClass(toExecute, currentTask)
    {
    }
};
Listing 36 A custom visit strategy for behavior tree executor.#
struct BtVisit
{
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        // Illustrate that we can still leverage pre-allocated data to avoid potential cycles.
        // FWIW. They can as well be detected earlier in the pipeline.
        auto& nodeData = info.getNodeData();
        if (std::exchange(nodeData.executed, true))
        {
            return Status::eFailure; // LCOV_EXCL_LINE
        }

        // We don't engage the scheduler because there should be only single node under root...if not but we could get
        // all the independent branches executed concurrently when going via scheduler.
        ExecutionTask newTask(info.getContext(), info.nextNode, info.getUpstreamPath());
        if (newTask.execute(info.getExecutor()) == Status::eSuccess)
        {
            auto& nodeState = BtNodeState::forceGet(&newTask);
            return (nodeState.computeStatus == BtNodeState::Status::eSuccess) ? Status::eSuccess : Status::eFailure;
        }

        return Status::eFailure; // LCOV_EXCL_LINE
    }
};
Listing 37 An example implementation of a node responsible to execute children and propagate the result.#
class BtSequenceNodeDef : public BtNodeDefBase
{
public:
    //! Factory method
    static omni::core::ObjectPtr<BtSequenceNodeDef> create()
    {
        return omni::core::steal(new BtSequenceNodeDef());
    }

protected:
    //! Specialized composition method for sequence behavior. We don't engage scheduler since all work needs to happen
    //! during the call and scheduler would only add overhead in here.
    Status execute_abi(ExecutionTask* info) noexcept override
    {
        auto& nodeState = BtNodeState::forceGet(info);
        nodeState.computeStatus = BtNodeState::Status::eSuccess;

        for (auto child : info->getNode()->getChildren())
        {
            ExecutionTask newTask(info->getContext(), child, info->getUpstreamPath());
            newTask.execute(getCurrentExecutor()); // bypass scheduler

            if (BtNodeState::forceGet(&newTask).computeStatus == BtNodeState::Status::eFailure)
            {
                nodeState.computeStatus = BtNodeState::Status::eFailure;
                break;
            }
        }

        return Status::eSuccess;
    }

    //! Constructor
    BtSequenceNodeDef() noexcept : BtNodeDefBase("tests.def.BtSequenceNodeDef")
    {
    }
};