// omni/graph/exec/unstable/Executor.h
// File members: omni/graph/exec/unstable/Executor.h
// Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include <carb/cpp/TypeTraits.h>

#include <omni/graph/exec/unstable/ExecutionPath.h>
#include <omni/graph/exec/unstable/ExecutionTask.h>
#include <omni/graph/exec/unstable/IExecutionContext.h>
#include <omni/graph/exec/unstable/IExecutor.h>
#include <omni/graph/exec/unstable/INode.h>
#include <omni/graph/exec/unstable/ITopology.h>
#include <omni/graph/exec/unstable/ScheduleFunction.h>
#include <omni/graph/exec/unstable/SchedulingInfo.h>

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <queue>
#include <type_traits>
#include <vector>
namespace omni
{
namespace graph
{
namespace exec
{
namespace unstable
{
//! Minimal scheduler that queues generated work and drains it serially on the
//! thread that calls getStatus(). Used as the fallback when no parallel
//! scheduler is supplied.
struct SerialScheduler
{
    //! The context parameter is unused here; it is accepted to satisfy the
    //! scheduler construction contract used by Executor.
    SerialScheduler(IExecutionContext*)
    {
    }

    //! Queue a task for later serial execution.
    //!
    //! Returns eSuccess for the scheduling operation itself; the task's own
    //! execution status is folded into m_tasksStatus when it eventually runs.
    //! The SchedulingInfo parameter is ignored (everything runs serially).
    template <typename Fn>
    Status schedule(Fn&& task, SchedulingInfo)
    {
        // Forward the functor so rvalues are moved (not copied) into the capture.
        m_tasks.emplace(
            [task = captureScheduleFunction(std::forward<Fn>(task)), this]() mutable
            {
                Status stat = invokeScheduleFunction(task);
                this->m_tasksStatus |= stat;
            });
        return Status::eSuccess;
    }

    //! Drain the queue (tasks may enqueue further tasks while running) and
    //! return the accumulated status of everything executed.
    Status getStatus()
    {
        while (!m_tasks.empty())
        {
            auto& task = m_tasks.front();
            task();
            m_tasks.pop();
        }
        return m_tasksStatus;
    }

private:
    Status m_tasksStatus{ Status::eUnknown };  //!< Combined status of all executed tasks.
    std::queue<std::function<void()>> m_tasks; //!< Pending work, FIFO order.
};
//! Transient per-node state tracked by the executor during a single traversal.
struct ExecutionNodeData
{
    //! Number of parents that have completed and visited this node so far.
    std::atomic<std::uint32_t> visitCount{ 0 };
    //! Latched to true once any upstream task reported Status::eSuccess.
    std::atomic<bool> hasComputedUpstream{ false };
    //! Latched to true once any upstream task reported Status::eDeferred.
    std::atomic<bool> hasDeferredUpstream{ false };
};
// ef-docs execution-visit-begin
//! Visit strategy that schedules a node once all of its non-cycle parents have
//! visited it, propagating any upstream deferral instead of scheduling.
struct ExecutionVisit
{
    //! Called after currentTask completes to consider nextNode for execution.
    //!
    //! Returns the scheduling outcome for nextNode, eDeferred when something
    //! upstream was deferred, or eUnknown while other parents are still pending.
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        auto& data = info.getNodeData();

        // Latch deferral coming from upstream; we only ever set this to true,
        // so it doesn't matter which thread wins the race.
        if (info.currentTask.getExecutionStatus() == Status::eDeferred)
            data.hasDeferredUpstream = true;

        // Cycle parents never count toward readiness.
        std::size_t needed = info.nextNode->getParents().size() - info.nextNode->getCycleParentCount();
        bool lastVisitor = (needed == 0) || (++data.visitCount == needed);
        if (!lastVisitor)
            return Status::eUnknown; // other parents haven't arrived yet

        if (data.hasDeferredUpstream)
            return Status::eDeferred;

        // Spawning a task within the executor doesn't change the upstream path;
        // just reference the same one.
        ExecutionTask spawned(info.getContext(), info.nextNode, info.getUpstreamPath());
        return info.schedule(std::move(spawned));
    }
};
// ef-docs execution-visit-end
// ef-docs execution-visit-cache-begin
//! Visit strategy with cache checking: a ready node is scheduled only when it
//! needs to recompute; otherwise the traversal continues downstream directly,
//! bypassing the scheduler.
struct ExecutionVisitWithCacheCheck
{
    //! Called after currentTask completes to consider nextNode for execution.
    //!
    //! Returns the scheduling outcome for nextNode, eDeferred when something
    //! upstream was deferred, or eUnknown while other parents are still pending.
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        auto& nodeData = info.getNodeData();
        auto triggeringTaskStatus = info.currentTask.getExecutionStatus();
        if (triggeringTaskStatus == Status::eSuccess)
            nodeData.hasComputedUpstream = true; // we only set to true...doesn't matter which thread does it first
        else if (triggeringTaskStatus == Status::eDeferred)
            nodeData.hasDeferredUpstream = true; // we only set to true...doesn't matter which thread does it first

        // A node becomes ready when its last non-cycle parent visits it; nodes
        // with no (countable) parents are ready immediately.
        std::size_t requiredCount = info.nextNode->getParents().size() - info.nextNode->getCycleParentCount();
        if ((requiredCount == 0) || (++nodeData.visitCount == requiredCount))
        {
            if (nodeData.hasDeferredUpstream)
                return Status::eDeferred;
            else
            {
                // spawning a task within executor doesn't change the upstream path. just reference the same one.
                ExecutionTask newTask(info.getContext(), info.nextNode, info.getUpstreamPath());
                // Recompute when anything upstream computed, or when this node's
                // state says it is out of date for this execution stamp.
                if (nodeData.hasComputedUpstream ||
                    info.getContext()->getStateInfo(newTask)->needsCompute(info.getExecutionStamp()))
                    return info.schedule(std::move(newTask));
                else // continue downstream...there may be something dirty. Bypass scheduler to avoid unnecessary
                     // overhead
                    return info.continueExecute(newTask);
            }
        }
        return Status::eUnknown;
    }
};
// ef-docs execution-visit-cache-end
//! Default strategy for answering how a task wants to be scheduled: ask the
//! node's definition (node def first, then graph def), or bypass the scheduler
//! entirely when the node has no definition.
struct DefaultSchedulingStrategy
{
    //! Returns the SchedulingInfo requested by the task's node definition.
    static SchedulingInfo getSchedulingInfo(const ExecutionTask& task)
    {
        INode* node = task.getNode();
        // Cache each definition pointer so we query the node only once per kind
        // (the original queried getNodeDef()/getNodeGraphDef() twice each).
        if (auto* nodeDef = node->getNodeDef())
            return nodeDef->getSchedulingInfo(task);
        if (auto* graphDef = node->getNodeGraphDef())
            return graphDef->getSchedulingInfo(task);
        return SchedulingInfo::eSchedulerBypass; // bypass the scheduler since there is nothing to compute
    }
};
//! Generic graph executor parameterized on the traversal pieces:
//!  - ExecNode / ExecNodeData: node type and transient per-node state
//!  - ExecStrategy: decides when a child becomes ready (tryVisit)
//!  - Scheduler: runs the generated tasks (optionally supports deferred work)
//!  - SchedulingStrategy: maps a task to its SchedulingInfo
//!
//! Execution starts from the root of the given topology and fans out to
//! children as their parents complete.
template <typename ExecNode,
          typename ExecStrategy,
          typename ExecNodeData,
          typename Scheduler,
          typename SchedulingStrategy,
          typename ExecutorInterface = IExecutor>
class Executor : public Implements<ExecutorInterface>
{
    using Node = const ExecNode;
    using NodeData = ExecNodeData;
    using NodeDataArray = std::vector<NodeData>;
    using ThisExecutor = Executor<ExecNode, ExecStrategy, ExecNodeData, Scheduler, SchedulingStrategy, ExecutorInterface>;
    using ThisExecutorPtr = omni::core::ObjectPtr<ThisExecutor>;

    //! SFINAE detector: value is true when Scheduler exposes
    //! Status scheduleDeferred(IScheduleFunction*, SchedulingInfo).
    template <typename S, typename Enabled = void>
    struct is_deferred
    {
        static constexpr bool value = false;
    };
    template <typename S>
    struct is_deferred<
        S,
        std::enable_if_t<std::is_same<Status,
                                      decltype(std::declval<S>().scheduleDeferred(
                                          std::declval<IScheduleFunction*>(), std::declval<SchedulingInfo>()))>::value>>
    {
        static constexpr bool value = true;
    };

public:
    //! View handed to ExecStrategy::tryVisit: the task that just ran, the
    //! candidate next node, and accessors into the owning executor.
    struct Info
    {
    private:
        Executor* m_executor; //!< Back-pointer to the executor driving the visit (non-owning).

    public:
        const ExecutionTask& currentTask; //!< Task whose completion triggered this visit.
        INode* nextNode;                  //!< Child node being considered for execution.

        Info(Executor* executor, const ExecutionTask& task, INode* next) noexcept
            : m_executor(executor), currentTask(task), nextNode(next)
        {
        }

        //! Transient per-node data for nextNode.
        NodeData& getNodeData()
        {
            return m_executor->getNodeData(nextNode);
        }
        //! The executor driving this traversal.
        ThisExecutor* getExecutor()
        {
            return m_executor;
        }
        //! Execution context of the current task.
        IExecutionContext* getContext()
        {
            return currentTask.getContext();
        }
        //! Stamp identifying the context's current execution.
        Stamp getExecutionStamp()
        {
            return getContext()->getExecutionStamp();
        }
        //! Upstream path shared by all tasks spawned from this executor.
        const ExecutionPath& getUpstreamPath() const
        {
            return currentTask.getUpstreamPath();
        }
        //! Hand newTask to the executor (scheduled or bypassed per strategy).
        Status schedule(ExecutionTask&& newTask)
        {
            return m_executor->scheduleInternal(std::move(newTask));
        }
        //! SchedulingInfo for the given task, per SchedulingStrategy.
        SchedulingInfo getSchedulingInfo(const ExecutionTask& task) const
        {
            return m_executor->getSchedulingInfo(task);
        }
        //! Direct access to the scheduler instance.
        Scheduler& getScheduler()
        {
            return m_executor->m_scheduler;
        }
        //! Continue the traversal downstream of currentTask without scheduling it.
        Status continueExecute(ExecutionTask& currentTask)
        {
            return m_executor->continueExecute_abi(&currentTask);
        }
    };

    //! SchedulingInfo for the given task, per SchedulingStrategy.
    SchedulingInfo getSchedulingInfo(const ExecutionTask& task) const
    {
        return SchedulingStrategy::getSchedulingInfo(task);
    }
    //! Transient per-node data, indexed by the node's index in the topology.
    NodeData& getNodeData(INode* node)
    {
        return m_nodeData[node->getIndexInTopology()];
    }
    //! Path used as the upstream path for tasks generated by this executor.
    const ExecutionPath& getPath() const
    {
        return m_path;
    }
    //! Execution context this executor operates in.
    IExecutionContext* getContext() const
    {
        return m_task.getContext();
    }
    //! Factory: create an executor for the given topology, triggered by thisTask.
    static ThisExecutorPtr create(omni::core::ObjectParam<ITopology> toExecute, const ExecutionTask& thisTask)
    {
        return omni::core::steal(new ThisExecutor(toExecute.get(), thisTask));
    }

protected:
    Executor() = delete;

    //! Captures the execution path (extended with the current node unless we
    //! were triggered from the topology's own root), a root task to seed the
    //! traversal, per-node data sized by the topology's max node index, and
    //! the scheduler instance.
    explicit Executor(ITopology* toExecute, const ExecutionTask& currentTask) noexcept
        : m_path((currentTask.getNode() != toExecute->getRoot()) ?
                     ExecutionPath(currentTask.getUpstreamPath(), currentTask.getNode()) :
                     currentTask.getUpstreamPath()),
          m_task(currentTask.getContext(), toExecute->getRoot(), m_path),
          m_nodeData(toExecute->getMaxNodeIndex()),
          m_scheduler(currentTask.getContext())
    {
    }

    // ef-docs executor-execute-begin
    //! Main entry point: traverse from the root task and report combined status.
    Status execute_abi() noexcept override
    {
        // We can bypass all subsequent processing if the node associated with the task starting
        // this execution has no children. Note that we return an eSuccess status because nothing
        // invalid has occurred (e.g., we tried to execute an empty NodeGraphDef); we were asked to
        // compute nothing, and so we computed nothing successfully (no-op)!
        if (m_task.getNode()->getChildren().empty())
        {
            return Status::eSuccess | m_task.getExecutionStatus();
        }

        (void)continueExecute_abi(&m_task);

        // Give a chance for the scheduler to complete the execution of potentially parallel work which should complete
        // within current execution. All background tasks will continue past this point.
        // Scheduler is responsible for collecting the execution status for everything that this executor generated.
        return m_scheduler.getStatus() | m_schedulerBypass;
    }
    // ef-docs executor-execute-end

    // ef-docs executor-continue-execute-begin
    //! Visit every child of currentTask's node via ExecStrategy and combine
    //! the resulting statuses with the current task's own status.
    Status continueExecute_abi(ExecutionTask* currentTask) noexcept override
    {
        OMNI_GRAPH_EXEC_ASSERT(currentTask);

        if (currentTask->getNode()->getChildren().empty())
        {
            return Status::eSuccess | currentTask->getExecutionStatus();
        }

        Status ret = Status::eUnknown;
        for (auto child : currentTask->getNode()->getChildren())
        {
            ret |= ExecStrategy::tryVisit(Info(this, *currentTask, child));
        }
        return ret | currentTask->getExecutionStatus();
    }
    // ef-docs executor-continue-execute-end

    //! ABI entry allowing user code to schedule extra work through this executor.
    Status schedule_abi(IScheduleFunction* fn, SchedulingInfo schedInfo) noexcept override
    {
        OMNI_GRAPH_EXEC_FATAL_UNLESS_ARG(fn);
        return scheduleExternal(fn, schedInfo);
    }

    //! Schedule a generated task (variant for schedulers without deferred support).
    template <typename S = Scheduler>
    Status scheduleInternal(ExecutionTask&& newTask, typename std::enable_if_t<!is_deferred<S>::value>* = nullptr)
    {
        // ef-docs executor-schedule-internal-begin
        Status ret = Status::eUnknown;
        SchedulingInfo schedInfo = getSchedulingInfo(newTask);
        if (schedInfo != SchedulingInfo::eSchedulerBypass)
        {
            // this task will finish before we exit executor...just capture a raw pointer to avoid unnecessary cost
            ret = m_scheduler.schedule([executor = this, task = std::move(newTask)]() mutable -> Status
                                       { return task.execute(executor); },
                                       schedInfo);
        }
        else // bypass the scheduler...no need for extra scheduling overhead
        {
            m_schedulerBypass |= newTask.execute(this);
        }
        return ret;
        // ef-docs executor-schedule-internal-end
    }

    //! Schedule a generated task (variant for schedulers with deferred support).
    template <typename S = Scheduler>
    Status scheduleInternal(ExecutionTask&& newTask, typename std::enable_if_t<is_deferred<S>::value>* = nullptr)
    {
        // ef-docs executor-schedule-deferred-begin
        SchedulingInfo schedInfo = getSchedulingInfo(newTask);

        // for deferred tasks, we capture the executor with an owning ObjectPtr (extra ref-count cost, but it keeps
        // this object alive until the task runs)
        Status ret = m_scheduler.scheduleDeferred(
            [executor = omni::core::borrow(this), task = std::move(newTask)]() mutable -> Status
            { return task.execute(executor); },
            schedInfo);
        return ret;
        // ef-docs executor-schedule-deferred-end
    }

    //! Schedule externally supplied work (variant for schedulers without deferred support).
    template <typename S = Scheduler>
    Status scheduleExternal(IScheduleFunction* fn,
                            SchedulingInfo schedInfo,
                            typename std::enable_if_t<!is_deferred<S>::value>* = nullptr)
    {
        if (schedInfo != SchedulingInfo::eSchedulerBypass)
        {
            return m_scheduler.schedule(fn, schedInfo);
        }
        else // bypass the scheduler...no need for extra scheduling overhead
        {
            return fn->invoke();
        }
    }

    //! Schedule externally supplied work (variant for schedulers with deferred support).
    template <typename S = Scheduler>
    Status scheduleExternal(IScheduleFunction* fn,
                            SchedulingInfo schedInfo,
                            typename std::enable_if_t<is_deferred<S>::value>* = nullptr)
    {
        return m_scheduler.scheduleDeferred(fn, schedInfo);
    }

    ExecutionPath m_path;     //!< Upstream path for tasks generated by this executor.
    ExecutionTask m_task;     //!< Root task that seeds the traversal.
    NodeDataArray m_nodeData; //!< Transient per-node data, indexed by topology index.
    Scheduler m_scheduler;    //!< Scheduler instance owned by this executor.
    Status m_schedulerBypass{ Status::eUnknown }; //!< Accumulated status of tasks run with scheduler bypass.
};
//! Executor instantiation over generic INodes: serial scheduling, basic
//! visit-count traversal, default scheduling-info strategy.
using ExecutorFallback = Executor<INode, ExecutionVisit, ExecutionNodeData, SerialScheduler, DefaultSchedulingStrategy>;
} // namespace unstable
} // namespace exec
} // namespace graph
} // namespace omni