Scheduling Hints for OG Nodes

With the Execution Framework (EF) enabled by default in Kit 105, its integration with OmniGraph (OG) gives OG node developers a way to quickly boost node compute performance with minimal code overhead. This is done by adding so-called scheduling hints to .ogn definitions, which allow the EF to schedule the marked nodes for evaluation more optimally (i.e. to determine when, how, and with what computational resources a given task, in this case running a node, should be executed).

Scheduling Hints Overview

In order to properly leverage scheduling hints to improve OG node computational performance, it helps to be aware of the exact types of scheduling dispatches that the EF currently supports; these correspond to the omni::graph::exec::unstable::SchedulingInfo enum values (eParallel, eIsolate, and eSerial) referenced throughout this section.

Adding a scheduling hint to a node is as simple as placing a "scheduling" property in the node’s .ogn file, followed by at least one of the following arguments:

  • "threadsafe": Indicates that this node can be executed in parallel with other "threadsafe" (and serially-scheduled) nodes. In other words, nodes with this scheduling hint can run concurrently on multiple CPU threads, thereby yielding the greatest overall performance gains relative to all other possible hints one can apply. Corresponds to omni::graph::exec::unstable::SchedulingInfo::eParallel scheduling behavior. In an .ogn file this hint would look like this:

"scheduling": ["threadsafe"]
  • "usd-write": Indicates that this node writes data out to the global USD stage, which can lead to trouble if other nodes also attempt reading from/writing to stage at the same time (note that there are ways of circumventing this, which will be discussed/shown later). For this reason, nodes with this hint are executed in isolation, i.e. no other nodes can run concurrently until the "usd-write" node’s compute method has finished. Note that nodes with this hint will typically force quite-significant slow-downs in overall graph execution speed, and should thus be used sparingly/only when absolutely necessary. Corresponds to omni::graph::exec::unstable::SchedulingInfo::eIsolate scheduling behavior. In an .ogn file this hint would look like this:

"scheduling": ["usd-write"]

If no scheduling hint is explicitly specified for a node, the EF will default to scheduling the node in serial (corresponding to omni::graph::exec::unstable::SchedulingInfo::eSerial scheduling behavior), which forces all serial nodes to be executed one after another in a single thread. Note that nodes with the "threadsafe" hint will still be able to run concurrently alongside such serial nodes in different threads (unlike nodes with the "usd-write" hint, which force a temporary stop to all such concurrency until they finish executing).
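For added context, below is a minimal sketch of a complete .ogn file showing where the "scheduling" property sits relative to a node’s other top-level properties; the node name and attributes here are hypothetical and exist purely to illustrate placement:

{
    "OgnExampleNode": {
        "description": ["Hypothetical node used only to show where the scheduling hint is placed."],
        "version": 1,
        "scheduling": ["threadsafe"],
        "inputs": {
            "value": {
                "type": "float",
                "description": ["Hypothetical input value."]
            }
        },
        "outputs": {
            "result": {
                "type": "float",
                "description": ["Hypothetical computed result."]
            }
        }
    }
}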

Scheduling Hint Exclusivity (Or Lack Thereof)

Note that scheduling hints are made to be non-exclusive, i.e. one can add multiple scheduling hints to the same OG node like so:

"scheduling": ["usd-write", "threadsafe"]

This is allowed because some node implementations write back to USD in a thread-safe manner, which calls for multiple hints: one to inform the EF how to properly schedule the node, and one to give observers extra information on the node’s behavior in its .ogn file. In the above example the "threadsafe" hint takes priority over the "usd-write" hint, so the node would be scheduled for parallel execution in multiple threads.

Tips and Tricks For Determining What Scheduling Hint To Use

In a perfect world one would be able to declare all OG nodes as "threadsafe" and reap the resultant multithreaded processing performance benefits. Unfortunately, many nodes encapsulate compute logic that simply cannot be run safely in a concurrent fashion, so sticking "threadsafe" hints onto all OG nodes would invariably lead to myriad runtime crashes. At the same time, being overly sparing with these hints (perhaps out of fear of running into thread-safety issues, or because the process seems like too much of a hassle) can leave substantial performance gains on the table. A quick set of features/"red flags" to look for in a node’s implementation when judging its thread-safety can thus be extremely valuable, helping developers apply the correct hints to their nodes (and thereby speed up overall graph processing) without necessarily having to perform any in-depth testing or profiling. To that end, below is a (relatively) simple flowchart of some node-specific features and implementation details to look out for when deciding which scheduling hint(s) to apply:

Note that:

  • A distinction is made between thread-safe and thread-efficient node behavior, i.e. even though a node may technically be safe to run concurrently, its internal implementation may actually hamper the kinds of performance gains one might seek from multithreaded graph evaluation.

    • For example, this is why it is not recommended to schedule Python nodes in parallel; the GIL only allows one thread to access the Python interpreter for running Python code at a time, which, while technically thread-safe (race conditions between threads can’t really arise if only one is allowed to run at a time), essentially nullifies the entire point of multithreaded scheduling. The extra CPU overhead incurred from spawning and managing multiple threads thus becomes a pure hindrance in this situation, hence the advice to avoid the issue entirely by scheduling all Python nodes on a single thread.

  • Thread-safety regarding the use of bundles in OG is somewhat tenuous, and mostly relies on the facts that:

    • Currently OG is updated in isolation from all other Kit processes, which implies that during this compute time only OG can make changes to Fabric/the bundles built on top of Fabric. If this were not the case (i.e. if other external processes were able to simultaneously alter the state of Fabric while OG was evaluating), then all thread-safety bets surrounding the utilization of bundles in nodes would be off.

    • All bundle operations that go through the node’s database ABI (e.g. db.inputs.bundle.attributeByName, db.outputs.bundle = db.inputs.bundle, etc.) ultimately call upon methods found in the so-called DataModel class, which are protected with a global reader-writer mutex lock.

    Taking these two points into account, interacting with bundles strictly through the node’s database ABI guarantees thread-safety because (a) the DataModel restricts thread access to itself with a global reader-writer mutex lock, and (b) no other non-OG processes that might modify Fabric without going through the DataModel (and its corresponding mutex safeguards) are executing at the same time. Thread-efficiency, on the other hand, is only assured if, in addition to the previous requirements, all of the bundle-related methods that a node’s compute calls into ultimately invoke DataModel functions that only require a reader mutex to be acquired (thereby allowing multiple threads to call them concurrently). A conceptual sketch of this reader-writer pattern is shown after this list.

  • Nodes that utilize the GPU(s) via CUDA don’t inherently require a specific type of scheduling hint; implementation details determine whether or not the node is thread-safe (an example of which is shown later - see OgnDeformer1_GPU).
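To make the reader-writer locking idea above more concrete, here is a conceptual C++ sketch; it is not the actual DataModel source (the class name, members, and methods below are stand-ins), but it shows why operations that only need a reader (shared) lock can proceed on many threads at once, whereas write operations serialize access:

// Conceptual sketch only -- not the actual omni::graph DataModel implementation.
#include <shared_mutex>
#include <string>
#include <unordered_map>

class DataModelSketch
{
    mutable std::shared_mutex m_mutex;              // global reader-writer mutex
    std::unordered_map<std::string, float> m_data;  // stand-in for Fabric-backed storage

public:
    // Read path: many node computes may hold the shared lock simultaneously,
    // so "threadsafe" nodes that only read bundle data stay thread-efficient.
    float readAttribute(const std::string& name) const
    {
        std::shared_lock<std::shared_mutex> readLock(m_mutex);
        auto it = m_data.find(name);
        return it != m_data.end() ? it->second : 0.0f;
    }

    // Write path: the exclusive lock blocks all other readers and writers,
    // so heavy use of it from parallel nodes degrades toward serial behavior.
    void writeAttribute(const std::string& name, float value)
    {
        std::unique_lock<std::shared_mutex> writeLock(m_mutex);
        m_data[name] = value;
    }
};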

Code Examples

Presented below are a variety of node implementations, most of which also exist in various OG extensions; the ones that have been conjured up specifically for this documentation are marked as such for clarity’s sake. Alongside each definition is a discussion on how one could analyze the given code, using the methods described in previous sections, to reasonably deduce what scheduling behavior would best suit each one. With any luck, this will help further elucidate the process behind applying scheduling hints to nodes and showcase some of the similarities shared between nodes in each category to make future identification a bit easier.

Note that for brevity some of the examples have been trimmed to just show the node’s core compute logic.

Parallel-Scheduled Node Examples

OgnTutorialTupleArrays

// OgnTutorialTupleArrays.cpp

static bool compute(OgnTutorialTupleArraysDatabase& db)
{
    // Use the "auto&" declarations to avoid long type names, prone to error
    const auto& a = db.inputs.a();
    const auto& b = db.inputs.b();
    auto& result = db.outputs.result();

    // Some simple error checking never hurts
    if (a.size() != b.size())
    {
        db.logWarning("Input sizes did not match (%zu versus %zu)", a.size(), b.size());
        return false;
    }

    // The output contents are unrelated to the input contents so resize rather than copying
    result.resize(a.size());

    // Illustrating how simple these operations are thanks to iterators and the built-in operator*
    std::transform(a.begin(), a.end(), b.begin(), result.begin(),
                    [](const GfVec3f& valueA, const GfVec3f& valueB) -> float { return valueA * valueB; });

    return true;
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → No.

  2. Does the node utilize bundles in any capacity? → No.

  3. Does the node write to any other external/shared data containers (not including the USD stage and the node’s direct outputs)? → No.

  4. Is the node implemented using Python/Warp? → No.

  5. Does the node load/unload any extensions? → No.

Conclusion: Schedule this node in parallel with the "threadsafe" hint.

OgnDeformer1_GPU

// OgnDeformer1_GPU.cpp

#include "OgnDeformer1_GPUDatabase.h"

namespace omni
{
namespace graph
{
namespace examples
{
extern "C" void deformer1W(outputs::points_t outputPoints,
                           inputs::points_t points,
                           inputs::multiplier_t multiplier,
                           inputs::wavelength_t wavelength,
                           size_t numPoints);

class OgnDeformer1_GPU
{
public:
    static bool compute(OgnDeformer1_GPUDatabase& db)
    {
        size_t numberOfPoints = db.inputs.points.size();
        db.outputs.points.resize(numberOfPoints);
        if (numberOfPoints == 0)
        {
            return true;
        }

        deformer1W(
            db.outputs.points(), db.inputs.points(), db.inputs.multiplier(), db.inputs.wavelength(), numberOfPoints);

        return true;
    }
};
REGISTER_OGN_NODE()
}
}
}
// OgnDeformer1_GPU.cu

#include <OgnDeformer1_GPUDatabase.h>

__global__ void deformer1(outputs::points_t outputArray, inputs::points_t inputArray,
                          inputs::multiplier_t multiplier, inputs::wavelength_t wavelength, size_t numPoints)
{
    int i = blockIdx.x*blockDim.x + threadIdx.x;
    if (numPoints <= i) return;

    const float3* points = *inputArray;
    float3* outputPoints = *outputArray;

    float width = *wavelength;
    float height = 10.0f * (*multiplier);
    float freq = 10.0f;

    float3 point = points[i];
    float tx = freq * (point.x - width) / width;
    float ty = 1.5f * freq * (point.y - width) / width;

    point.z += height * (sin(tx) + cos(ty));

    outputPoints[i] = point;
}

extern "C"
void deformer1W(outputs::points_t outputArray, inputs::points_t inputArray,
                inputs::multiplier_t multiplier, inputs::wavelength_t wavelength, size_t numPoints)
{
    const int nt = 256;
    const int nb = (numPoints + nt - 1) / nt;

    deformer1<<<nb, nt>>>(outputArray, inputArray, multiplier, wavelength, numPoints);
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → No.

  2. Does the node utilize bundles in any capacity? → No.

  3. Does the node write to any other external/shared data containers (not including the USD stage and the node’s direct outputs)? → No.

  4. Is the node implemented using Python/Warp? → No.

  5. Does the node load/unload any extensions? → No.

Conclusion: Schedule this node in parallel with the "threadsafe" hint.

OgnWritePrimAttribute

// OgnWritePrimAttribute.cpp

static bool compute(OgnWritePrimAttributeDatabase& db)
{
    if (!db.inputs.value().resolved())
        return true;

    NodeObj nodeObj = db.abi_node();
    GraphObj graphObj = nodeObj.iNode->getGraph(nodeObj);

    auto& instance = db.internalState<OgnWritePrimAttribute>();

    if (!instance.m_correctlySetup)
    {
        instance.setup(nodeObj, graphObj, db.getInstanceIndex());
    }
    else
    {
        auto path = db.inputs.usePath() ? db.inputs.primPath().token : db.stringToToken(db.inputs.prim.path()).token;

        if (path != instance.m_destPathToken.token || db.inputs.name() != instance.m_destAttrib)
            instance.setup(nodeObj, graphObj, db.getInstanceIndex());
    }

    if (instance.m_correctlySetup)
    {
        copyAttributeDataToPrim(db.abi_context(),
            instance.m_destPath,
            instance.m_destAttrib,
            nodeObj,
            inputs::value.m_token,
            db.getInstanceIndex(),
            true,
            db.inputs.usdWriteBack());

        db.outputs.execOut() = kExecutionAttributeStateEnabled;
        return true;
    }

    return false;
}
// PrimCommon.cpp

// Helper to copy data from our attribute to the target prim
void copyAttributeDataToPrim(const GraphContextObj& context,
                             PathC destPath,
                             TokenC destName,
                             const NodeObj& srcNode,
                             TokenC srcName,
                             InstanceIndex instanceIndex,
                             bool allowDisconnected,
                             bool usdWriteBack)
{
    AttributeObj inputAttr = srcNode.iNode->getAttributeByToken(srcNode, srcName);

    // Implementation details have been nixed for brevity's sake...

    ConstAttributeDataHandle inputHandle = inputAttr.iAttribute->getConstAttributeDataHandle(inputAttr, instanceIndex);
    copyAttributeData(context, destPath, destName, srcNode, inputHandle, usdWriteBack);
}

// Helper to copy data from an input to a destination attribute
void copyAttributeData(GraphContextObj const& context,
                       PathC destPath,
                       TokenC destName,
                       NodeObj const& srcNode,
                       ConstAttributeDataHandle const& inputHandle,
                       bool const usdWriteBack)
{
    AttributeDataHandle const dstHandle{ AttrKey{ destPath.path, destName.token } };
    context.iBundle->copyAttribute(context, BundleHandle{ destPath.path }, destName, inputHandle);
    if (usdWriteBack)
    {
        context.iContext->registerForUSDWriteBack(context, (BundleHandle)destPath.path, destName);
    }
    else
    {
        // Implementation details have been nixed for brevity's sake...
    }
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → Yes, it writes some attribute data out to a target USD prim.

  2. Is the node utilizing the registerForUSDWriteBack method to do so? → Yes, the compute method calls into copyAttributeDataToPrim, which calls into copyAttributeData, which finally calls into registerForUSDWriteBack.

  3. Does the node utilize bundles in any capacity? → No.

  4. Does the node write to any other external/shared data containers (not including the USD stage and the node’s direct outputs)? → No.

  5. Is the node implemented using Python/Warp? → No.

  6. Does the node load/unload any extensions? → No.

Conclusion: Add both the "usd-write" and "threadsafe" scheduling hints to this node; the latter will take precedence and ensure that the node can be executed in parallel, while the former simply lets observers know that the node writes to USD.

OgnArrayLength

// OgnArrayLength.cpp

// Outputs the length of a specified array attribute in an input prim,
// or 1 if the attribute is not an array attribute.
static bool compute(OgnArrayLengthDatabase& db)
{
    auto bundledAttribute = db.inputs.data().attributeByName(db.inputs.attrName());
    db.outputs.length() = bundledAttribute.isValid() ? bundledAttribute.size() : 0;

    return true;
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → No.

  2. Does the node utilize bundles in any capacity? → Yes, the node ingests a bundle as part of its input.

  3. Are the bundles operated on using strictly methods from the database ABI? → Yes (database ABI only); the bundle is only accessed via db.inputs.data(), and the only function invoked on it is attributeByName.

  4. Do all of the DataModel methods underlying the operations being performed on the bundles allow for multiple reader thread access? → Yes, attributeByName eventually invokes DataModel::commonGetAttribute, which only requires a reader mutex lock to utilize.

  5. Does the node write to any other external/shared data containers (not including the USD stage and the node’s direct outputs)? → No.

  6. Is the node implemented using Python/Warp? → No.

  7. Does the node load/unload any extensions? → No.

Conclusion: Schedule this node in parallel with the "threadsafe" hint.

Serially-Scheduled Node Examples

OgnSharedDataWrite (Made-up!)

// SharedState.h

#pragma once
#include <mutex>

static std::mutex s_mutex;
static int s_counter = 0;
// OgnSharedDataWrite.cpp

#include <OgnSharedDataWriteDatabase.h>
#include "../include/omni/graph/madeup/SharedState.h"
#include <chrono>
#include <thread>

namespace omni
{
namespace graph
{
namespace madeup
{
class OgnSharedDataWrite
{
public:
    static bool compute(OgnSharedDataWriteDatabase& db)
    {
        // Mutex lock prevents other threads from accessing the shared
        // integer counter at the same time. Also sleep to simulate
        // complex compute.
        std::lock_guard<std::mutex> guard(s_mutex);
        std::this_thread::sleep_for(std::chrono::seconds(2));
        ++s_counter;

        return true;
    }
};
REGISTER_OGN_NODE()
}
}
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → No.

  2. Does the node utilize bundles in any capacity? → No.

  3. Does the node write to any other external/shared data containers (not including the USD stage and the node’s direct outputs)? → Yes, this node attempts to increment a shared static integer counter.

  4. Are these write operations being performed in a thread-safe manner? → Yes, mutex locks are used to ensure that only one thread at a time has access to s_counter.

  5. Are these write operations being performed in a thread-efficient manner? → No, if multiple OgnSharedDataWrite nodes were to be executed in separate parallel threads, each thread would have to wait for its turn to access s_counter thanks to the mutex lock. The resultant graph evaluation behavior would thus be similar to a serial scheduling pattern, except for the fact that extra overhead would also be incurred from needlessly utilizing multiple threads for the computes.

Conclusion: Schedule this node in serial (done by default when no scheduling hint is specified in the .ogn).

OgnTutorialComplexDataPy

# OgnTutorialComplexDataPy.py

import numpy


def compute(db) -> bool:
    """
    Multiply a float array by a float[3] to yield a float[3] array, using the point3f role.
    Practically speaking the data in the role-based attributes is no different than the underlying raw data
    types. The role only helps you understand what the intention behind the data is, e.g. to differentiate
    surface normals and colours, both of which might have float[3] types.
    """

    # Verify that the output array was correctly set up to have a "point" role
    assert db.role.outputs.a_productArray == db.ROLE_POINT

    multiplier = db.inputs.a_vectorMultiplier
    input_array = db.inputs.a_inputArray
    input_array_size = len(db.inputs.a_inputArray)

    # The output array should have the same number of elements as the input array.
    # Setting the size informs fabric that when it retrieves the data it should allocate this much space.
    db.outputs.a_productArray_size = input_array_size

    # The assertions illustrate the type of data that should have been received for inputs and set for outputs
    assert isinstance(multiplier, numpy.ndarray)  # numpy.ndarray is the underlying type of tuples
    assert multiplier.shape == (3,)
    assert isinstance(input_array, numpy.ndarray)  # numpy.ndarray is the underlying type of simple arrays
    assert input_array.shape == (input_array_size,)

    # If the input array is empty then the output is empty and does not need any computing
    if input_array.shape[0] == 0:
        db.outputs.a_productArray = []
        assert db.outputs.a_productArray.shape == (0, 3)
        return True

    # numpy has a nice little method for replicating the multiplier vector the number of times required
    # by the size of the input array.
    #   e.g. numpy.tile( [1, 2], (3, 1) ) yields [[1, 2], [1, 2], [1, 2]]
    product = numpy.tile(multiplier, (input_array_size, 1))

    # Multiply each of the tiled vectors by the corresponding constant in the input array
    for i in range(0, product.shape[0]):
        product[i] = product[i] * input_array[i]
    db.outputs.a_productArray = product

    # Make sure the correct type of array was produced
    assert db.outputs.a_productArray.shape == (input_array_size, 3)

    return True

Following the flowchart steps:

  1. Does the node write to the USD stage? → No.

  2. Does the node utilize bundles in any capacity? → No.

  3. Does the node write to any other external/shared data containers (not including the USD stage and the node’s direct outputs)? → No.

  4. Is the node implemented using Python/Warp? → Yes, this node is implemented using Python.

Conclusion: Schedule this node in serial (done by default when no scheduling hint is specified in the .ogn).

Isolate-Scheduled Node Examples

OgnPrimDeformer1

// OgnPrimDeformer1.cpp

static bool compute(OgnPrimDeformer1Database& db)
{
    const auto& contextObj = db.abi_context();
    const auto& nodeObj = db.abi_node();
    const IGraphContext& iContext = *contextObj.iContext;
    NodeContextHandle node = nodeObj.nodeContextHandle;

    static const Token inputMeshName("inputMesh");
    static const Token outputMeshName("outputMesh");
    static const Token pointsName("points");

    // Get input bundle.
    omni::fabric::PathC const inputBundlePath = iContext.getInputTarget(contextObj, node, inputMeshName, db.getInstanceIndex());
    ogn::BundleContents<ogn::kOgnInput, ogn::kAny> const inputBundle{ contextObj, inputBundlePath };
    ConstBundleHandle inputMesh = inputBundle.abi_bundleHandle();

    // Make output bundle from input prim.
    BundleHandle outputMesh = iContext.copyBundleContentsIntoOutput(contextObj, node, outputMeshName, inputMesh, db.getInstanceIndex());

    AttributeDataHandle outputPointsAttr = getAttributeW(contextObj, outputMesh, pointsName);
    Float3* const* pointsArray = getDataW<Float3*>(contextObj, outputPointsAttr);
    if (! pointsArray)
    {
        return true;
    }
    Float3* points = *pointsArray;
    size_t pointCount = getElementCount(contextObj, outputPointsAttr);

    static const Token multiplierName("multiplier");
    ConstAttributeDataHandle multiplierAttr = getAttributeR(contextObj, node, multiplierName, db.getInstanceIndex());
    const float* pMultiplier = getDataR<float>(contextObj, multiplierAttr);
    float multiplier = pMultiplier ? *pMultiplier : 1.0f;

    static const Token wavelengthName("wavelength");
    ConstAttributeDataHandle wavelengthAttr = getAttributeR(contextObj, node, wavelengthName, db.getInstanceIndex());
    const float* pWavelength = getDataR<float>(contextObj, wavelengthAttr);
    float wavelength = pWavelength ? *pWavelength : 1.0f;

    float width = wavelength;
    float height = 10.0f * multiplier;
    float freq = 10.0f;

    for (uint32_t i = 0; i < pointCount; i++)
    {
        carb::Float3 point = points[i];
        float tx = freq * (point.x - width) / width;
        float ty = 1.5f * freq * (point.y - width) / width;

        point.z += height * (sin(tx) + cos(ty));

        points[i] = point;
    }

    return true;
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → No.

  2. Does the node utilize bundles in any capacity? → Yes, the node uses bundles to both read in and write out data during compute. In this case it copies a constant input bundle into a writable output bundle (via iContext.copyBundleContentsIntoOutput) before modifying the output bundle’s points attribute.

  3. Are the bundles operated on using strictly methods from the database ABI? → No, this node is using non-database ABI methods on the bundles. For example, it’s using the iContext.copyBundleContentsIntoOutput function to copy the input bundle into the output bundle.

  4. Are all of these non-database ABI mechanisms for operating on the bundles eventually getting routed through some DataModel function? → Yes; for example, iContext.copyBundleContentsIntoOutput ultimately invokes DataModel::copy, a method that is protected with a reader mutex lock (which implies that efficient multithreaded bundle copying is indeed possible).

  5. Does the node write to any other external/shared data containers (not including the USD stage and the node’s direct outputs)? → Yes, this node attempts to directly write to the “points” output attribute inside of the output bundle, which itself is stored in a bucket in Fabric (which is external to the node’s scope), via a writable data pointer.

  6. Are these write operations being performed in a thread-safe manner? → No, there are no protections in place for writing over the output bundle’s “points” attribute in this node; there could be another node executing in another thread, for example, that tries accessing the data in the same attribute on the same bundle while it’s being written into by this node.

Conclusion: Schedule this node in isolation with the "usd-write" hint.

OgnSetPrimActive

// OgnSetPrimActive.cpp

static bool compute(OgnSetPrimActiveDatabase& db)
{
    const auto& primPath = db.inputs.prim();
    if (pxr::SdfPath::IsValidPathString(primPath))
    {
        // Find our stage
        const GraphContextObj& context = db.abi_context();
        long stageId = context.iContext->getStageId(context);
        auto stage = pxr::UsdUtilsStageCache::Get().Find(pxr::UsdStageCache::Id::FromLongInt(stageId));
        if (!stage)
        {
            db.logError("Could not find USD stage %ld", stageId);
            return false;
        }
        pxr::UsdPrim targetPrim = stage->GetPrimAtPath(pxr::SdfPath(primPath));
        if (!targetPrim)
        {
            db.logError("Could not find prim \"%s\" in USD stage", primPath.data());
            return false;
        }
        return targetPrim.SetActive(db.inputs.active());
    }
    return true;
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → Yes, it sets the active state of a user-specified prim, i.e. targetPrim.SetActive(db.inputs.active()).

  2. Is the node utilizing the registerForUSDWriteBack method to do so? → No.

Conclusion: Schedule this node in isolation with the "usd-write" hint.

OgnWritePrimMaterial

// OgnWritePrimMaterial.cpp

static bool compute(OgnWritePrimMaterialDatabase& db)
{
    const auto& primPath = db.inputs.primPath();
    if (!PXR_NS::SdfPath::IsValidPathString(primPath)) {
        db.logError("Invalid prim path");
        return false;
    }
    const auto& materialPath = db.inputs.materialPath();

    // Find our stage
    const GraphContextObj& context = db.abi_context();
    long stageId = context.iContext->getStageId(context);
    PXR_NS::UsdStagePtr stage = pxr::UsdUtilsStageCache::Get().Find(pxr::UsdStageCache::Id::FromLongInt(stageId));
    if (!stage)
    {
        db.logError("Could not find USD stage %ld", stageId);
        return false;
    }

    PXR_NS::UsdPrim prim = stage->GetPrimAtPath(PXR_NS::SdfPath(primPath));
    if (!prim) {
        db.logError("Could not find USD prim");
        return false;
    }

    PXR_NS::UsdShadeMaterialBindingAPI materialBinding(prim);
    PXR_NS::UsdPrim materialPrim = stage->GetPrimAtPath(PXR_NS::SdfPath(materialPath));
    if (!materialPrim) {
        db.logError("Could not find USD material");
        return false;
    }
    PXR_NS::UsdShadeMaterial material(materialPrim);
    if (!materialBinding.Bind(material)) {
        db.logError("Could not bind USD material to USD prim");
        return false;
    }

    db.outputs.execOut() = kExecutionAttributeStateEnabled;

    return true;
}

Following the flowchart steps:

  1. Does the node write to the USD stage? → Yes, it binds some user-specified material to an input prim, i.e. materialBinding.Bind(material).

  2. Is the node utilizing the registerForUSDWriteBack method to do so? → No.

Conclusion: Schedule this node in isolation with the "usd-write" hint.

Testing for Node Thread Safety

While the aforementioned decision flowchart and examples may be helpful in correctly identifying the best scheduling behavior for a variety of nodes, they are unfortunately not a catch-all solution. Perhaps a node implements some behavior that isn’t covered in this documentation (very possible given the large scope of what a node is allowed to do), which could leave a developer unsure of how best to approach the issue. The workflow discussed thus far also does little to detect potential future regressions in which a node’s behavior changes significantly enough that it loses compatibility with its given scheduling hint.

In both cases one would benefit greatly from having targeted “scheduling behavior” tests, whether to experimentally determine how a node should be scheduled or simply to add greater code coverage and ensure that future node alterations remain compatible with the manner in which they are scheduled for execution.

These tests typically focus on verifying the thread-safety of a node, especially since:

  1. Developers stand to gain significant performance boosts if a node can be flipped to concurrent evaluation.

  2. Particularly nasty breakages can occur when the EF attempts to execute non-thread-safe nodes in parallel; the opposite scenario, i.e. thread-safe nodes being evaluated serially and/or in isolation, may drag performance down unnecessarily but won’t lead to crashes, hangs, or other catastrophic failures.

While developers are more than welcome to write their own specialized thread-safety tests, a few different tools have already been developed to automate parts of the relevant test-development process; these tools will be discussed below in a bit more detail.

Autogenerated Tests from .ogn Constructs

Whenever a node’s .ogn file has the "threadsafe" scheduling hint and a user-defined test construct (see the ogn_user_guide for more information on adding unit tests directly to a node’s .ogn), a Python threading test called test_thread_safety will automatically be generated. This threading test will create multiple copies of the test graph setup(s) specified in the .ogn and execute them many times to look for potential race conditions. This is perhaps the easiest way to begin verifying that a node is thread-safe and/or establishing test coverage for thread-safe nodes, but it does come with some major limitations.
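As a rough sketch of what triggers this autogeneration, the node’s .ogn file would contain both the hint and at least one test entry; the node name and attributes below are hypothetical:

{
    "OgnExampleNode": {
        "description": ["Hypothetical node with both a scheduling hint and a test construct."],
        "version": 1,
        "scheduling": ["threadsafe"],
        "inputs": {
            "value": {"type": "int", "description": ["Hypothetical input."]}
        },
        "outputs": {
            "result": {"type": "int", "description": ["Hypothetical output."]}
        },
        "tests": [
            {"inputs:value": 2, "outputs:result": 2}
        ]
    }
}

With both pieces present, the regular unit test generated from the "tests" entries is accompanied by the additional test_thread_safety variant described above.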

For starters, the .ogn test construct itself is quite limited in terms of the features one might like to use when designing complete unit tests. While this is partly by design (more complex tests should be written as separate external Python scripts), it’s still important to keep in mind that actions such as:

  • Specifying graph execution behavior (e.g. push vs. dirty push vs. execution vs. …; the autogenerated test currently defaults to running all nodes on a push graph).

  • Dynamically adding/removing node attributes, connecting/disconnecting nodes from one another, adding/removing nodes after the test graph has been initially populated.

  • Creating multiple test graphs at once.

  • etc.

are not (presently) possible to express in .ogn test constructs; this restriction on the range of behaviors that the test scenes can exercise means that a node may exhibit thread-safety issues which simply cannot be discovered via the autogenerated threading test.

On the other hand, these limitations sometimes have the opposite effect and lead to an autogenerated thread-safety test failing despite the fact that the underlying node is perfectly capable of being safely executed concurrently. One such situation that has popped up a few times in the past involves nodes with non-deterministic/stateful behavior (i.e. their internal state depends on the number of times they’ve been evaluated). Take, for example, the OgnCounter Action Graph node, which increments its internal counter by one every time it’s executed. If a developer added the following test construct to its .ogn and left out the "threadsafe" scheduling hint:

"tests" : [
  {"outputs:count": 1, "state_set:count": 0, "inputs:execIn": 1},
]

then the test would pass with no issue - the node gets executed once, resulting in the counter going up by one. If the "threadsafe" hint is then added back in, however, the resultant autogenerated threading test will actually fail. This is because a portion of the test involves executing each graph instance multiple times before checking the final condition. Each graph evaluation increments the node’s internal counter by one, so if the threading test runs each graph 10 times, the instanced nodes’ final output counts will be 10, and not 1 as expected for the "outputs:count" attribute, hence the failure. One could thus be misled into believing that OgnCounter is not thread-safe, when the exact opposite is true.

In general, it’s important to remember that these autogenerated thread-safety tests, although convenient, are not applicable to every node (as they’re currently designed) and do not always yield correct results; treat them as an extra source of reassurance when determining what scheduling behavior will best suit a given node rather than as ironclad proof that a node is (or isn’t) thread-safe.

ThreadsafetyTestUtils

The ThreadsafetyTestUtils class (located in the omni.graph extension) provides a set of utility functions and decorators that allow one to (relatively) easily convert a typical test coroutine for a node in an external Python script into a fully-fledged thread-safety test. Similar to the autogenerated threading tests described above, this is accomplished under the hood by essentially duplicating the test graph created by the script, executing those graph instances concurrently, and checking whether each test instance passes. If they don’t, there might be threading issues with some of the nodes in the test graph. The process is perhaps best shown with an example:

“Regular” Async, Non-Threaded Version of a Node Unit Test:

import omni.graph.core as og
import omni.graph.core.tests as ogts
import omni.usd
from pxr import Gf, Sdf

class TestClass(ogts.OmniGraphTestCase):
    """Test Class"""

    TEST_GRAPH_PATH = "/World/TestGraph"
    keys = og.Controller.Keys

    async def setUp(self):
        """Set up  test environment, to be torn down when done"""
        await super().setUp()

    async def test_forloop_node(self):
        """Test ForLoop node"""
        context = omni.usd.get_context()
        stage = context.get_stage()

        # Create a prim + add an attribute to it.
        prim = stage.DefinePrim("/World/TestPrim")
        prim.CreateAttribute("val1", Sdf.ValueTypeNames.Int2, False).Set(Gf.Vec2i(1, 1))

        # Instance a test graph setup.
        graph_path = self.TEST_GRAPH_PATH
        og.Controller.create_graph({"graph_path": graph_path, "evaluator_name": "execution"})
        (_, (_, _, _, _, _, _, write_node, finish_counter), _, _) = og.Controller.edit(
            graph_path,
            {
                self.keys.CREATE_NODES: [
                    ("OnTick", "omni.graph.action.OnTick"),
                    ("Const", "omni.graph.nodes.ConstantInt2"),
                    ("StopNum", "omni.graph.nodes.ConstantInt"),
                    ("Add", "omni.graph.nodes.Add"),
                    ("Branch", "omni.graph.action.Branch"),
                    ("For", "omni.graph.action.ForLoop"),
                    ("Write1", "omni.graph.nodes.WritePrimAttribute"),
                    ("FinishCounter", "omni.graph.action.Counter"),
                ],
                self.keys.SET_VALUES: [
                    ("OnTick.inputs:onlyPlayback", False),
                    ("Const.inputs:value", [1, 2]),
                    ("StopNum.inputs:value", 3),
                    ("Write1.inputs:name", "val1"),
                    ("Write1.inputs:primPath", "/World/TestPrim"),
                    ("Write1.inputs:usePath", True),
                    ("Branch.inputs:condition", True),
                ],
                self.keys.CONNECT: [
                    ("OnTick.outputs:tick", "For.inputs:execIn"),
                    ("StopNum.inputs:value", "For.inputs:stop"),
                    ("For.outputs:loopBody", "Branch.inputs:execIn"),
                    ("For.outputs:finished", "FinishCounter.inputs:execIn"),
                    ("Branch.outputs:execTrue", "Write1.inputs:execIn"),
                    ("For.outputs:value", "Add.inputs:a"),
                    ("Const.inputs:value", "Add.inputs:b"),
                    ("Add.outputs:sum", "Write1.inputs:value"),
                ],
            },
        )

        # Evaluate the graph.
        await og.Controller.evaluate()
        self.assertListEqual([3, 4], list(stage.GetAttributeAtPath("/World/TestPrim.val1").Get()))
        self.assertEqual(3, write_node.get_compute_count())
        self.assertEqual(1, finish_counter.get_compute_count())

        # Remove the prim from the stage at the very end of the test, when it's no longer needed.
        stage.RemovePrim("/World/TestPrim")

Threaded Version of a Node Unit Test:

import omni.graph.core as og
import omni.graph.core.tests as ogts
import omni.usd
from omni.graph.core import ThreadsafetyTestUtils
from pxr import Gf, Sdf

class TestClass(ogts.OmniGraphTestCase):
    """Test Class"""

    TEST_GRAPH_PATH = "/World/TestGraph"
    keys = og.Controller.Keys

    async def setUp(self):
        """Set up  test environment, to be torn down when done"""
        await super().setUp()

    @ThreadsafetyTestUtils.make_threading_test
    def test_forloop_node(self, test_instance_id: int = 0):
        """Test ForLoop node"""
        context = omni.usd.get_context()
        stage = context.get_stage()

        # Since we want to use the same prim across all graph instances in the
        # thread-safety test, we add it to the threading cache like so:
        prim = ThreadsafetyTestUtils.add_to_threading_cache(test_instance_id, stage.DefinePrim("/World/TestPrim"))

        # We only want to add this new attribute to the prim once at the start of the
        # threading test.
        ThreadsafetyTestUtils.single_evaluation_first_test_instance(
            test_instance_id,
            lambda: prim.CreateAttribute("val1", Sdf.ValueTypeNames.Int2, False).Set(Gf.Vec2i(1, 1))
        )

        # Instance a test graph setup. Note that we append the graph path with the test_instance_id
        # so that the graph can be uniquely identified in the thread-safety test!
        graph_path = self.TEST_GRAPH_PATH + str(test_instance_id)
        og.Controller.create_graph({"graph_path": graph_path, "evaluator_name": "execution"})
        (_, (_, _, _, _, _, _, write_node, finish_counter), _, _) = og.Controller.edit(
            graph_path,
            {
                self.keys.CREATE_NODES: [
                    ("OnTick", "omni.graph.action.OnTick"),
                    ("Const", "omni.graph.nodes.ConstantInt2"),
                    ("StopNum", "omni.graph.nodes.ConstantInt"),
                    ("Add", "omni.graph.nodes.Add"),
                    ("Branch", "omni.graph.action.Branch"),
                    ("For", "omni.graph.action.ForLoop"),
                    ("Write1", "omni.graph.nodes.WritePrimAttribute"),
                    ("FinishCounter", "omni.graph.action.Counter"),
                ],
                self.keys.SET_VALUES: [
                    ("OnTick.inputs:onlyPlayback", False),
                    ("Const.inputs:value", [1, 2]),
                    ("StopNum.inputs:value", 3),
                    ("Write1.inputs:name", "val1"),
                    ("Write1.inputs:primPath", "/World/TestPrim"),
                    ("Write1.inputs:usePath", True),
                    ("Branch.inputs:condition", True),
                ],
                self.keys.CONNECT: [
                    ("OnTick.outputs:tick", "For.inputs:execIn"),
                    ("StopNum.inputs:value", "For.inputs:stop"),
                    ("For.outputs:loopBody", "Branch.inputs:execIn"),
                    ("For.outputs:finished", "FinishCounter.inputs:execIn"),
                    ("Branch.outputs:execTrue", "Write1.inputs:execIn"),
                    ("For.outputs:value", "Add.inputs:a"),
                    ("Const.inputs:value", "Add.inputs:b"),
                    ("Add.outputs:sum", "Write1.inputs:value"),
                ],
            },
        )

        # Evaluate the graph(s). Yielding to wait for compute to happen across
        # all graph instances before continuing the test.
        yield ThreadsafetyTestUtils.EVALUATION_ALL_GRAPHS
        self.assertListEqual([3, 4], list(stage.GetAttributeAtPath("/World/TestPrim.val1").Get()))
        self.assertEqual(3, write_node.get_compute_count())
        self.assertEqual(1, finish_counter.get_compute_count())

        # Remove the prim from the stage at the very end of
        # the test, when it's no longer needed.
        ThreadsafetyTestUtils.single_evaluation_last_test_instance(
            test_instance_id,
            lambda: stage.RemovePrim("/World/TestPrim")
        )

To summarize, converting from the former to the latter involves:

  • Adding the make_threading_test decorator to the top of the test method one wishes to convert.

  • Removing the async keyword from the test.

  • Adding a test_instance_id argument to the test method and using it to identify objects that need to remain unique in each instanced test (e.g. the test graph path, which is the most common use for test_instance_id).

  • Replacing all await og.Controller.evaluate() calls with yield ThreadsafetyTestUtils.EVALUATION_ALL_GRAPHS.

  • Replacing all await omni.kit.app.get_app().next_update_async() calls with yield ThreadsafetyTestUtils.EVALUATION_WAIT_FRAME (not shown in the above example).

  • Adding any objects/variables that need to persist/remain the same across all test instances to an internal cache using the ThreadsafetyTestUtils.add_to_threading_cache() method.

  • Executing any code that needs to occur only once at the start of the test via the ThreadsafetyTestUtils.single_evaluation_first_test_instance() method.

  • Executing any code that needs to occur only once at the end of the test via the ThreadsafetyTestUtils.single_evaluation_last_test_instance() method.

Finally, note that threaded tests which are converted in this manner can also be easily configured to become serial tests once more by just swapping the make_threading_test decorator with make_serial_test (rather than having to rewrite the entire chunk of code to resemble the “async version”).
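For example, assuming the threaded test shown above, the switch back to serial execution is just a one-line decorator change (a sketch; the rest of the test body stays exactly as written in the threaded version):

from omni.graph.core import ThreadsafetyTestUtils
import omni.graph.core.tests as ogts

class TestClass(ogts.OmniGraphTestCase):
    """Test Class"""

    @ThreadsafetyTestUtils.make_serial_test  # was: @ThreadsafetyTestUtils.make_threading_test
    def test_forloop_node(self, test_instance_id: int = 0):
        """Test ForLoop node (same body as the threaded version above)"""
        # ... identical test body to the threaded example ...
        yield ThreadsafetyTestUtils.EVALUATION_ALL_GRAPHS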