Parse messages as json and push to message bus#
Provides json messaging functionality between Kit and the web client using generic messaging such as omni.kit.gfn or omni.kit.livestream.webrtc.
It takes a message bus event of event_type omni.kit.livestream.receive_message and
payload {"event_type": "SetColor", "payload": { ... }} and parses the payload as json
and pushes a new event on the bus with event_type SetColor and the payload unpacked
to the event payload dictionary.
Base extension API#
Register and unregister event types to monitor for outgoing messages to the client:
import omni.kit.livestream.messaging as messaging
messaging.register_event_type_to_send("SetColor")
messaging.unregister_event_type_to_send("SetColor")
messaging.unregister_observe_type("SetColor")
messaging.clear_event_types_to_send() # unregister all at once
@messaging.observe#
Simple decorator that registers a handler for an inbound event. The handler receives the raw
carb.eventdispatcher.Event object.
@messaging.observe("SetColor")
def on_set_color(event):
color = dict(event.payload)
@messaging.observe_and_dispatch#
Decorator that observes an event, unpacks the payload as keyword arguments, and automatically sends the handler’s return value as a response event. Also registers the response event for outbound sending.
@messaging.observe_and_dispatch("my:request", response_name="my:response")
def handler(event, **kwargs):
return process(kwargs)
The response wire format (flat envelope, as received by the web client):
{ "event_type": "my:response", "id": "<uuid>", "payload": { <return value fields> } }
For dict returns the handler’s keys are spread directly into payload. For non-dict returns
the value is placed under the "message" key: { "message": <value> }.
Large messaging (contained subpackage)#
The extension includes a large-messaging subpackage at omni.kit.livestream.messaging.large. It is loaded automatically on startup — multi-part splitting and reassembly are active for all messages through the base extension with no extra imports or configuration required.
Request/response correlation#
The client sends a unique id (e.g. a UUID) with each request. observe_and_dispatch echoes that same id in every response so the client can match responses to requests.
import omni.kit.livestream.messaging as messaging
@messaging.observe_and_dispatch("my:request", response_name="my:response")
def handler(event, **kwargs):
return process(kwargs)
The client sends (flat envelope):
{ "event_type": "my:request", "id": "<uuid>", "payload": { <request fields> } }
The response always echoes the same id (wire format received by the client):
{ "event_type": "my:response", "id": "<uuid>", "payload": { <response fields> } }
For dict returns the handler’s keys are spread directly into payload. For non-dict returns
the value is placed under the "message" key: { "message": <value> }.
Wire envelope format#
Every message — small or large, inbound or outbound — uses the same flat envelope.
id, part, and numParts are top-level fields alongside event_type and payload.
Single-part (no part/numParts fields):
{ "event_type": "<name>", "id": "<uuid>", "payload": { <application payload dict> } }
Multi-part (each chunk is a separate message):
{ "event_type": "<name>", "id": "<uuid>", "part": <n>, "numParts": <total>, "payload": "<JSON-string chunk>" }
Field reference:
event_type— the application-level event name.id— correlation UUID shared across all parts of one logical message.part— 1-based chunk index (multi-part only).numParts— total number of chunks for this logical message (multi-part only).payload— for single-part: the application payload dict. For multi-part: one chunk of the JSON-serialised application payload string.
Multi-part receive (large requests from client)#
The base extension automatically buffers incoming messages that arrive in parts. If the client sends a payload split across multiple messages (same id, with part and numParts at the envelope top level), the base extension buffers them and dispatches the fully assembled payload once all parts arrive — any @messaging.observe or @observe_and_dispatch handler receives the complete payload without any extra work.
Client sends (one message per part, flat envelope):
{ "event_type": "my:request", "id": "<uuid>", "part": 1, "numParts": 3, "payload": "..." }
{ "event_type": "my:request", "id": "<uuid>", "part": 2, "numParts": 3, "payload": "..." }
{ "event_type": "my:request", "id": "<uuid>", "part": 3, "numParts": 3, "payload": "..." }
Handler receives one call with the assembled payload:
def handler(event, message=""):
# message is the full reassembled string
return process(message)
Multi-part send (outgoing messages to client)#
All outgoing messages carry a top-level id. Large payloads are additionally split into chunks:
{ "event_type": ..., "id": "<uuid>", "part": <n>, "numParts": <total>, "payload": "<JSON-string chunk>" }
Small payloads (serialized JSON fits within
MESSAGE_PART_SIZE_BYTES, ≈16 KiB) are sent as a single message with nopart/numPartsfields andpayloadas the application dict directly.Large payloads are split into
MESSAGE_PART_SIZE_BYTES(≈16 KiB) chunks, each sent as a separate message with an incrementingpartcounter andpayloadas a JSON-string chunk.
LARGE_MESSAGE_THRESHOLD_BYTES (64 KiB) is exported as an informational constant indicating the
payload size above which the client should expect multiple parts in practice.
No changes to existing send-side code are required. The client is responsible for detecting
part/numParts fields and buffering chunks until all parts have arrived before processing.
Constants#
Name |
Value |
Description |
|---|---|---|
|
|
Correlation id field at the envelope top level |
|
|
Part number within the message (1-based), at the envelope top level |
|
|
Total number of parts, at the envelope top level |
|
|
Application payload — a dict for single-part, a JSON-string chunk for multi-part |
|
|
Content-level key used to wrap bare-string handler return values and non-JSON assembled payloads |
|
64 KiB |
Informational threshold; payloads above this size will result in multiple parts |
|
≈16 KiB |
Maximum data size per chunk (includes headroom for the JSON envelope) |