Executor Creation
This is a practitioner’s guide to using the Execution Framework. Before continuing, it is recommended you first review the Execution Framework Overview along with basic topics such as Graphs Concepts, Pass Concepts, and Execution Concepts.
Execution can be customized at many levels. Let’s look at a few examples.
Customizing Visit Strategy
The default ExecutorFallback’s visit strategy and execution order match a traversal over the entire graph, where each
node is computed exactly once, after all of its upstream nodes have completed computation. Without changing the
traversal order, we can change the visit strategy so that a node is computed only when the underlying node requests it.
//! Visit strategy that keeps the default traversal order but schedules a node for compute only when it is needed:
//! either an upstream node computed successfully, or the node's own state reports that it needs compute for the
//! current execution stamp.
struct ExecutionVisitWithCacheCheck
{
    //! Called each time the traversal reaches @c info.nextNode from the task stored in @c info.currentTask.
    //!
    //! @return @c Status::eUnknown while upstream visits are still outstanding, @c Status::eDeferred when any
    //!         upstream task was deferred, otherwise whatever scheduling the node or continuing past it returns.
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        auto& perNode = info.getNodeData();

        // Remember what the triggering upstream task did. These flags only ever flip to true, so it does not
        // matter which thread writes them first.
        const auto upstreamStatus = info.currentTask.getExecutionStatus();
        if (upstreamStatus == Status::eSuccess)
        {
            perNode.hasComputedUpstream = true;
        }
        else if (upstreamStatus == Status::eDeferred)
        {
            perNode.hasDeferredUpstream = true;
        }

        // Parents reached through a cycle are not waited for.
        const std::size_t expectedVisits = info.nextNode->getParents().size() - info.nextNode->getCycleParentCount();

        // The visit counter is only bumped when there is something to wait for (short-circuit is intentional).
        const bool allUpstreamVisited = (expectedVisits == 0) || (++perNode.visitCount == expectedVisits);
        if (!allUpstreamVisited)
        {
            return Status::eUnknown;
        }

        if (perNode.hasDeferredUpstream)
        {
            return Status::eDeferred;
        }

        // Spawning a task within the executor doesn't change the upstream path, so the same one is referenced.
        ExecutionTask nodeTask(info.getContext(), info.nextNode, info.getUpstreamPath());

        if (perNode.hasComputedUpstream ||
            info.getContext()->getStateInfo(nodeTask)->needsCompute(info.getExecutionStamp()))
        {
            return info.schedule(std::move(nodeTask));
        }

        // Nothing to compute here, but something further downstream may still be dirty. Continue directly,
        // bypassing the scheduler to avoid unnecessary overhead.
        return info.continueExecute(nodeTask);
    }
};
In this modified version, a node is computed, and that computation is propagated downstream, only when compute was requested.
Customizing Preallocated Per-node Data
Sometimes a visit strategy must store more data per node to achieve the desired execution behavior. The following example comes from a pipeline graph that dynamically generates additional work based on data and a static graph.
//! Per-node data for the test pipeline executor. On top of the base @c ExecutionNodeData it stores the dynamic
//! nodes generated for this node (keyed by task tag) and two atomic counters used by the visit strategy.
struct TestPipelineExecutionNodeData : public ExecutionNodeData
{
    //! Looks up the dynamic node registered under @p tag.
    //!
    //! @return Pointer to the stored node, or @c nullptr when @p tag is the empty tag or nothing is registered
    //!         under it.
    DynamicNode* getNode(ExecutionTaskTag tag)
    {
        if (!(tag == ExecutionTask::kEmptyTag))
        {
            const auto entry = generatedNodes.find(tag);
            if (entry != generatedNodes.end())
            {
                return &entry->second;
            }
        }
        return nullptr;
    }

    //! Registers a dynamic node built from @p task under the task's tag.
    //!
    //! @return Pointer to the node stored under the tag: the already existing one if the tag was registered
    //!         before (in which case @p task is left untouched), otherwise the newly created one. Returns
    //!         @c nullptr when @p task has no valid tag.
    DynamicNode* createNode(ExecutionTask&& task)
    {
        if (!task.hasValidTag())
        {
            return nullptr; // LCOV_EXCL_LINE
        }

        const auto existing = generatedNodes.find(task.getTag());
        if (existing != generatedNodes.end())
        {
            return &existing->second; // LCOV_EXCL_LINE
        }

        return &generatedNodes.emplace(task.getTag(), std::move(task)).first->second;
    }

    using DynamicNodes = std::map<ExecutionTaskTag, DynamicNode>;

    DynamicNodes generatedNodes; //!< Dynamic nodes generated for this node, keyed by task tag.
    std::atomic<std::size_t> dynamicUpstreamCount{ 0 }; //!< Accumulated count of dynamic nodes generated upstream.
    std::atomic<std::size_t> dynamicVisitCount{ 0 }; //!< Number of visits triggered by tagged (dynamic) tasks.
};
//! Visit strategy of the test pipeline executor.
//!
//! The function is entered in one of two modes, selected by whether the triggering task carries a valid tag:
//!  - untagged task: a "pre-visit" that accounts for the predecessor's generated nodes, counts the visit and
//!    runs the node definition's @c ePreExecute generation step;
//!  - tagged task: a dynamic visit that marks the predecessor's dynamic node as done and runs the @c eExecute
//!    generation step.
//! Once every parent has visited and the dynamic visit count matches the expected dynamic upstream count, the
//! @c ePostExecute step runs. Finally every generated node of this node that agrees to be scheduled is scheduled.
//!
//! @return @c Status::eFailure when the next node's definition is not a @c TestPipelineNodeDef, otherwise
//!         @c Status::eUnknown.
template <typename ExecutorInfo>
Status TestPipelineExecutionVisit::tryVisit(ExecutorInfo info) noexcept
{
// This strategy expects the visited node to have no cycle parents.
OMNI_GRAPH_EXEC_ASSERT(info.nextNode->getCycleParentCount() == 0);
// Only nodes backed by a TestPipelineNodeDef can be visited by this strategy.
auto pipelineNodeDef = omni::graph::exec::unstable::cast<TestPipelineNodeDef>(info.nextNode->getNodeDef());
if (!pipelineNodeDef)
return Status::eFailure; // LCOV_EXCL_LINE
// The concrete executor type is needed to reach the dynamic graph passed to generate().
auto executor = omni::graph::exec::unstable::cast<TestPipelineExecutor>(info.getExecutor());
REQUIRE(executor);
const ExecutionTask& currentTask = info.currentTask;
// predData: per-node data of the node owning the triggering task; nodeData: per-node data of the visited node.
auto& predData = info.getExecutor()->getNodeData(currentTask.getNode());
auto& nodeData = info.getNodeData();
// Snapshot of the dynamic visit counter taken by this call; compared against dynamicUpstreamCount below.
std::size_t dynamicVisit = 0;
if (!currentTask.hasValidTag()) // we enter a pre-visit that can statically generate work
{
// Every node generated by the predecessor counts as one expected dynamic visit of this node.
nodeData.dynamicUpstreamCount += predData.generatedNodes.size();
nodeData.visitCount++;
dynamicVisit = nodeData.dynamicVisitCount;
Status status = pipelineNodeDef->generate(
currentTask, info.nextNode, TestPipelineNodeDef::VisitStep::ePreExecute, executor->getDynamicGraph());
// On success, and once all parents have visited, continue the traversal past this node right away.
if (status == Status::eSuccess /*STATIC*/ && nodeData.visitCount >= info.nextNode->getParents().size())
{
ExecutionTask newTask(info.getContext(), info.nextNode, info.getUpstreamPath());
(void)executor->continueExecute(newTask);
}
}
else
{
// Tagged task: count the dynamic visit and mark the predecessor's dynamic node for this tag as done.
dynamicVisit = ++nodeData.dynamicVisitCount;
DynamicNode* predDynamicNode = predData.getNode(currentTask.getTag());
// NOTE(review): predDynamicNode is dereferenced without a null check; presumably a tagged task always has a
// matching entry in the predecessor's generated nodes — confirm this invariant.
predDynamicNode->done();
// The status returned by generate() is not inspected for the eExecute step.
pipelineNodeDef->generate(
currentTask, info.nextNode, TestPipelineNodeDef::VisitStep::eExecute, executor->getDynamicGraph());
}
// this was the last dynamic call into the node
if (nodeData.visitCount >= info.nextNode->getParents().size() && nodeData.dynamicUpstreamCount == dynamicVisit)
{
Status status = pipelineNodeDef->generate(
currentTask, info.nextNode, TestPipelineNodeDef::VisitStep::ePostExecute, executor->getDynamicGraph());
// On success, continue the traversal past this node.
if (status == Status::eSuccess /*DYNAMIC*/)
{
ExecutionTask newTask(info.getContext(), info.nextNode, info.getUpstreamPath());
(void)executor->continueExecute(newTask);
}
}
// Kick dynamic work
for (auto& pair : nodeData.generatedNodes)
{
DynamicNode& dynNode = pair.second;
// trySchedule() decides whether this dynamic node should be scheduled by this call.
if (dynNode.trySchedule())
{
ExecutionTask newTask = dynNode.task();
info.schedule(std::move(newTask));
}
}
return Status::eUnknown;
}
Customizing Scheduler
The default ExecutorFallback’s scheduler runs all generated tasks serially on the calling thread. We can easily
change that and request task dispatch from a custom scheduler.
struct TestTbbScheduler
{
tbb::task_group g;
TestTbbScheduler(IExecutionContext* context)
{
}
~TestTbbScheduler() noexcept
{
}
template <typename Fn>
Status schedule(Fn&& task, SchedulingInfo)
{
g.run(
[task = captureScheduleFunction(task), this]() mutable
{
Status ret = invokeScheduleFunction(task);
Status current, newValue = Status::eUnknown;
do // LCOV_EXCL_LINE
{
current = this->m_status.load();
newValue = ret | current;
} while (!this->m_status.compare_exchange_weak(current, newValue));
});
return Status::eSuccess;
}
Status getStatus()
{
g.wait();
return m_status;
}
private:
std::atomic<Status> m_status{ Status::eUnknown };
};
Customizing Traversal
In all the examples above, the executor iterated over all children of a node and was able to stop a node from being
dispatched to compute. We can further customize the continuation loop over the children of a node by overriding the
Executor::continueExecute(const ExecutionTask&) method. This ultimately allows us to change the entire traversal
behavior. In this final example, we push this to the limit by also customizing IExecutor::execute() and delegating
the entire execution to the implementation of NodeDef.
We will use a Behavior Tree to illustrate all of this. Make sure to follow the examples from Definition Creation to
learn how NodeGraphDef was implemented.
using BaseExecutorClass = Executor<Node, BtVisit, BtNodeData, SerialScheduler, DefaultSchedulingStrategy>;

//! Executor for behavior trees. It replaces the default execute/continue logic so that the traversal is driven
//! by the node definitions themselves rather than by the executor.
class BtExecutor : public BaseExecutorClass
{
public:
    //! Factory method
    static omni::core::ObjectPtr<BtExecutor> create(omni::core::ObjectParam<ITopology> toExecute,
                                                    const ExecutionTask& thisTask)
    {
        auto* const rawExecutor = new BtExecutor(toExecute.get(), thisTask);
        return omni::core::steal(rawExecutor);
    }

    //! Custom execute method that skips continuation and visits the children of the task's node directly.
    //!
    //! The behavior tree status is written to the state of the node instantiating the NodeGraphDef this
    //! executor operates on, which is what makes behavior trees composable. The method itself always reports
    //! @c Status::eSuccess.
    Status execute_abi() noexcept override
    {
        auto& ownerState = BtNodeState::forceGet(m_task.getContext()->getStateInfo(m_path));

        // Start from success; the first failing child flips the status and stops the iteration.
        ownerState.computeStatus = BtNodeState::Status::eSuccess;
        for (auto child : m_task.getNode()->getChildren())
        {
            const Status visitResult = BtVisit::tryVisit(Info(this, m_task, child));
            if (visitResult != Status::eFailure)
            {
                continue;
            }

            ownerState.computeStatus = BtNodeState::Status::eFailure;
            break;
        }

        return Status::eSuccess;
    }

    //! Continuation requested from within an executed task is not used here: the whole traversal is handled
    //! from within the NodeDef execution methods (see nodes implementing @p BtNodeDefBase). The task's own
    //! execution status is simply handed back.
    Status continueExecute_abi(ExecutionTask* currentTask) noexcept override
    {
        return currentTask->getExecutionStatus();
    }

protected:
    //! Constructor; forwards both arguments to the base executor.
    BtExecutor(ITopology* toExecute, const ExecutionTask& currentTask) noexcept
        : BaseExecutorClass(toExecute, currentTask)
    {
    }
};
//! Visit strategy for behavior trees: executes the visited node immediately, without going through the scheduler.
struct BtVisit
{
    //! Executes @c info.nextNode right away and translates its behavior tree status into an executor status.
    //!
    //! @return @c Status::eSuccess when the task executed successfully and the node state reports success;
    //!         @c Status::eFailure otherwise, including when the node was already marked as executed.
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        // Shows that pre-allocated per-node data can still be used to guard against potential cycles.
        // FWIW, cycles can also be detected earlier in the pipeline.
        auto& perNode = info.getNodeData();
        const bool alreadyExecuted = std::exchange(perNode.executed, true);
        if (alreadyExecuted)
        {
            return Status::eFailure; // LCOV_EXCL_LINE
        }

        // The scheduler is bypassed because only a single node is expected under the root. If there were more,
        // going through the scheduler could run the independent branches concurrently.
        ExecutionTask nodeTask(info.getContext(), info.nextNode, info.getUpstreamPath());
        if (nodeTask.execute(info.getExecutor()) != Status::eSuccess)
        {
            return Status::eFailure; // LCOV_EXCL_LINE
        }

        auto& executedState = BtNodeState::forceGet(&nodeTask);
        if (executedState.computeStatus == BtNodeState::Status::eSuccess)
        {
            return Status::eSuccess;
        }
        return Status::eFailure;
    }
};
//! Behavior tree "sequence" node definition: executes the children of its node in order and stops at the first
//! child whose state reports failure.
class BtSequenceNodeDef : public BtNodeDefBase
{
public:
    //! Factory method
    static omni::core::ObjectPtr<BtSequenceNodeDef> create()
    {
        auto* const rawDef = new BtSequenceNodeDef();
        return omni::core::steal(rawDef);
    }

protected:
    //! Specialized composition method for sequence behavior. The scheduler is not engaged: all the work has to
    //! happen during this call, so scheduling would only add overhead.
    //!
    //! The sequence outcome is stored in this node's @c BtNodeState; the method itself always returns
    //! @c Status::eSuccess.
    Status execute_abi(ExecutionTask* info) noexcept override
    {
        auto& sequenceState = BtNodeState::forceGet(info);

        // Success unless a child reports failure.
        sequenceState.computeStatus = BtNodeState::Status::eSuccess;
        for (auto child : info->getNode()->getChildren())
        {
            ExecutionTask childTask(info->getContext(), child, info->getUpstreamPath());
            childTask.execute(getCurrentExecutor()); // bypass scheduler

            const bool childFailed =
                BtNodeState::forceGet(&childTask).computeStatus == BtNodeState::Status::eFailure;
            if (!childFailed)
            {
                continue;
            }

            // The first failing child fails the whole sequence; remaining children are not executed.
            sequenceState.computeStatus = BtNodeState::Status::eFailure;
            break;
        }

        return Status::eSuccess;
    }

    //! Constructor
    BtSequenceNodeDef() noexcept : BtNodeDefBase("tests.def.BtSequenceNodeDef")
    {
    }
};