Event streams#

Event streams are covered by the carb.events plugin and carb::events::IEvents Carbonite interface. The goal of event streams is to provide a means to move data around using the generalized interface in a thread-safe manner, and also act as a way to synchronize the logic.

API/design overview#

The singleton IEvents interface is used to create IEventStream objects. Whenever an event is being pushed into an event stream, the immediate callback is triggered, and the event stream stores the event in the internal event queue. Then, events can be popped from the queue one by one, or all at once (also called pump), and at this point deferred callbacks are triggered. The event stream owner typically controls where this pumping is happening.

Event consumers can subscribe to both immediate (push) and deferred (pop) callbacks. Subscription functions create ISubscription class, which usually unsubscribes automatically upon destruction. Callbacks are wrapped into IEventListener class that allows for context binding to the subscription, and upon triggering, the callback is triggered with the IEvent passed as parameter, this parameter describes the event which triggered the callback. IEvent contains event type, sender id and custom payload, which is stored as carb.dictionary item.

Code examples#

Subscribe to Shutdown Events#

# App/Subscribe to Shutdown Events
import carb.events
import omni.kit.app

# Stream where app sends shutdown events
shutdown_stream = omni.kit.app.get_app().get_shutdown_event_stream()

def on_event(e: carb.events.IEvent):
    if e.type == omni.kit.app.POST_QUIT_EVENT_TYPE:
        print("We are about to shutdown")

sub = shutdown_stream.create_subscription_to_pop(on_event, name="name of the subscriber for debugging", order=0)

Subscribe to Update Events#

# App/Subscribe to Update Events
import carb.events
import omni.kit.app


update_stream = omni.kit.app.get_app().get_update_event_stream()

def on_update(e: carb.events.IEvent):
	print(f"Update: {e.payload['dt']}")

sub = update_stream.create_subscription_to_pop(on_update, name="My Subscription Name")

Create custom event#

# App/Create Custom Event
import carb.events
import omni.kit.app

# Event is unique integer id. Create it from string by hashing, using helper function.
# [ext name].[event name] is a recommended naming convention:
MY_CUSTOM_EVENT = carb.events.type_from_string("omni.my.extension.MY_CUSTOM_EVENT")

# App provides common event bus. It is event queue which is popped every update (frame).
bus = omni.kit.app.get_app().get_message_bus_event_stream()

def on_event(e):
    print(e.type, e.type == MY_CUSTOM_EVENT, e.payload)

# Subscribe to the bus. Keep subscription objects (sub1, sub2) alive for subscription to work.
# Push to queue is called immediately when pushed
sub1 = bus.create_subscription_to_push_by_type(MY_CUSTOM_EVENT, on_event)
# Pop is called on next update
sub2 = bus.create_subscription_to_pop_by_type(MY_CUSTOM_EVENT, on_event)

# Push event the bus with custom payload
bus.push(MY_CUSTOM_EVENT, payload={"data": 2, "x": "y"})