// omni/graph/exec/unstable/Executor.h
//
// File members: omni/graph/exec/unstable/Executor.h

// Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//

#pragma once

#include <carb/cpp/TypeTraits.h>

#include <omni/graph/exec/unstable/ExecutionPath.h>
#include <omni/graph/exec/unstable/ExecutionTask.h>
#include <omni/graph/exec/unstable/IExecutionContext.h>
#include <omni/graph/exec/unstable/IExecutor.h>
#include <omni/graph/exec/unstable/INode.h>
#include <omni/graph/exec/unstable/ITopology.h>
#include <omni/graph/exec/unstable/ScheduleFunction.h>
#include <omni/graph/exec/unstable/SchedulingInfo.h>

#include <atomic>
#include <memory>
#include <queue>
#include <type_traits>

namespace omni
{
namespace graph
{
namespace exec
{
namespace unstable
{

// Scheduler that never runs anything concurrently: every scheduled task is appended to a FIFO queue and
// the queue is drained, in order, on whichever thread calls getStatus().
struct SerialScheduler
{
    // The execution context is accepted so the scheduler can be constructed the same way the Executor
    // constructs any scheduler, but this implementation does not use it.
    SerialScheduler(IExecutionContext* /*context*/)
    {
    }

    // Queues `task` for later execution. Nothing runs here; the returned status only reports that the
    // task was queued. The scheduling info is ignored.
    template <typename Fn>
    Status schedule(Fn&& task, SchedulingInfo)
    {
        // `task` is deliberately passed on as an lvalue, exactly as received.
        auto runAndRecord = [captured = captureScheduleFunction(task), this]() mutable
        {
            const Status outcome = invokeScheduleFunction(captured);
            m_tasksStatus |= outcome;
        };
        m_tasks.emplace(std::move(runAndRecord));

        return Status::eSuccess;
    }

    // Runs every queued task and returns the accumulated (OR-ed) status of all tasks run so far.
    // The queue is re-checked after each task, so tasks queued while draining are executed as well.
    Status getStatus()
    {
        // The entry is popped only after it has been invoked, so it stays in the queue while running.
        for (; !m_tasks.empty(); m_tasks.pop())
        {
            m_tasks.front()();
        }

        return m_tasksStatus;
    }

private:
    // OR of the statuses returned by every task executed by getStatus().
    Status m_tasksStatus{ Status::eUnknown };

    // Pending tasks, executed front to back.
    std::queue<std::function<void()>> m_tasks;
};

// Per-node bookkeeping used by the visit strategies in this file (ExecutionVisit and
// ExecutionVisitWithCacheCheck). All members are atomics and start out zero / false.
struct ExecutionNodeData
{
    // Incremented by a visit strategy each time the node is reached from a parent whose visit is counted.
    std::atomic<std::uint32_t> visitCount{ 0U };

    // Raised by ExecutionVisitWithCacheCheck when a triggering task reported Status::eSuccess.
    std::atomic_bool hasComputedUpstream{ false };

    // Raised by either visit strategy when a triggering task reported Status::eDeferred.
    std::atomic_bool hasDeferredUpstream{ false };
};

// ef-docs execution-visit-begin
// Visit strategy that schedules a node once the required number of parent visits has been reached,
// unless one of the triggering tasks reported a deferred status.
struct ExecutionVisit
{
    // Called for `info.nextNode` after `info.currentTask` has been processed.
    //
    // Returns:
    // - Status::eUnknown  when the node still waits for more parent visits,
    // - Status::eDeferred when the last required visit arrived but a deferred upstream was recorded,
    // - otherwise the result of scheduling a new task for the node.
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        auto& data = info.getNodeData();

        // The flag is only ever raised, so it does not matter which visitor raises it first.
        if (info.currentTask.getExecutionStatus() == Status::eDeferred)
        {
            data.hasDeferredUpstream = true;
        }

        // Parents reported by getCycleParentCount() are excluded from the number of visits to wait for.
        const std::size_t visitsToWaitFor =
            info.nextNode->getParents().size() - info.nextNode->getCycleParentCount();

        // The counter is only incremented when there is something to wait for (short-circuit).
        const bool lastRequiredVisit = (visitsToWaitFor == 0) || (++data.visitCount == visitsToWaitFor);
        if (!lastRequiredVisit)
        {
            return Status::eUnknown;
        }

        if (data.hasDeferredUpstream)
        {
            return Status::eDeferred;
        }

        // Spawning a task within the executor doesn't change the upstream path; reuse the current one.
        ExecutionTask spawned(info.getContext(), info.nextNode, info.getUpstreamPath());
        return info.schedule(std::move(spawned));
    }
};
// ef-docs execution-visit-end

// ef-docs execution-visit-cache-begin
// Visit strategy like ExecutionVisit, with an extra check: a node is only scheduled when an upstream
// task succeeded or the node's state reports that it needs compute for the current execution stamp.
// Otherwise traversal continues past the node without going through the scheduler.
struct ExecutionVisitWithCacheCheck
{
    // Called for `info.nextNode` after `info.currentTask` has been processed.
    //
    // Returns:
    // - Status::eUnknown  when the node still waits for more parent visits,
    // - Status::eDeferred when the last required visit arrived but a deferred upstream was recorded,
    // - the result of scheduling the node when it has to be computed,
    // - otherwise the result of continuing execution downstream of the node.
    template <typename ExecutorInfo>
    static Status tryVisit(ExecutorInfo info) noexcept
    {
        auto& data = info.getNodeData();

        // Both flags are only ever raised, so it does not matter which visitor raises them first.
        const auto upstreamStatus = info.currentTask.getExecutionStatus();
        if (upstreamStatus == Status::eSuccess)
        {
            data.hasComputedUpstream = true;
        }
        else if (upstreamStatus == Status::eDeferred)
        {
            data.hasDeferredUpstream = true;
        }

        // Parents reported by getCycleParentCount() are excluded from the number of visits to wait for.
        const std::size_t visitsToWaitFor =
            info.nextNode->getParents().size() - info.nextNode->getCycleParentCount();

        // The counter is only incremented when there is something to wait for (short-circuit).
        const bool lastRequiredVisit = (visitsToWaitFor == 0) || (++data.visitCount == visitsToWaitFor);
        if (!lastRequiredVisit)
        {
            return Status::eUnknown;
        }

        if (data.hasDeferredUpstream)
        {
            return Status::eDeferred;
        }

        // Spawning a task within the executor doesn't change the upstream path; reuse the current one.
        ExecutionTask spawned(info.getContext(), info.nextNode, info.getUpstreamPath());

        // The state lookup is skipped entirely when an upstream task already computed (short-circuit).
        const bool mustCompute = data.hasComputedUpstream ||
                                 info.getContext()->getStateInfo(spawned)->needsCompute(info.getExecutionStamp());
        if (mustCompute)
        {
            return info.schedule(std::move(spawned));
        }

        // Continue downstream...there may be something dirty. Bypass the scheduler to avoid unnecessary
        // overhead.
        return info.continueExecute(spawned);
    }
};
// ef-docs execution-visit-cache-end

// Scheduling strategy that asks the definition attached to the task's node how the task should be
// scheduled.
struct DefaultSchedulingStrategy
{
    // Returns the scheduling info reported by the node's definition, preferring the node definition over
    // the node graph definition. A node with neither bypasses the scheduler.
    static SchedulingInfo getSchedulingInfo(const ExecutionTask& task)
    {
        INode* const node = task.getNode();

        if (auto nodeDef = node->getNodeDef())
        {
            return nodeDef->getSchedulingInfo(task);
        }

        if (auto nodeGraphDef = node->getNodeGraphDef())
        {
            return nodeGraphDef->getSchedulingInfo(task);
        }

        // Bypass the scheduler since there is nothing to compute.
        return SchedulingInfo::eSchedulerBypass;
    }
};

// Executor assembled from interchangeable policies.
//
// Template parameters, as they are used by the code below:
// - ExecNode:           only used for the private Node alias.
// - ExecStrategy:       provides a static tryVisit(Info), called once per child in continueExecute_abi().
// - ExecNodeData:       per-node state; instances are stored in m_nodeData and looked up by topology index.
// - Scheduler:          constructed from an IExecutionContext*; must provide getStatus() and either
//                       schedule(fn, SchedulingInfo) or scheduleDeferred(fn, SchedulingInfo) (see is_deferred).
// - SchedulingStrategy: provides a static getSchedulingInfo(const ExecutionTask&).
// - ExecutorInterface:  interface implemented through Implements<>; IExecutor unless specified otherwise.
template <typename ExecNode,
          typename ExecStrategy,
          typename ExecNodeData,
          typename Scheduler,
          typename SchedulingStrategy,
          typename ExecutorInterface = IExecutor>
class Executor : public Implements<ExecutorInterface>
{
    using Node = const ExecNode;
    using NodeData = ExecNodeData;
    using NodeDataArray = std::vector<NodeData>;
    using ThisExecutor = Executor<ExecNode, ExecStrategy, ExecNodeData, Scheduler, SchedulingStrategy, ExecutorInterface>;
    using ThisExecutorPtr = omni::core::ObjectPtr<ThisExecutor>;

    // Trait: does scheduler S support deferred scheduling? Primary template: no.
    template <typename S, typename Enabled = void>
    struct is_deferred
    {
        static constexpr bool value = false;
    };

    // Selected when S::scheduleDeferred(IScheduleFunction*, SchedulingInfo) is a valid call whose result
    // type is exactly Status.
    template <typename S>
    struct is_deferred<
        S,
        std::enable_if_t<std::is_same<Status,
                                      decltype(std::declval<S>().scheduleDeferred(
                                          std::declval<IScheduleFunction*>(), std::declval<SchedulingInfo>()))>::value>>
    {
        static constexpr bool value = true;
    };

public:
    // Bundle handed (by value) to ExecStrategy::tryVisit(). It identifies the task that was just processed
    // and the child node being considered, and forwards everything else to the owning executor.
    struct Info
    {
    private:
        Executor* m_executor; // executor that created this Info; not owned

    public:
        const ExecutionTask& currentTask; // task whose node is the parent of nextNode in this visit
        INode* nextNode; // child node the strategy is deciding about

        Info(Executor* executor, const ExecutionTask& task, INode* next) noexcept
            : m_executor(executor), currentTask(task), nextNode(next)
        {
        }

        // Per-node state of nextNode.
        NodeData& getNodeData()
        {
            return m_executor->getNodeData(nextNode);
        }

        // Executor that created this Info.
        ThisExecutor* getExecutor()
        {
            return m_executor;
        }

        // Execution context of the current task.
        IExecutionContext* getContext()
        {
            return currentTask.getContext();
        }

        // Execution stamp reported by the current task's context.
        Stamp getExecutionStamp()
        {
            return getContext()->getExecutionStamp();
        }

        // Upstream path of the current task.
        const ExecutionPath& getUpstreamPath() const
        {
            return currentTask.getUpstreamPath();
        }

        // Hands newTask to the executor, which either schedules it or runs it inline (see scheduleInternal).
        Status schedule(ExecutionTask&& newTask)
        {
            return m_executor->scheduleInternal(std::move(newTask));
        }

        // Scheduling info for task, as decided by the executor's SchedulingStrategy.
        SchedulingInfo getSchedulingInfo(const ExecutionTask& task) const
        {
            return m_executor->getSchedulingInfo(task);
        }

        // Direct access to the executor's scheduler.
        Scheduler& getScheduler()
        {
            return m_executor->m_scheduler;
        }

        // Continues the traversal with the children of the given task's node, without scheduling.
        // Note: the parameter shadows the currentTask member; inside this function the name refers to
        // the argument.
        Status continueExecute(ExecutionTask& currentTask)
        {
            return m_executor->continueExecute_abi(&currentTask);
        }
    };

    // Scheduling info for task, as decided by SchedulingStrategy.
    SchedulingInfo getSchedulingInfo(const ExecutionTask& task) const
    {
        return SchedulingStrategy::getSchedulingInfo(task);
    }

    // Per-node state for node, looked up by the node's index in the topology. The index is not
    // range-checked here.
    NodeData& getNodeData(INode* node)
    {
        return m_nodeData[node->getIndexInTopology()];
    }

    // Path computed at construction time (see the constructor).
    const ExecutionPath& getPath() const
    {
        return m_path;
    }

    // Execution context of the task this executor starts from.
    IExecutionContext* getContext() const
    {
        return m_task.getContext();
    }

    // Factory: allocates an executor for the topology toExecute, started from thisTask, and returns it
    // wrapped in an ObjectPtr via omni::core::steal().
    static ThisExecutorPtr create(omni::core::ObjectParam<ITopology> toExecute, const ExecutionTask& thisTask)
    {
        return omni::core::steal(new ThisExecutor(toExecute.get(), thisTask));
    }

protected:
    // An executor always needs a topology and a starting task.
    Executor() = delete;

    // Initializes the executor for toExecute.
    // - m_path: when the current task's node is not the topology root, the current task's upstream path
    //   extended by that node; otherwise the current task's upstream path as is.
    // - m_task: task for the topology root, built from m_path. m_path is declared before m_task, so it is
    //   already initialized when m_task is constructed; keep that declaration order.
    // - m_nodeData: one default-constructed entry per index up to toExecute->getMaxNodeIndex().
    // - m_scheduler: constructed from the current task's context.
    explicit Executor(ITopology* toExecute, const ExecutionTask& currentTask) noexcept
        : m_path((currentTask.getNode() != toExecute->getRoot()) ?
                     ExecutionPath(currentTask.getUpstreamPath(), currentTask.getNode()) :
                     currentTask.getUpstreamPath()),
          m_task(currentTask.getContext(), toExecute->getRoot(), m_path),
          m_nodeData(toExecute->getMaxNodeIndex()),
          m_scheduler(currentTask.getContext())
    {
    }

    // Entry point: traverses the topology starting at the root task and returns the combined status of
    // the scheduler and of all tasks that bypassed it.
    // ef-docs executor-execute-begin
    Status execute_abi() noexcept override
    {
        // We can bypass all subsequent processing if the node associated with the task starting
        // this execution has no children. Note that we return an eSuccess status because nothing
        // invalid has occurred (e.g., we tried to execute an empty NodeGraphDef); we were asked to
        // compute nothing, and so we computed nothing successfully (no-op)!
        if (m_task.getNode()->getChildren().empty())
        {
            return Status::eSuccess | m_task.getExecutionStatus();
        }

        // The result of the traversal itself is intentionally discarded; the status returned below comes
        // from the scheduler and from the tasks that bypassed it.
        (void)continueExecute_abi(&m_task);

        // Give a chance for the scheduler to complete the execution of potentially parallel work which should complete
        // within current execution. All background tasks will continue past this point.

        // Scheduler is responsible for collecting the execution status for everything that this executor generated.
        return m_scheduler.getStatus() | m_schedulerBypass;
    }
    // ef-docs executor-execute-end

    // Offers every child of currentTask's node to ExecStrategy::tryVisit() and returns the OR of all
    // visit results combined with currentTask's own execution status. A node without children yields
    // eSuccess combined with currentTask's execution status.
    // ef-docs executor-continue-execute-begin
    Status continueExecute_abi(ExecutionTask* currentTask) noexcept override
    {
        OMNI_GRAPH_EXEC_ASSERT(currentTask);

        if (currentTask->getNode()->getChildren().empty())
        {
            return Status::eSuccess | currentTask->getExecutionStatus();
        }

        Status ret = Status::eUnknown;
        for (auto child : currentTask->getNode()->getChildren())
        {
            ret |= ExecStrategy::tryVisit(Info(this, *currentTask, child));
        }

        return ret | currentTask->getExecutionStatus();
    }
    // ef-docs executor-continue-execute-end

    // Schedules an externally supplied function; fn must not be null.
    Status schedule_abi(IScheduleFunction* fn, SchedulingInfo schedInfo) noexcept override
    {
        OMNI_GRAPH_EXEC_FATAL_UNLESS_ARG(fn);
        return scheduleExternal(fn, schedInfo);
    }

    // Overload for schedulers without scheduleDeferred() (is_deferred is false).
    // Returns the scheduler's result when the task is scheduled. When the task bypasses the scheduler it
    // is executed inline, its status is accumulated in m_schedulerBypass, and eUnknown is returned.
    template <typename S = Scheduler>
    Status scheduleInternal(ExecutionTask&& newTask, typename std::enable_if_t<!is_deferred<S>::value>* = nullptr)
    {
        // ef-docs executor-schedule-internal-begin
        Status ret = Status::eUnknown;
        SchedulingInfo schedInfo = getSchedulingInfo(newTask);
        if (schedInfo != SchedulingInfo::eSchedulerBypass)
        {
            // this task will finish before we exit executor...just capture the executor as a raw pointer to avoid
            // unnecessary cost (the task itself is moved into the lambda)
            ret = m_scheduler.schedule([executor = this, task = std::move(newTask)]() mutable -> Status
                                       { return task.execute(executor); },
                                       schedInfo);
        }
        else // bypass the scheduler...no need for extra scheduling overhead
        {
            // NOTE(review): m_schedulerBypass is a plain Status updated without synchronization here;
            // confirm this path cannot run concurrently when Scheduler executes tasks in parallel.
            m_schedulerBypass |= newTask.execute(this);
        }

        return ret;
        // ef-docs executor-schedule-internal-end
    }

    // Overload for schedulers that provide scheduleDeferred() (is_deferred is true). Every task goes
    // through the scheduler; there is no inline bypass on this path.
    template <typename S = Scheduler>
    Status scheduleInternal(ExecutionTask&& newTask, typename std::enable_if_t<is_deferred<S>::value>* = nullptr)
    {
        // ef-docs executor-schedule-deferred-begin
        SchedulingInfo schedInfo = getSchedulingInfo(newTask);
        // for deferred tasks, we capture executor as a shared_ptr (extra cost, but keeps object alive)
        Status ret = m_scheduler.scheduleDeferred(
            [executor = omni::core::borrow(this), task = std::move(newTask)]() mutable -> Status
            { return task.execute(executor); },
            schedInfo);

        return ret;
        // ef-docs executor-schedule-deferred-end
    }

    // Overload for schedulers without scheduleDeferred(): schedules fn, or invokes it inline when the
    // scheduling info requests a scheduler bypass.
    template <typename S = Scheduler>
    Status scheduleExternal(IScheduleFunction* fn,
                            SchedulingInfo schedInfo,
                            typename std::enable_if_t<!is_deferred<S>::value>* = nullptr)
    {
        if (schedInfo != SchedulingInfo::eSchedulerBypass)
        {
            return m_scheduler.schedule(fn, schedInfo);
        }
        else // bypass the scheduler...no need for extra scheduling overhead
        {
            return fn->invoke();
        }
    }

    // Overload for schedulers that provide scheduleDeferred(): always hands fn to the scheduler.
    template <typename S = Scheduler>
    Status scheduleExternal(IScheduleFunction* fn,
                            SchedulingInfo schedInfo,
                            typename std::enable_if_t<is_deferred<S>::value>* = nullptr)
    {
        return m_scheduler.scheduleDeferred(fn, schedInfo);
    }

    ExecutionPath m_path; // path used by m_task; must stay declared before m_task (initialization order)
    ExecutionTask m_task; // task for the root of the executed topology
    NodeDataArray m_nodeData; // per-node state, indexed by INode::getIndexInTopology()
    Scheduler m_scheduler; // scheduler that runs (or queues) the generated tasks
    Status m_schedulerBypass{ Status::eUnknown }; // OR of the statuses of tasks executed inline (scheduler bypassed)
};

// Executor built from the building blocks defined in this file: nodes are visited with ExecutionVisit,
// per-node state is ExecutionNodeData, tasks are queued and run by SerialScheduler, and scheduling info
// comes from DefaultSchedulingStrategy. The interface is the Executor default (IExecutor).
using ExecutorFallback = Executor<INode, ExecutionVisit, ExecutionNodeData, SerialScheduler, DefaultSchedulingStrategy>;

} // namespace unstable
} // namespace exec
} // namespace graph
} // namespace omni