Using the Public Streaming API

The following guide explains how to publish custom Omniverse applications to the public streaming API, so that custom-built applications can be hosted for use by Users subscribed to the Omniverse public streaming API. This guide assumes your organization is already registered with the Omniverse public streaming API, allowing your Users to access collections of Omniverse workflows from their web browsers and to stream their Nucleus scenes directly from their storage locations.

Setting up a Sample Extension

To get started with the code samples used for illustrative purposes in this guide, you may wish to create a custom Omniverse extension which will be used for forwarding communications between your local installation and the remote server hosting the streaming sessions.

Should you require additional information regarding building custom Omniverse extensions, consider referencing the development guide describing getting started with Omniverse extensions.

exts/omni.services.streaming.application.example/config/extension.toml
 1[package]
 2authors = ["Name <name@email.com>"]
 3category = "Services"
 4changelog = "docs/CHANGELOG.md"
 5description = "Application streaming integration sample"
 6icon = "data/icon.png"
 7keywords = ["kit", "services", "application", "streaming", "example"]
 8readme  = "docs/README.md"
 9repository = "https://example.com/kit-extensions/kit-streaming-service-sample"
10title = "Application streaming integration sample"
11version = "1.0.0"
12
13[dependencies]
14"omni.services.core" = {}
15"omni.services.client" = {}
16"omni.services.transport.server.http" = {}
17"omni.services.transport.client.http_async" = {}
18
19[[python.module]]
20name = "omni.services.streaming.application.example"
21
22[settings]
23exts."omni.services.transport.server.http".http.enabled = true
24exts."omni.services.transport.server.http".host = "0.0.0.0"
25exts."omni.services.transport.server.http".port = 8011
exts/omni.services.streaming.application.example/models/streaming.py
  1# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
  2#
  3# NVIDIA CORPORATION and its licensors retain all intellectual property
  4# and proprietary rights in and to this software, related documentation
  5# and any modifications thereto.  Any use, reproduction, disclosure or
  6# distribution of this software and related documentation without an express
  7# license agreement from NVIDIA CORPORATION is strictly prohibited.
  8
from enum import Enum
from typing import Dict, List

# NOTE(review): `BaseModel` and `Field` are defined by pydantic; confirm this import resolves in
# your environment, or import them from `pydantic` instead.
from fastapi import BaseModel, Field
 12
 13
 14class SortBy(str, Enum):
 15   """Fields by which to sort results by."""
 16   created_time = "created_at"
 17   username = "username"
 18
 19class OrderBy(str, Enum):
 20   """Sorting direction for the field by which to order results by."""
 21   asc = "ASC"
 22   desc = "DESC"
 23
 24class PortType(str, Enum):
 25   """Supported Omniverse application port types."""
 26   tcp = "TCP"
 27   udp = "UDP"
 28
 29class Port(BaseModel):
 30   """Omniverse application port specifications."""
 31   name: str = Field(..., description="Application port name.")
 32   number: int = Field(..., ge=0, le=65535, description="Application port number.")
 33   type: PortType = Field(..., description="Application port type.")
 34
 35class StreamingMode(str, Enum):
 36   """Supported Omniverse application streaming mode."""
 37   secure = "secure"
 38   insecure = "insecure"
 39
 40class StreamRequestSpec(BaseModel):
 41   """Extensible specification that is passed to the Streaming Controller endpoint."""
 42   kit_args: List[str] = Field(
 43      default=[],
 44      title="Omniverse application arguments.",
 45      description="Array of strings to be appended to the Omniverse application argument inputs.",
 46   )
 47   kit_env: Dict[str, str] = Field(
 48      default={},
 49      title="Omniverse application environment variables.",
 50      description="Dictionary containing environment variables for the Omniverse application container.",
 51   )
 52   streaming_mode: StreamingMode = Field(
 53      default=StreamingMode.secure,
 54      title="Omniverse application streaming mode.",
 55      description="Streaming mode for the Omniverse application.",
 56   )
 57   image: str = Field(
 58      ...,
 59      title="Omniverse application Docker image hosted on NGC.",
 60      description="Full path to Omniverse application Docker image hosted on NGC, along with its tag.",
 61   )
 62   user_ports: List[Port] = Field(
 63      default=[],
 64      title="Omniverse application ports.",
 65      description="List of additional container ports to expose on the streaming application container.",
 66   )
 67   user_labels: Dict[str, str] = Field(
 68      default={},
 69      title="Omniverse application pod labels.",
 70      description="Optional list of labels to assign to the Omniverse application pod.",
 71   )
 72   node_labels: Dict[str, str] = Field(
 73      default={},
 74      title="Omniverse application node labels.",
 75      description="Optional labels to set on the node for for scheduling purposes.",
 76   )
 77
 78class CreateStreamRequest(BaseModel):
 79   """Model of a request to create a new streaming instance."""
 80   url: str = Field(
 81      ...,
 82      title="USD Stage URL.",
 83      description="URL of the Nucleus USD Stage to open (e.g: `omniverse://example.com/path/to/stage.usd`)",
 84   )
 85   access_token: str = Field(
 86      ...,
 87      title="Nucleus access token.",
 88      description="Access token that the Omniverse application will use to connect to Nucleus.",
 89   )
 90   username: str = Field(
 91      ...,
 92      title="Username of the authenticated User for the session.",
 93      description="Authenticated username to use for the session.",
 94   )
 95   spec: StreamRequestSpec = Field(
 96      default=StreamRequestSpec(),
 97      title="Application stream specification.",
 98      description="Extensible Omniverse application stream specification.",
 99   )
100
class ErrorMessage(BaseModel):
    """Base application error message."""

    # Required detail describing the error.
    message: str = Field(
        ...,
        title="Error message.",
        description="Detail about the error that occurred.",
    )
exts/omni.services.streaming.application.example/python_ext.py
 1# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
 2#
 3# NVIDIA CORPORATION and its licensors retain all intellectual property
 4# and proprietary rights in and to this software, related documentation
 5# and any modifications thereto.  Any use, reproduction, disclosure or
 6# distribution of this software and related documentation without an express
 7# license agreement from NVIDIA CORPORATION is strictly prohibited.
 8
 9import omni.ext
10
11from omni.services.core.main import deregister_router, register_router
12
13from .services.streaming import router
14
15
class StreamingApplicationExampleExtension(omni.ext.IExt):
    """Streaming application example extension.

    Registers the streaming service router when the extension starts up, and deregisters it when
    the extension shuts down.
    """

    def on_startup(self, ext_id: str) -> None:
        """Register the streaming router under the "examples" tag.

        Args:
            ext_id (str): Identifier of the extension being started (not used here).

        """
        register_router(router=router, tags=["examples"])

    def on_shutdown(self) -> None:
        """Deregister the streaming router registered at startup."""
        deregister_router(router=router)

API Code Samples

Healthcheck

Before attempting to communicate with remote endpoints in order to schedule Omniverse application streams, you may wish to first reach the healthcheck API available on the service in order to ensure resources are in a state allowing them to serve incoming requests. Proceeding by first establishing that components are exhibiting standard operational behavior can greatly help reduce potential investigation times in cases where resources may be limited on remote clusters, or if network connectivity issues may be temporarily affecting provisioning due to maintenance windows.

To query the /healthcheck API for the status of the streaming service APIs, navigate to the OpenAPI specification page of a production cluster (e.g. https://store-portal.ovx-prd31-sjc11.proxy.ace.ngc.nvidia.com/backend/api-docs), expand the /healthcheck endpoint of the Swagger documentation page, click the Execute button, and confirm that the endpoint responds successfully with the following application/json content:

{
   "ok": true
}

Listing Streaming Sessions

In order to list existing streaming sessions started by Users subscribed to the service, an HTTP GET request can be issued against a service endpoint responsible for returning the list of existing streaming sessions. The most straightforward example illustrating the results of this operation can be implemented using the following code sample:

exts/omni.services.streaming.application.example/services/streaming.py
 1# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
 2#
 3# NVIDIA CORPORATION and its licensors retain all intellectual property
 4# and proprietary rights in and to this software, related documentation
 5# and any modifications thereto.  Any use, reproduction, disclosure or
 6# distribution of this software and related documentation without an express
 7# license agreement from NVIDIA CORPORATION is strictly prohibited.
 8
 9from typing import Any, Dict
10
11from omni.services.client import AsyncClient
12from omni.services.core import routers
13
14router = routers.ServiceAPIRouter()
15
16
@router.get(
    path="/list-streams",
    summary="Return the list of streaming sessions.",
    description="Returns the list of streaming sessions currently active.",
)
async def list_streams() -> Dict[str, Any]:
    """
    Return metadata information about application streaming sessions.

    Issues an unfiltered `get()` call against the session endpoint of the remote service and
    returns its response as-is.

    Returns:
        Dict[str, Any]: Metadata information about application streaming sessions.

    """
    client = AsyncClient(uri="https://cockpit.devtest.az.cloud.omniverse.nvidia.com")
    response = await client.session_service.session.get()
    return response

In the case of a successful response, the remote endpoint will respond with an HTTP 200 and an application/json Content-Type containing details about each of the streaming sessions found on the service:

 1{
 2   "results": [
 3      {
 4         "session_record": {
 5            "session_id": "99a13993-77d4-402b-81a8-a563858908d9",
 6            "username": "user@email.com",
 7            "source_address": "0.0.0.0",
 8            "destination_address": "12.34.56.78",
 9            "session_routes": "16249:TCP:NPORT:30136,15426:UDP:MEDIA:31306",
10            "nucleus": "external.devtest.az.cloud.omniverse.nvidia.com",
11            "stream_type": "app",
12            "pod_name": "ovc-v1-kit-99a13993-77d4-402b-81a8-a563858908d9",
13            "service_name": "ovc-v1-kit-99a13993-77d4-402b-81a8-a563858908d9",
14            "configmap_name": "ovc-v1-kit-99a13993-77d4-402b-81a8-a563858908d9",
15            "session_link": "https://devtest.cloud.omniverse.nvidia.com/webclient/index.html?signalingserver=customer.devtest.az.cloud.omniverse.nvidia.com&signalingport=48322&mediaserver=customer.devtest.az.cloud.omniverse.nvidia.com&mediaport=15426&sessionid=99a13993-77d4-402b-81a8-a563858908d9&mic=0&cursor=free&server=&nucleus=external.devtest.az.cloud.omniverse.nvidia.com&resolution=1920x1080&fps=60&autolaunch=true&backendurl=public-api.devtest.az.cloud.omniverse.nvidia.com&accessToken=<Nucleus Access Token>&terminateVerb=DELETE",
16            "created_at": "2023-08-01T12:00:00.000000+00:00",
17            "updated_at": "2023-08-01T12:00:00.000000+00:00"
18         },
19         "status": {
20            "phase": "Running",
21            "node_name": "cbg1-f06-ovx-05",
22            "host_ip": "12.34.56.78",
23            "pod_ip": "100.96.18.25",
24            "start_time": "2023-08-01T12:00:00+00:00"
25         }
26      },
27      // Include additional session metadata, for any available on the service.
28      // [...]
29   ]
30}

For an example using additional filtering and sorting options when issuing queries against the remote service’s session API, the following code sample illustrates how one could formulate a query to list the 10 most recent sessions of a particular User:

 1from .models.streaming import OrderBy, SortBy
 2
 3@router.get(
 4   path="/list-streams",
 5   summary="Return the list of streaming sessions.",
 6   description="Returns the list of streaming sessions currently active.",
 7)
 8async def list_streams() -> Dict[str, Any]:
 9   """
10   Return metadata information about application streaming sessions.
11
12   Args:
13      None
14
15   Returns:
16      Dict[str, Any]: Metadata information about application streaming sessions.
17
18   """
19   client = AsyncClient(uri="https://cockpit.devtest.az.cloud.omniverse.nvidia.com")
20   response = await client.session_service.session.get(
21      # Maximum number of results to display:
22      limit=10,
23      # Offset of the first result to begin displaying:
24      offset=0,
25      # Username of the User whose sessions should be displayed:
26      username="username@example.com",
27      # Field by which to sort results by:
28      sortBy=SortBy.created_time.value,
29      # Sorting direction of the results:
30      orderBy=OrderBy.desc.value,
31   )
32   return response

Creating Streaming Sessions

In order to create new streaming sessions, queries can be issued against the create-stream API, which accepts POST queries containing information such as the access token to use to authorize requests for the given username, along with data such as the URL of the NGC Docker container hosting the Omniverse application to stream:

exts/omni.services.streaming.application.example/services/streaming.py
 1# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
 2#
 3# NVIDIA CORPORATION and its licensors retain all intellectual property
 4# and proprietary rights in and to this software, related documentation
 5# and any modifications thereto.  Any use, reproduction, disclosure or
 6# distribution of this software and related documentation without an express
 7# license agreement from NVIDIA CORPORATION is strictly prohibited.
 8
 9from typing import Any, Dict
10
11from omni.services.client import AsyncClient
12from omni.services.core import routers
13
14from .models.streaming import CreateStreamRequest, StreamingMode, StreamRequestSpec
15
16router = routers.ServiceAPIRouter()
17
18
@router.post(
    path="/create-stream",
    summary="Create a new streaming session.",
    description="Returns metadata information about a newly-created streaming session.",
)
async def create_stream() -> Dict[str, Any]:
    """
    Send a request to create a streaming session using the given NGC Docker container on behalf of the User.

    The request payload is built from a `CreateStreamRequest` populated with sample values, which
    should be replaced with those of the actual User and application before use.

    Returns:
        Dict[str, Any]: Metadata information about the streaming session that was just created.

    """
    client = AsyncClient(uri="https://cockpit.devtest.az.cloud.omniverse.nvidia.com")
    create_stream_options = CreateStreamRequest(
        url="omniverse://store.devtest.az.cloud.omniverse.nvidia.com",
        # Replace with the Nucleus access token for the given User:
        access_token="<Nucleus Access Token>",
        # Replace with the email address identifying the User on whose behalf the stream is initiated:
        username="user@email.com",
        # Configure any required information about the streaming session, including Docker container image settings
        # from NGC, or environment variables to provide for the streaming session:
        spec=StreamRequestSpec(
            # Array of strings to be appended to the Omniverse application argument inputs:
            kit_args=[],
            # Omniverse application environment variables:
            kit_env={},
            # Omniverse application streaming mode:
            streaming_mode=StreamingMode.secure,
            # Omniverse application Docker image hosted on NGC:
            image="nvcr.io/nvidian/omniverse/ov-composer-streaming-webrtc-ovc:2023.1.0-beta.28",
            # Omniverse application ports:
            user_ports=[],
            # Omniverse application pod labels:
            user_labels={},
            # Omniverse application node labels:
            node_labels={},
        ),
    )
    # The model is expanded into keyword arguments of the POST call.
    response = await client.session_service.session.post(**create_stream_options.dict())
    return response

Note

The code sample above references an Omniverse USD Composer application image featuring WebRTC streaming capabilities in order to enable remote development workflows. To publish a custom version of an application’s Docker container, push the image containing the desired features to your organization’s private registry on NGC.

For additional information about using the NGC private registry, consult the User Guide available online.

In the case of a successful stream creation, the remote endpoint will respond with an HTTP 200 and an application/json Content-Type containing the URL of the stream that was created on behalf of the User:

1{
2   "redirect": "https://devtest.cloud.omniverse.nvidia.com/webclient/index.html?signalingserver=customer.devtest.az.cloud.omniverse.nvidia.com&signalingport=48322&mediaserver=customer.devtest.az.cloud.omniverse.nvidia.com&mediaport=15426&sessionid=99a13993-77d4-402b-81a8-a563858908d9&mic=0&cursor=free&server=&nucleus=external.devtest.az.cloud.omniverse.nvidia.com&resolution=1920x1080&fps=60&autolaunch=true&backendurl=public-api.devtest.az.cloud.omniverse.nvidia.com&accessToken=<Nucleus Access Token>&terminateVerb=DELETE",
3   "session_id": "99a13993-77d4-402b-81a8-a563858908d9",
4   "session_routes": "16249:TCP:NPORT:30136,15426:UDP:MEDIA:31306"
5}

Deleting Streaming Sessions

Once interaction with a streaming session has been completed, Users can proceed to terminate the application instance they have initiated in order to free resources on the remote host and increase hardware availability for Users wishing to make use of greater node capacity. To terminate sessions, Users can issue DELETE requests against the streaming API, supplying the unique identifier of the session to terminate:

exts/omni.services.streaming.application.example/services/streaming.py
 1# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
 2#
 3# NVIDIA CORPORATION and its licensors retain all intellectual property
 4# and proprietary rights in and to this software, related documentation
 5# and any modifications thereto.  Any use, reproduction, disclosure or
 6# distribution of this software and related documentation without an express
 7# license agreement from NVIDIA CORPORATION is strictly prohibited.
 8
 9from typing import Any, Dict
10
11from omni.services.client import AsyncClient
12from omni.services.core import routers
13
14from .models.streaming import ErrorMessage
15
16router = routers.ServiceAPIRouter()
17
18
@router.delete(
    path="/{session_id}",
    summary="Terminate a session.",
    description="Terminate the session with the given unique identifier.",
    responses={
        404: {"model": ErrorMessage},
    },
)
async def delete_stream(session_id: str) -> Dict[str, Any]:
    """
    Send a request to terminate the session with the given unique identifier.

    Args:
        session_id (str): Unique identifier of the session to terminate.

    Returns:
        Dict[str, Any]: Metadata information about the session that was just terminated.

    """
    client = AsyncClient(uri="https://cockpit.devtest.az.cloud.omniverse.nvidia.com")
    # The session identifier is only known at runtime, so the matching client attribute is looked
    # up dynamically (presumably mapping to the `/session/<session_id>` path -- confirm against the
    # client documentation).
    response = await getattr(client.session_service.session, session_id).delete()
    return response

In the case of a successful request, the remote endpoint will respond with an HTTP 200 and the content "OK".