Event streams#
Event streams are covered by the carb.events plugin and the carb::events::IEvents Carbonite interface. The goal of event streams is to provide a thread-safe means of moving data around through a generalized interface, and also to act as a way to synchronize logic.
API/design overview#
The singleton IEvents interface is used to create IEventStream objects. Whenever an event is pushed into an event stream, the immediate callbacks are triggered, and the event stream stores the event in its internal event queue. Events can then be popped from the queue one by one, or all at once (also called pumping), and at this point the deferred callbacks are triggered. The event stream owner typically controls where this pumping happens.
Event consumers can subscribe to both immediate (push) and deferred (pop) callbacks. Subscription functions create an ISubscription object, which usually unsubscribes automatically upon destruction. Callbacks are wrapped in an IEventListener object, which allows context to be bound to the subscription. When triggered, the callback receives an IEvent as its parameter, describing the event that triggered the callback. An IEvent contains the event type, the sender id, and a custom payload, which is stored as a carb.dictionary item.
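The push/pop flow described above can be sketched with a small pure-Python model. This is only an illustration under stated assumptions: `ToyEvent` and `ToyEventStream` are hypothetical stand-ins invented for this sketch, not the carb.events classes, and the model ignores threading, subscription ordering, and subscription lifetime.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable, Deque, Dict, List


@dataclass
class ToyEvent:
    """Hypothetical stand-in for IEvent: type, sender id, and a dict payload."""
    type: int
    sender: int = 0
    payload: Dict[str, Any] = field(default_factory=dict)


class ToyEventStream:
    """Hypothetical model of an event stream (not the carb.events implementation)."""

    def __init__(self) -> None:
        self._queue: Deque[ToyEvent] = deque()
        self._push_callbacks: List[Callable[[ToyEvent], None]] = []
        self._pop_callbacks: List[Callable[[ToyEvent], None]] = []

    def subscribe_to_push(self, callback: Callable[[ToyEvent], None]) -> None:
        self._push_callbacks.append(callback)

    def subscribe_to_pop(self, callback: Callable[[ToyEvent], None]) -> None:
        self._pop_callbacks.append(callback)

    def push(self, event_type: int, sender: int = 0, payload=None) -> None:
        event = ToyEvent(event_type, sender, dict(payload or {}))
        # Immediate callbacks run right away, in the pusher's context.
        for callback in list(self._push_callbacks):
            callback(event)
        # The event is then stored for later, deferred delivery.
        self._queue.append(event)

    def pop(self) -> bool:
        """Deliver one queued event to deferred callbacks; False if queue is empty."""
        if not self._queue:
            return False
        event = self._queue.popleft()
        for callback in list(self._pop_callbacks):
            callback(event)
        return True

    def pump(self) -> int:
        """Pop every queued event; returns how many were delivered."""
        delivered = 0
        while self.pop():
            delivered += 1
        return delivered


log = []
stream = ToyEventStream()
stream.subscribe_to_push(lambda e: log.append(("immediate", e.type)))
stream.subscribe_to_pop(lambda e: log.append(("deferred", e.type)))

stream.push(1)
stream.push(2)
pumped = stream.pump()  # the stream owner decides when this happens
print(log)
```

In this model both immediate callbacks fire before any deferred callback, because the owner only pumps after both pushes have completed.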
Recommended usage#
The events subsystem is flexible. The following recommendations are intended to help with the most frequent use cases and to clarify specific parts of the events logic.
Deferred callbacks#
The recommended way to use event streams is through the deferred callback mechanism; use immediate callbacks only when absolutely necessary. When an event is pushed into an event stream, the immediate callback that follows is frequently not a safe place to modify, or even read, related data outside the event payload. To avoid data corruption, use deferred callbacks, which are triggered at a point that the event stream owner has deemed safe.
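To make the hazard concrete, here is a deliberately contrived pure-Python sketch. Everything in it is hypothetical (the `state` dictionary, `add_item`, and the two callbacks are invented for illustration and use no carb API): a producer pushes an event in the middle of updating its own state, so the immediate callback observes a half-updated state, while the deferred callback runs after the update has finished.

```python
from collections import deque

# Hypothetical shared state; it is consistent when count == len(items).
state = {"items": [], "count": 0}
queue = deque()
observations = []


def is_consistent() -> bool:
    return state["count"] == len(state["items"])


def on_immediate(event) -> None:
    # Runs inside the producer's update, before it has finished.
    observations.append(("immediate", is_consistent()))


def on_deferred(event) -> None:
    # Runs later, when the owner pumps the queue at a safe point.
    observations.append(("deferred", is_consistent()))


def add_item(item) -> None:
    """Hypothetical producer that pushes an event mid-update."""
    state["items"].append(item)
    event = {"item": item}
    on_immediate(event)      # immediate (push) callback fires here
    queue.append(event)      # event is stored for deferred delivery
    state["count"] += 1      # the update only completes after the push


def pump() -> None:
    while queue:
        on_deferred(queue.popleft())


add_item("a")
pump()
print(observations)
```

Only the event payload itself is safe to rely on inside the immediate callback in this sketch; anything else the producer owns may still be in flux.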
Event types#
Each event contains an event type, which is set when the event is pushed into the stream and can be specified when a consumer subscribes to an event stream. Subscribing by type reduces the number of callback invocations, which is especially important when listening to the event stream from a scripting language.
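The effect of subscribing by type can be illustrated with a toy dispatcher. This is a sketch under assumptions: `ToyTypedStream`, `FRAME_TICK`, and `SELECTION_CHANGED` are hypothetical names made up for this example and do not reflect how carb.events dispatches internally.

```python
from collections import deque

# Hypothetical event types for this sketch.
FRAME_TICK = 1
SELECTION_CHANGED = 2


class ToyTypedStream:
    """Hypothetical stream that supports untyped and by-type pop subscriptions."""

    def __init__(self):
        self._queue = deque()
        self._any_type = []
        self._by_type = {}

    def subscribe_to_pop(self, callback):
        self._any_type.append(callback)

    def subscribe_to_pop_by_type(self, event_type, callback):
        self._by_type.setdefault(event_type, []).append(callback)

    def push(self, event_type):
        self._queue.append(event_type)

    def pump(self):
        while self._queue:
            event_type = self._queue.popleft()
            # Typed subscribers are only called for their own event type.
            for callback in self._any_type + self._by_type.get(event_type, []):
                callback(event_type)


calls = {"unfiltered": 0, "filtered": 0}


def on_any(event_type):
    calls["unfiltered"] += 1


def on_selection(event_type):
    calls["filtered"] += 1


stream = ToyTypedStream()
stream.subscribe_to_pop(on_any)
stream.subscribe_to_pop_by_type(SELECTION_CHANGED, on_selection)

for _ in range(100):
    stream.push(FRAME_TICK)
stream.push(SELECTION_CHANGED)
stream.pump()
print(calls)
```

The unfiltered callback is invoked for every event, while the typed one is invoked once, which is the saving that matters when each invocation crosses into a scripting language.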
It is recommended to use string hashes as event types, as this avoids having to manage event type allocation when multiple sources can push events into an event stream. In C++, use CARB_EVENTS_TYPE_FROM_STR, which provides a 64-bit FNV-1a hash computed at compile time, or its run-time counterpart, carb::events::typeFromString. In Python, carb.events.type_from_string can be used.
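For readers unfamiliar with FNV-1a, the following is a minimal sketch of the standard 64-bit variant in plain Python. It is meant only to show how a string becomes a 64-bit integer id; whether it reproduces the exact values returned by carb.events.type_from_string (for example, with respect to string encoding) is an assumption that has not been verified here, so use the provided helpers in real code. The function name `fnv1a_64` is hypothetical.

```python
# Standard 64-bit FNV-1a parameters.
FNV64_OFFSET_BASIS = 0xCBF29CE484222325
FNV64_PRIME = 0x100000001B3
MASK_64 = 0xFFFFFFFFFFFFFFFF


def fnv1a_64(text: str) -> int:
    """Return the 64-bit FNV-1a hash of the UTF-8 bytes of `text`."""
    value = FNV64_OFFSET_BASIS
    for byte in text.encode("utf-8"):
        value ^= byte                              # FNV-1a: XOR first...
        value = (value * FNV64_PRIME) & MASK_64    # ...then multiply, wrapping at 64 bits
    return value


# Two independent producers can derive the same id from the same name
# without coordinating any central allocation of integer event types.
event_type_a = fnv1a_64("omni.my.extension.MY_CUSTOM_EVENT")
event_type_b = fnv1a_64("omni.my.extension.MY_CUSTOM_EVENT")
print(event_type_a == event_type_b)
```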
An important design choice is how to partition events across streams: either create multiple event streams, each serving a fairly limited number of event types, or create a single event stream that serves many different event types. The latter approach is akin to an event bus with many producers and consumers. Event buses are useful when designing a system that should be easily extensible.
Transient subscriptions#
If you want to implement a deferred action triggered by some event, consider using a transient subscription instead of subscribing to the event on startup and then checking an action queue on each callback trigger.
This approach involves subscribing to the event stream only once you have a specific instance of an action that you want to execute in a deferred manner. When the subscribed callback is triggered, you execute the action and immediately unsubscribe, so you don’t introduce an empty callback that ticks unconditionally each time the event happens.
A transient subscription can also include a simple counter, so that your code executes only on the Nth event, not necessarily on the next one.
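The pattern, including the optional counter, can be sketched in pure Python. The names here (`ToyStream`, `ToySubscription`, `run_on_nth_event`) are hypothetical and exist only for this illustration; with the real API the subscription object returned by the stream plays the role of `ToySubscription`.

```python
class ToySubscription:
    """Hypothetical subscription handle that can be cancelled explicitly."""

    def __init__(self, stream, callback):
        self._stream = stream
        self._callback = callback

    def unsubscribe(self):
        if self._callback in self._stream.callbacks:
            self._stream.callbacks.remove(self._callback)


class ToyStream:
    """Hypothetical stream that calls every subscriber on each event."""

    def __init__(self):
        self.callbacks = []

    def subscribe(self, callback):
        self.callbacks.append(callback)
        return ToySubscription(self, callback)

    def emit(self, event):
        # Iterate over a copy so callbacks may unsubscribe while being called.
        for callback in list(self.callbacks):
            callback(event)


def run_on_nth_event(stream, n, action):
    """Subscribe, run `action` on the n-th event, then unsubscribe."""
    remaining = n
    subscription = None

    def on_event(event):
        nonlocal remaining
        remaining -= 1
        if remaining == 0:
            action(event)
            subscription.unsubscribe()  # no callback keeps ticking afterwards

    subscription = stream.subscribe(on_event)
    return subscription


stream = ToyStream()
results = []

# Subscribe only now that there is a concrete action to defer.
run_on_nth_event(stream, 3, results.append)

for frame in range(1, 6):
    stream.emit(frame)

print(results, len(stream.callbacks))
```

After the third event the action has run once and the stream has no subscribers left, so later events cost nothing.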
Code examples#
Subscribe to Shutdown Events#
# App/Subscribe to Shutdown Events
import carb.events
import omni.kit.app

# Stream where app sends shutdown events
shutdown_stream = omni.kit.app.get_app().get_shutdown_event_stream()

def on_event(e: carb.events.IEvent):
    if e.type == omni.kit.app.POST_QUIT_EVENT_TYPE:
        print("We are about to shutdown")

sub = shutdown_stream.create_subscription_to_pop(on_event, name="name of the subscriber for debugging", order=0)
Subscribe to Update Events#
# App/Subscribe to Update Events
import carb.events
import omni.kit.app

update_stream = omni.kit.app.get_app().get_update_event_stream()

def on_update(e: carb.events.IEvent):
    print(f"Update: {e.payload['dt']}")

sub = update_stream.create_subscription_to_pop(on_update, name="My Subscription Name")
Create custom event#
# App/Create Custom Event
import carb.events
import omni.kit.app

# An event type is a unique integer id. Create it from a string by hashing, using the helper function.
# [ext name].[event name] is a recommended naming convention:
MY_CUSTOM_EVENT = carb.events.type_from_string("omni.my.extension.MY_CUSTOM_EVENT")

# The app provides a common event bus. It is an event queue which is popped every update (frame).
bus = omni.kit.app.get_app().get_message_bus_event_stream()

def on_event(e):
    print(e.type, e.type == MY_CUSTOM_EVENT, e.payload)

# Subscribe to the bus. Keep the subscription objects (sub1, sub2) alive for the subscriptions to work.
# The push callback is called immediately when the event is pushed
sub1 = bus.create_subscription_to_push_by_type(MY_CUSTOM_EVENT, on_event)
# The pop callback is called on the next update
sub2 = bus.create_subscription_to_pop_by_type(MY_CUSTOM_EVENT, on_event)

# Push an event to the bus with a custom payload
bus.push(MY_CUSTOM_EVENT, payload={"data": 2, "x": "y"})