Parse messages as JSON and push to message bus#

Provides JSON messaging between Kit and the web client over a generic messaging transport such as omni.kit.gfn or omni.kit.livestream.webrtc.

The extension listens for message bus events of event_type omni.kit.livestream.receive_message whose payload has the form {"event_type": "SetColor", "payload": { ... }}. It parses that payload as JSON and pushes a new event onto the bus with event_type SetColor, with the inner payload unpacked into the new event's payload dictionary.
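The transformation described above can be sketched in plain Python, independent of Kit. The function name unpack_inbound_message is hypothetical, and accepting either a JSON string or an already-decoded dict is an assumption made for this sketch; the extension's internals may differ.

```python
import json


def unpack_inbound_message(outer_payload):
    """Return (event_type, payload_dict) for one received message.

    Hypothetical helper: `outer_payload` stands in for the payload of the
    omni.kit.livestream.receive_message event. Accepting both a JSON string
    and a decoded dict is an assumption of this sketch.
    """
    if isinstance(outer_payload, (str, bytes)):
        outer_payload = json.loads(outer_payload)
    event_type = outer_payload["event_type"]
    # A missing inner payload is treated as an empty dict.
    payload = dict(outer_payload.get("payload") or {})
    return event_type, payload


event_type, payload = unpack_inbound_message(
    '{"event_type": "SetColor", "payload": {"r": 1.0, "g": 0.5, "b": 0.0}}'
)
print(event_type, payload)
```

In this sketch the returned event_type would become the name of the new bus event and the returned dict its payload.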

Base extension API#

Register and unregister event types to monitor for outgoing messages to the client:

import omni.kit.livestream.messaging as messaging

messaging.register_event_type_to_send("SetColor")
messaging.unregister_event_type_to_send("SetColor")
messaging.unregister_observe_type("SetColor")
messaging.clear_event_types_to_send()  # unregister all at once

@messaging.observe#

Simple decorator that registers a handler for an inbound event. The handler receives the raw carb.eventdispatcher.Event object.

@messaging.observe("SetColor")
def on_set_color(event):
    # event is the raw carb.eventdispatcher.Event; copy its payload into a plain dict
    color = dict(event.payload)

@messaging.observe_and_dispatch#

Decorator that observes an event, unpacks the payload as keyword arguments, and automatically sends the handler’s return value as a response event. Also registers the response event for outbound sending.

@messaging.observe_and_dispatch("my:request", response_name="my:response")
def handler(event, **kwargs):
    # kwargs holds the unpacked request payload; process() stands in for application logic
    return process(kwargs)

The response wire format (flat envelope, as received by the web client):

{ "event_type": "my:response", "id": "<uuid>", "payload": { <return value fields> } }

For dict returns the handler’s keys are spread directly into payload. For non-dict returns the value is placed under the "message" key: { "message": <value> }.
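The wrapping rule above can be illustrated with a small stand-alone function. This is a sketch only: build_response is a hypothetical name, not part of the extension's API, and generating a fresh UUID when no request id is available is an assumption.

```python
import json
import uuid


def build_response(response_name, result, request_id=None):
    """Wrap a handler return value in the flat response envelope.

    Hypothetical helper illustrating the rule above; not the extension's code.
    """
    # Dict returns become the payload as-is; anything else goes under "message".
    payload = dict(result) if isinstance(result, dict) else {"message": result}
    return {
        "event_type": response_name,
        # Assumption: a fresh UUID is used when the request carried no id.
        "id": request_id or str(uuid.uuid4()),
        "payload": payload,
    }


print(json.dumps(build_response("my:response", {"ok": True}, "abc-123")))
print(json.dumps(build_response("my:response", "done", "abc-123")))
```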

Large messaging (contained subpackage)#

The extension includes a large-messaging subpackage at omni.kit.livestream.messaging.large. It is loaded automatically on startup — multi-part splitting and reassembly are active for all messages through the base extension with no extra imports or configuration required.

Request/response correlation#

The client sends a unique id (e.g. a UUID) with each request. observe_and_dispatch echoes that same id in every response so the client can match responses to requests.

import omni.kit.livestream.messaging as messaging

@messaging.observe_and_dispatch("my:request", response_name="my:response")
def handler(event, **kwargs):
    return process(kwargs)

The client sends (flat envelope):

{ "event_type": "my:request", "id": "<uuid>", "payload": { <request fields> } }

The response always echoes the same id (wire format received by the client):

{ "event_type": "my:response", "id": "<uuid>", "payload": { <response fields> } }

For dict returns the handler’s keys are spread directly into payload. For non-dict returns the value is placed under the "message" key: { "message": <value> }.
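On the client side, correlation amounts to remembering each id until its response arrives. The sketch below is written in Python for consistency with the rest of this page (a web client would do the equivalent in JavaScript); the PendingRequests class and its method names are hypothetical.

```python
import uuid


class PendingRequests:
    """Track outstanding requests by id (hypothetical client-side helper)."""

    def __init__(self):
        self._pending = {}  # request id -> request event type

    def create(self, event_type, payload):
        """Build a request envelope and remember its id."""
        request_id = str(uuid.uuid4())
        self._pending[request_id] = event_type
        return {"event_type": event_type, "id": request_id, "payload": payload}

    def resolve(self, response):
        """Return the event type of the request this response answers, or None."""
        return self._pending.pop(response.get("id"), None)


pending = PendingRequests()
request = pending.create("my:request", {"value": 1})
# Simulated response echoing the same id, as observe_and_dispatch is described to do.
response = {"event_type": "my:response", "id": request["id"], "payload": {"ok": True}}
matched = pending.resolve(response)
print(matched)
```

Popping the entry on resolve means a duplicate or unknown id yields None rather than being matched twice.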

Wire envelope format#

Every message — small or large, inbound or outbound — uses the same flat envelope. id, part, and numParts are top-level fields alongside event_type and payload.

Single-part (no part/numParts fields):

{ "event_type": "<name>", "id": "<uuid>", "payload": { <application payload dict> } }

Multi-part (each chunk is a separate message):

{ "event_type": "<name>", "id": "<uuid>", "part": <n>, "numParts": <total>, "payload": "<JSON-string chunk>" }

Field reference:

  • event_type — the application-level event name.

  • id — correlation UUID shared across all parts of one logical message.

  • part — 1-based chunk index (multi-part only).

  • numParts — total number of chunks for this logical message (multi-part only).

  • payload — for single-part: the application payload dict. For multi-part: one chunk of the JSON-serialised application payload string.
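The field reference above implies a simple check for telling the two envelope shapes apart. The following is a minimal sketch; envelope_kind is a hypothetical name, and the validation rules (part and numParts appearing together, part within 1..numParts) are inferred from the field descriptions rather than taken from the extension.

```python
def envelope_kind(envelope):
    """Return "single" or "multi" for a flat envelope, or raise ValueError."""
    for key in ("event_type", "payload"):
        if key not in envelope:
            raise ValueError(f"missing field: {key}")
    has_part = "part" in envelope
    has_total = "numParts" in envelope
    if has_part != has_total:
        raise ValueError("part and numParts must appear together")
    if not has_part:
        return "single"
    # part is 1-based, so valid values run from 1 to numParts inclusive.
    if not 1 <= envelope["part"] <= envelope["numParts"]:
        raise ValueError("part must be between 1 and numParts")
    return "multi"


print(envelope_kind({"event_type": "my:request", "id": "a", "payload": {}}))
print(envelope_kind(
    {"event_type": "my:request", "id": "a", "part": 1, "numParts": 3, "payload": "..."}
))
```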

Multi-part receive (large requests from client)#

The base extension automatically buffers incoming messages that arrive in parts. When the client splits a payload across multiple messages (same id, with part and numParts at the envelope top level), the extension holds the parts and dispatches the fully assembled payload once all of them have arrived. Any @messaging.observe or @messaging.observe_and_dispatch handler receives the complete payload without any extra work.

Client sends (one message per part, flat envelope):

{ "event_type": "my:request", "id": "<uuid>", "part": 1, "numParts": 3, "payload": "..." }
{ "event_type": "my:request", "id": "<uuid>", "part": 2, "numParts": 3, "payload": "..." }
{ "event_type": "my:request", "id": "<uuid>", "part": 3, "numParts": 3, "payload": "..." }

Handler receives one call with the assembled payload:

def handler(event, message=""):
    # message is the full reassembled string
    return process(message)
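The buffering behaviour described in this section can be sketched as a small class that collects chunks by id and joins them once every part is present. PartBuffer is a hypothetical name; wrapping any non-dict result under "message" is an assumption based on the MESSAGE_KEY description, and the extension's real buffering logic may differ.

```python
import json


class PartBuffer:
    """Collect multi-part envelopes by id (hypothetical sketch)."""

    def __init__(self):
        self._parts = {}  # id -> {part number: chunk string}

    def add(self, envelope):
        """Return the assembled payload dict once complete, else None."""
        if "part" not in envelope:
            return envelope["payload"]  # single-part: nothing to buffer
        chunks = self._parts.setdefault(envelope["id"], {})
        chunks[envelope["part"]] = envelope["payload"]
        total = envelope["numParts"]
        # Wait until every part from 1 to numParts has been seen (any order).
        if any(n not in chunks for n in range(1, total + 1)):
            return None
        del self._parts[envelope["id"]]
        text = "".join(chunks[n] for n in range(1, total + 1))
        try:
            assembled = json.loads(text)
        except json.JSONDecodeError:
            assembled = text
        # Assumption: non-JSON or non-dict content is wrapped under "message".
        return assembled if isinstance(assembled, dict) else {"message": assembled}


buffer = PartBuffer()
text = json.dumps({"message": "hello world"})
chunks = [text[:8], text[8:16], text[16:]]
results = [
    buffer.add({"event_type": "my:request", "id": "abc",
                "part": n, "numParts": 3, "payload": chunk})
    for n, chunk in enumerate(chunks, start=1)
]
print(results)
```

This sketch never evicts incomplete messages; a production buffer would likely need a timeout or size limit for parts that never arrive.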

Multi-part send (outgoing messages to client)#

All outgoing messages carry a top-level id. Large payloads are additionally split into chunks:

{ "event_type": ..., "id": "<uuid>", "part": <n>, "numParts": <total>, "payload": "<JSON-string chunk>" }

  • Small payloads (serialized JSON fits within MESSAGE_PART_SIZE_BYTES, ≈16 KiB) are sent as a single message with no part/numParts fields and payload as the application dict directly.

  • Large payloads are split into MESSAGE_PART_SIZE_BYTES (≈16 KiB) chunks, each sent as a separate message with an incrementing part counter and payload as a JSON-string chunk.

LARGE_MESSAGE_THRESHOLD_BYTES (64 KiB) is exported as an informational constant only: it indicates the payload size above which the client should expect multiple parts in practice. The split itself is governed by MESSAGE_PART_SIZE_BYTES, as described in the list above.

No changes to existing send-side code are required. The client is responsible for detecting part/numParts fields and buffering chunks until all parts have arrived before processing.
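The send-side rule (one message if the serialized payload fits in a part, otherwise fixed-size chunks) can be sketched as follows. split_message is a hypothetical name, and PART_SIZE is set to exactly 16 KiB purely for illustration, since the source gives MESSAGE_PART_SIZE_BYTES only as an approximate value.

```python
import json
import uuid

# Assumed value for illustration; the real MESSAGE_PART_SIZE_BYTES is only given as ≈16 KiB.
PART_SIZE = 16 * 1024


def split_message(event_type, payload, part_size=PART_SIZE, message_id=None):
    """Return the list of envelopes for one logical message (hypothetical)."""
    message_id = message_id or str(uuid.uuid4())
    # json.dumps escapes non-ASCII by default, so one character is one byte here.
    text = json.dumps(payload)
    if len(text) <= part_size:
        # Small payload: single envelope, payload stays a dict, no part fields.
        return [{"event_type": event_type, "id": message_id, "payload": payload}]
    chunks = [text[i:i + part_size] for i in range(0, len(text), part_size)]
    return [
        {"event_type": event_type, "id": message_id,
         "part": n, "numParts": len(chunks), "payload": chunk}
        for n, chunk in enumerate(chunks, start=1)
    ]


small = split_message("my:response", {"ok": True}, message_id="s1")
large = split_message("my:response", {"data": "x" * 40000}, message_id="l1")
print(len(small), len(large))
```

Joining the payload strings of all parts in part order and parsing the result as JSON recovers the original application payload.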

Constants#

| Name | Value | Description |
| --- | --- | --- |
| ID_KEY | "id" | Correlation id field at the envelope top level |
| PART_KEY | "part" | Part number within the message (1-based), at the envelope top level |
| NUM_PARTS_KEY | "numParts" | Total number of parts, at the envelope top level |
| PAYLOAD_KEY | "payload" | Application payload: a dict for single-part, a JSON-string chunk for multi-part |
| MESSAGE_KEY | "message" | Content-level key used to wrap bare-string handler return values and non-JSON assembled payloads |
| LARGE_MESSAGE_THRESHOLD_BYTES | 64 KiB | Informational threshold; payloads above this size will result in multiple parts |
| MESSAGE_PART_SIZE_BYTES | ≈16 KiB | Maximum data size per chunk (includes headroom for the JSON envelope) |