Events#

API/design overview#

Event handling is provided by the carb.eventdispatcher plugin which exports the IEventDispatcher interface. The goal of this event system is to provide a performant, thread-safe weak-coupling system to pass data between systems where tight-coupling is not desired.

An Event in Kit Kernel is a small lightweight object that consist of an event name and an optional ABI-safe payload of key/value pairs using the carb.variant plugin.

Python bindings are provided for carb.eventdispatcher as well (carb.eventdispatcher.IEventDispatcher and carb.eventdispatcher.IMessageQueue).

The two main features that IEventDispatcher provides is observing and dispatching. Events can be also queued in a IMessageQueue and deferred to be dispatched at a later time.

Observing Events#

Observing an event means to register a callback that is called in priority order immediately when an event is dispatched. Events are named in a global namespace, so fully-qualified descriptive names such as <plugin/extension>:<event>:<subtype> are recommended. For example, omni.kit.app will dispatch error logs as event name omni.kit.app:error_log:immediate. It is also recommended that constants be provided for the event names to catch misspellings at compile time.

When an event is observed, an ObserverGuard instance is returned. As long as it is valid, the event observation is active. To stop observing the event, simply destroy or reset the ObserverGuard.

Event observation fully conforms to Carbonite’s Basic Callback Hygiene. That is:

  • During the event callback, it is expressly allowed to stop (or start) observing any event, including the currently dispatching event (that is, destroying or resetting the ObserverGuard managing the observation).

  • If another thread is currently dispatching events to an observer, attempts to stop that observer will wait until the call into the observer from the other thread has completed.

  • No internal locks are held inside IEventDispatcher while calling observers.

C++ example of observing an event:

auto ed = carb::getCachedInterface<carb::eventdispatcher::IEventDispatcher>();

// The returned ObserverGuard instance must be retained to maintain the observer.
m_updateEvtSub = ed->observeEvent(
    carb::RStringKey("omni.kit.ui/Menu"), // name of observer for debugging/profiling
    carb::eventdispatcher::kDefaultOrder, // priority order
    omni::kit::kGlobalEventUpdate, // event name to observe
    [this](const carb::eventdispatcher::Event& e) {
        const static carb::RStringKey kDt("dt");
        _update(e.getValueOr<float>(kDt, 0.f)); // Reads the 'dt' key from the payload and passes it to _update()
    } /* optional filter arguments */);

Python example of observing the same event:

import carb.eventdispatcher

class MyClass:
    # ...
    def __init__(self):
        # The returned object must be retained so the observer is not deleted immediately.
        self._sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
            observer_name="omni.kit.ui/Menu",
            event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
            on_event=handle_event # may also be a lambda
            #, filter=<dict of optional filter arguments>
        )

    def handle_event(self, e: carb.eventdispatcher.Event):
        self._update(e['dt']) # Reads the 'dt' key from the payload and passes it to self._update()

Warning

observe_event() returns an object that must be retained. When this object is reset() or deleted, the observation will end. Make sure to keep a reference to the returned object.

Dispatching Events#

Dispatching an event causes the current thread to find all observers and call them in priority order. Read-only arguments can be passed to the dispatch call which form the Event's payload. Those arguments can then be read by observers. If an observer was registered with filter arguments, the dispatching thread will check those arguments and skip any observers that do not match.

The Event payload may contain rich ABI-safe arguments provided by the carb.variant plugin. See the documentation for Variant and NamedVariant for more information.

There is very low average cost to dispatching events with no listeners.

C++ example of dispatching an immediate event:

const auto kGlobalClickedEventImmediate = carb::RString("omni.kit.window.status_bar@clicked:immediate");
const auto kClicked = carb::RStringKey("clicked");

// Dispatch the Immediate event
carb::getCachedInterface<carb::eventdispatcher::IEventDispatcher>()->dispatchEvent(
    kGlobalClickedEventImmediate,
    std::make_pair(kClicked, true)
);

And the same example from Python:

import carb.eventdispatcher
CLICKED_GLOBAL_EVENT: str = "omni.kit.window.status_bar@clicked:immediate"

carb.eventdispatcher.get_eventdispatcher().dispatch_event(CLICKED_GLOBAL_EVENT, {"clicked": true})

Warning

It is strongly recommended that no locks are held while dispatching an event!

Deferring Events#

Events can also be deferred. This may be preferable in situations where dispatching an event may affect performance or may provide unwanted interaction with locked mutexes, etc. In these cases (and if possible), it is recommended to have a first-chance (immediate) event that is dispatched just before the event is queued, and then queue the event in an IMessageQueue instance. At a later time, the popAllAndDispatch() function can be used to drain the message queue and dispatch all of the deferred events.

System Events#

Many systems have their own events. For instance, omni.kit.app provides events to notify about shutdown (kGlobalEventPostQuit and kGlobalEventPreShutdown) that are dispatched to notify any observers of those situations.

Look for each system to document the events that it sends.

Here is a list of Kit Kernel events:

Best Practices#

The event dispatcher system is flexible and there are several recommendations that are intended to help the most frequent use-cases, as well as provide clarifications on specific parts of the event logic.

Deferred callbacks#

If an event must be dispatched but conditions are unsafe or otherwise prevent it, it is recommended to defer the event. The carb::eventdispatcher::IMessageQueue is intended specifically for this purpose. Events may be asynchronously pushed into a IMessageQueue instance and stored until such time as they can be safely dispatched. The carb::eventdispatcher::popAllAndDispatch() is a helper function to later pop and dispatch the events.

Event names#

Since events are dispatched and observed by name in a global namespace, having descriptive fully-qualified event names is highly recommended to avoid collisions and to aid debugging.

The recommended format is: <plugin/extension name>:<event>[:<discriminator>].

It is also recommended that event names be available as a constant so that misspellings are caught by the compiler, or in the case of Python as an exception rather than an event that is never observed.

An example from omni.kit.app would be the events omni.kit.app:error_log (deferred) and omni.kit.app:error_log:immediate (immediate). These events are available through C++ constants omni::kit::kGlobalEventErrorLog and omni::kit::kGlobalEventErrorLogImmediate, respectively. The event named with the :immediate suffix is dispatched immediately when an error log occurs in the thread where the error log occurred. This ideally should have very few (if any) observers. The event is then queued in an IMessageQueue (as omni.kit.app:error_log without the :immediate discriminator), and then dispatched at the end of the frame (from the main thread).

Warning

It is possible for multiple threads to dispatch events simultaneously, even the same event (such as our omni.kit.app:error_log:immediate example).

Transient subscriptions#

It is often desired to have an event that happens only once (or a set number of times). It is perfectly valid to stop observing an event during the event callback. It is also valid to start observing a different event during an event callback.

This is how omni.kit.app.next_update_async() works: when called, it observes the global update event and then awaits a Future. When the next update event is observed, the Future is completed, which wakes the coroutine. When the coroutine exits, the ObserverGuard is destroyed which stops observing the update event.

    f = asyncio.Future()

    def on_event(ev: carb.eventdispatcher.Event):
        if not f.done():
            f.set_result(ev["dt"])

    _ = carb.eventdispatcher.get_eventdispatcher().observe_event(
        order, GLOBAL_EVENT_UPDATE, on_event, observer_name=name
    )
    return await f

Recursive dispatching#

It is possible to have an observer recursively dispatch a different event. Dispatching (or causing to dispatch) the same event recursively is not recommended. Care must be taken to ensure that endless recursion does not occur.

Additional Python Code examples#

Subscribe to Shutdown Events#

# App/Subscribe to Shutdown Events
import omni.kit.app
from carb.eventdispatcher import get_eventdispatcher

def on_post_quit_event(_):
    print("We are about to shutdown")

def on_pre_shutdown_event(_):
    print("We are shutting down")

sub1 = get_eventdispatcher().observe_event(event_name=omni.kit.app.GLOBAL_EVENT_POST_QUIT, on_event=on_post_quit_event, observer_name="name of the subscriber for debugging", order=0)
sub2 = get_eventdispatcher().observe_event(event_name=omni.kit.app.GLOBAL_EVENT_PRE_SHUTDOWN, on_event=on_pre_shutdown_event, observer_name="name of the subscriber for debugging", order=0)

Subscribe to Update Events#

# App/Subscribe to Update Events
import carb.eventdispatcher
import omni.kit.app

def on_update(e: carb.eventdispatcher.Event):
	print(f"Update: {e['dt']}")

sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
	order=0,
	event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
	on_event=on_update,
	observer_name="My Subscription Name"
)

Create custom event#

# App/Create Custom Event
import carb.events
import omni.kit.app

# Event is unique integer id. Create it from string by hashing, using helper function.
# [ext name].[event name] is a recommended naming convention:
MY_CUSTOM_EVENT = carb.events.type_from_string("omni.my.extension.MY_CUSTOM_EVENT")

# App provides common event bus. It is event queue which is popped every update (frame).
bus = omni.kit.app.get_app().get_message_bus_event_stream()

def on_event(e):
    print(e.type, e.type == MY_CUSTOM_EVENT, e.payload)

# Subscribe to the bus. Keep subscription objects (sub1, sub2) alive for subscription to work.
# Push to queue is called immediately when pushed
sub1 = bus.create_subscription_to_push_by_type(MY_CUSTOM_EVENT, on_event)
# Pop is called on next update
sub2 = bus.create_subscription_to_pop_by_type(MY_CUSTOM_EVENT, on_event)

# Push event the bus with custom payload
bus.push(MY_CUSTOM_EVENT, payload={"data": 2, "x": "y"})