Customizing Endpoint Resolution

The sample APIs provide an option for developers to customize how the streaming endpoints are resolved.

Latest Container Release: 1.8.0

By default the resolver class is Generic; the sample also includes an AWS target group specific resolver class (AWS).

Both can be found within the container at the following path: /usr/local/lib/python3.11/site-packages/nv/svc/streaming/_csp.py

The class and arguments to load at startup are managed by the following settings:

  • backend.csp.cls

  • backend.csp.args

These can be overridden via the following settings in the Helm values file (see the example after this list):

  • streaming.serviceConfig.backend_csp_cls

  • streaming.serviceConfig.backend_csp_args
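
For example, pointing the service at the built-in Generic class might look like the following in the values file. This is only an illustration: the module:Class path format matches the NodePortResolver example later on this page, and the exact defaults in your chart may differ:

    streaming:
       serviceConfig:
          # -- CSP customization manager
          backend_csp_cls: "nv.svc.streaming._csp:Generic"
          # -- CSP customization manager arguments
          backend_csp_args: {}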

Below is the Generic class defined in _csp.py. It inherits from the _CSP class. The class always gets passed a configured Kubernetes client that uses the service account defined in the Helm chart of the streaming session manager. An example of the client being used can be seen in the _fetch_resources function inside the _CSP class.

class Generic(_CSP):
    """Generic and default CSP manager."""

    def __init__(
        self,
        k8s_client: KubernetesClient,
        enable_wss: bool = False,
        hostname_annotation_key: str = "external-dns.alpha.kubernetes.io/hostname",
        service_annotations_location: str = "streamingKit.service.annotations",
        base_domain: str = ""
    ) -> Any:
        """Initialize."""
        super().__init__(k8s_client)
        self._enable_wss = enable_wss
        self._service_annotations_location = service_annotations_location
        self._hostname_annotation_key = hostname_annotation_key
        self._base_domain = base_domain

    async def on_create(self, profile_data: Dict, settings: Dict) -> Dict:
        """Process CSP customisations on stream creation."""
        if not self._enable_wss:
            return {}

        values = profile_data.get("settings", {}).get("values", {})
        keys = self._service_annotations_location.split(".")
        data = values
        for key in keys:
            data = data.get(key, {})

        prefix = self._generate_random_dns_prefix()
        data[self._hostname_annotation_key] = f"{prefix}.{self._base_domain}"
        settings[self._service_annotations_location] = data

        logging.debug(f"Generated settings {settings}")

        return settings

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""
        services = await self._fetch_resources("service", selectors={"sessionId": session_id})
        routes, status = await self._extract_routes(services)
        return routes, status

    def _extract_ports(self, service_spec: Dict) -> Tuple[Dict, bool]:
        """Extract the port information from the service specification.

        Args:
            service_spec (Dict): The service specification dictionary.

        Returns:
            Dict: A dictionary containing the route information.
        """
        ports = service_spec.spec.ports

        routes = []
        for port in ports:
            routes.append(
                {
                    "source_port": port.port,
                    "description": port.name,
                    "protocol": port.protocol,
                    "destination_port": port.node_port
                }
            )

        status = True if routes else False

        logging.debug(f"Extracted port mappings {routes}, readiness status {status}")
        return {"routes": routes}, status

    async def _extract_routes(self, services):
        routes = {}
        statuses = []

        for service in services:
            ports, port_ready = self._extract_ports(service)

            entries = []
            lb_ready = False
            if self._enable_wss:
                hostname, lb_ready = await self._extract_hostname(service)
                entries.append(hostname)
            else:
                ips, lb_ready = await self._extract_lb_ips(service)
                entries.extend(ips)

            statuses.append(all([port_ready, lb_ready]))
            if not lb_ready:
                continue

            for entry in entries:
                routes[entry] = ports

        status = all(statuses) if statuses else False
        logging.debug(f"Extracted routes {routes}, readiness status {status}")
        return routes, status

    async def _extract_lb_ips(self, service) -> Tuple[List, bool]:
        ips = []
        hostname = None

        ingress = service.status.load_balancer.ingress
        if not ingress:
            return ips, False

        for entry in ingress:
            if entry.ip:
                ips.append(entry.ip)
            hostname = entry.hostname

        if not ips and hostname:
            logging.debug("No IPs were found attached to the service, trying to resolve hostname")
            ips = await self._resolve_hostname(hostname)

        status = True if ips else False

        logging.debug(f"Extracted IPs {ips}, readiness status {status}")
        return ips, status

    async def _extract_hostname(self, service) -> Tuple[str, bool]:
        annotations = service.metadata.annotations
        hostname = None

        try:
            hostname = annotations[self._hostname_annotation_key]
        except KeyError:
            logging.error(f"Hostname field `{self._hostname_annotation_key}` not found in annotations")
            raise HostnameNotFoundError("Unable to find hostname")

        ready = False
        try:
            ips = await self._resolve_hostname(hostname)
            if ips:
                ready = True
        except aiodns.error.DNSError as exc:
            logging.warning(f"Unable to resolve {hostname}: {exc}")

        return hostname, ready
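
Based on _extract_ports and _extract_routes above, the value returned by resolve_endpoints maps each endpoint (an IP address, or a hostname when WSS is enabled) to a routes payload, together with a readiness flag. The values below are purely illustrative:

    # Illustrative only: shape of the (routes, ready) tuple from resolve_endpoints()
    routes = {
        "203.0.113.10": {
            "routes": [
                {
                    "source_port": 443,        # port.port in Generic (node_port in the NodePortResolver example below)
                    "description": "signaling",
                    "protocol": "TCP",
                    "destination_port": 31445  # port.node_port in Generic
                }
            ]
        }
    }
    ready = True  # False makes the API answer the client with a 202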
The Generic class inherits from the _CSP base class, which wraps the configured Kubernetes client and provides the shared helpers (_fetch_resources, _resolve_hostname, _generate_random_dns_prefix, and _tcp_port_ready) used by the resolvers:
class _CSP(object):
    """Custom CSP behaviour base class."""

    def __init__(self, k8s_client: KubernetesClient) -> None:
        """Initialize."""
        self._k8s_client = k8s_client
        self._default_namespace = "omni-streaming"

    def _generate_random_dns_prefix(self, length=6):
        letters = string.ascii_lowercase
        return ''.join(random.choice(letters) for _ in range(length))

    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
        """Process CSP customisations on stream creation."""
        return {}

    async def on_delete(self, data: dict) -> None:
        """Process CSP customisations on stream termination."""
        pass

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""
        return {}, False

    async def _resolve_hostname(self, hostname):
        ips = []

        resolver = aiodns.DNSResolver()
        try:
            ipv4_records = await resolver.query(hostname, 'A')
        except aiodns.error.DNSError as exc:
            logging.warning(f"Failed to resolve DNS for {hostname}. The domain might not have propagated yet if this is a new stream. {exc}")
            return []

        ips.extend([record.host for record in ipv4_records])
        return ips

    async def _fetch_resources(self, resource_type, selectors=None, args: Dict = None):
        api_class = self._get_k8s_api_class(resource_type)

        args = args or {}
        async with self._k8s_client as api_client:
            api_instance = api_class(api_client.api_http)
            func_name = args.pop('func_name', f"list_namespaced_{resource_type}")
            func = getattr(api_instance, func_name)

            if selectors:
                args['label_selector'] = ",".join(f"{k}={v}" for k, v in selectors.items())

            res = await func(
                namespace=self._default_namespace,
                **args
            )
            return res['items'] if isinstance(res, dict) else res.items

    def _get_k8s_api_class(self, resource_type):
        resource_to_api_class = {
            'pod': CoreV1Api,
            'service': CoreV1Api,
            'deployment': AppsV1Api,
            'targetgroupbinding': CustomObjectsApi,
        }
        return resource_to_api_class.get(resource_type, CoreV1Api)

    async def _tcp_port_ready(self, host: str, port: int) -> bool:
        try:
            logging.debug(f"Testing stream readiness on {host}:{port}")
            reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=2)
            writer.close()
            await writer.wait_closed()
            logging.debug(f"Stream connection on {host}:{port} successful")
            return True
        except asyncio.TimeoutError:
            logging.info(f"Failed to connect to {host}:{port}. Connection not ready.")
        except Exception as exc:
            logging.error(f"Failed to connect to {host}:{port}: {exc}")

        return False
The AWS class is the AWS target group specific resolver. On stream creation it requests listener and target group allocations from an NLB management service, and it resolves endpoints from the TargetGroupBinding custom resources labelled with the session ID:
class AWS(_CSP):
    """AWS customisations."""

    def __init__(
        self,
        k8s_client: KubernetesClient,
        nlb_mgmt_svc: str = "",
        enable_wss: bool = False
    ) -> Any:
        """Initialize AWS class."""
        super().__init__(k8s_client=k8s_client)
        self._nlb_mgmt_svc_url = nlb_mgmt_svc

        self._port_locations = {
            "media": "streamingKit.service.mediaPort",
            "signaling": "streamingKit.service.signalingPort"
        }

        self._targetgroup_arn_locations = {
            "media": "streamingKit.aws.targetgroups.media",
            "signaling": "streamingKit.aws.targetgroups.signaling",
        }

        self._listeners_arn_locations = {
            "media": "streamingKit.aws.listeners.media",
            "signaling": "streamingKit.aws.listeners.signaling",
        }

        self._nlb_location = "streamingKit.aws.nlb"
        self._alias_location = "streamingKit.aws.alias"
        self._enable_wss = enable_wss

    def _lookup_nested_dict(self, nested_dict, key_string):
        keys = key_string.split('.')
        value = nested_dict
        for key in keys:
            value = value[key]
        return value

    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
        """Process AWS customisations on stream creation."""
        values = profile_data["settings"]["values"]

        ports = {}
        for port_name, location in self._port_locations.items():
            ports[port_name] = self._lookup_nested_dict(values, location)

        allocations = []
        default_protocol = "TLS" if self._enable_wss else "TCP"

        for name in ports.keys():
            allocations.append(
                {
                    "name": name,
                    "protocol": "UDP" if name == "media" else default_protocol
                }
            )

        url = f"{self._nlb_mgmt_svc_url}/allocation"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json={"allocations": allocations}) as resp:
                if resp.status not in [200]:
                    detail = await resp.text()
                    error_msg = f"Failed request to {url}: {resp.status}, {detail}"
                    logging.error(error_msg)
                    raise _APIError(status_code=resp.status, details=error_msg)

                arns = await resp.json()

        settings[self._nlb_location] = arns["loadbalancer"]["dnsName"]
        settings[self._alias_location] = arns["loadbalancer"].get("alias", "")

        for key, location in self._targetgroup_arn_locations.items():
            listener_arn = arns['allocations'][key]['listenerArn']
            listener_port = arns['allocations'][key]['listenerPort']
            listener_protocol = arns['allocations'][key]['listenerProtocol']
            settings[location] = arns["allocations"][key]["targetGroupArn"]
            settings[self._listeners_arn_locations[key]] = f"{listener_arn}@{listener_port}@{listener_protocol}@{key}"

        logging.debug(f"Generated settings {settings}")
        return settings

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""

        args = {
            'func_name': 'list_namespaced_custom_object',
            'group': 'elbv2.k8s.aws',
            'version': 'v1beta1',
            'plural': 'targetgroupbindings',
        }

        tgbs = await self._fetch_resources(
            "targetgroupbinding",
            selectors={"sessionId": session_id},
            args=args
        )

        routes, status = await self._extract_routes(tgbs)

        return routes, status

    async def _extract_routes(self, tgbs: List) -> Tuple[Dict, bool]:
        routes = []
        statuses = []

        hostnames = []

        for tgb in tgbs:
            annotations = tgb["metadata"]["annotations"]
            listener = annotations.get('nvidia.com/omniverse.listener', '')
            nlb_hostname = annotations.get('nvidia.com/omniverse.nlb', '')
            alias = annotations.get('nvidia.com/omniverse.alias', '')

            hostname = alias if self._enable_wss else nlb_hostname

            if not listener or not hostname:
                statuses.append(False)
                logging.error(f"Invalid targetgroupbinding was found. No listener annotation was found {tgb}")
                continue

            listener_arn, port, protocol, name = listener.split("@")
            hostnames.append(hostname)
            routes.append({
                "source_port": int(port),
                "description": name,
                "protocol": protocol,
                "destination_port": -1
            })
            status = await self._tcp_port_ready(hostname, port) if protocol.lower() == "tcp" else True
            statuses.append(status)

        status = all(statuses) if statuses else False

        assembled_routes = {}

        ips = []
        if not self._enable_wss:
            for hostname in hostnames:
                ips.extend(await self._resolve_hostname(hostname))
            hostnames = ips

        for hostname in hostnames:
            assembled_routes[hostname] = {"routes": routes}

        logging.debug(f"Extracted routes {assembled_routes}, readiness status {status}")
        return assembled_routes, status
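
To use the AWS resolver instead of the default Generic one, the same two Helm settings can point at the AWS class. The snippet below is only a sketch; the nlb_mgmt_svc URL is a placeholder for your NLB management service endpoint:

    streaming:
       serviceConfig:
          # -- CSP customization manager
          backend_csp_cls: "nv.svc.streaming._csp:AWS"
          # -- CSP customization manager arguments
          backend_csp_args:
             nlb_mgmt_svc: "http://nlb-mgmt-svc"
             enable_wss: false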


Creating a custom CSP resolver

Because the class and its arguments are loaded dynamically, a developer can provide their own class and arguments, as long as the Python file containing the class is inside the container and part of a Python package (i.e., an __init__.py file is present).

There are three methods that allow custom behavior (a minimal skeleton follows this list):

  • on_create

    • Allows customizing the chart values that are passed to the rmcp service before the session gets instantiated.

    • Takes as arguments the chart values that come from resolving the profile; this gives access to the resolved values and to the settings (the overrides specified within a version).

    • Returns an updated settings dictionary that is used to update the chart values.

  • on_delete

    • Allows additional actions to be taken when a session is terminated

    • Takes as an argument a dictionary which currently only contains the session_id

    • Returns nothing.

  • resolve_endpoints

    • Determines the routes passed back to the web client, and whether the session is ready to be connected to.

    • Takes as an argument the session_id.

    • Returns a tuple of a dictionary with routes and a boolean indicating if the session is ready.

      • If the boolean is false, a 202 will be returned to the client.
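
Putting the three methods together, a do-nothing resolver skeleton could look like the sketch below (the class and module names are illustrative; the NodePortResolver in the next section shows a working implementation):

    from typing import Dict, Tuple

    from nv.svc.streaming._csp import _CSP


    class MyResolver(_CSP):
        """Sketch of a custom resolver; signatures match the _CSP base class."""

        async def on_create(self, profile_data: dict, settings: dict) -> Dict:
            # Return an updated settings dictionary used to update the chart values.
            return settings

        async def on_delete(self, data: dict) -> None:
            # data currently only contains the session_id; nothing to clean up here.
            return None

        async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
            # Return the routes for the web client and whether the session is ready;
            # returning False causes a 202 to be sent to the client.
            return {}, False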

Example custom endpoint resolver

Here is an example of creating a custom endpoint resolver and deploying it to an Omniverse Kit App Streaming instance.

Example Python class that extracts a streaming session's host IPs and node ports
from typing import Tuple, Dict, List

from nv.svc.streaming._csp import Generic

class NodePortResolver(Generic):

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""
        services = await self._fetch_resources("service", selectors={"sessionId": session_id})
        pods = await self._fetch_resources("pod", selectors={"sessionId": session_id})
        routes, status = await self._extract_routes(services, pods)
        return routes, status

    def _extract_ports(self, service_spec: Dict) -> Tuple[Dict, bool]:
        """Extract the port information from the service specification.

        Args:
            service_spec (Dict): The service specification dictionary.

        Returns:
            Dict: A dictionary containing the route information.
        """
        ports = service_spec.spec.ports

        routes = []
        for port in ports:
            routes.append(
                {
                    "source_port": port.node_port,
                    "description": port.name,
                    "protocol": port.protocol,
                    "destination_port": 0  # this field is no longer used.
                }
            )

        status = True if routes else False

        return {"routes": routes}, status

    async def _extract_routes(self, services, pods):
        routes = {}
        statuses = []

        for service in services:
            ports, port_ready = self._extract_ports(service)
            ips, ip_ready = await self._extract_pod_node_ips(pods)
            statuses.append(port_ready and ip_ready)

            if not ip_ready:
                continue

            for ip in ips:
                routes[ip] = ports

        status = all(statuses) if statuses else False
        return routes, status

    async def _extract_pod_node_ips(self, pods) -> Tuple[List[str], bool]:
        """Extract the host IPs of the nodes hosting the pods of the stream."""
        node_ips = set()
        for pod in pods:  # _fetch_resources already returns the list of pod objects
            if pod.status.host_ip:
                node_ips.add(pod.status.host_ip)

        return list(node_ips), bool(node_ips)
  1. Save the class above into a file named node_port.py.

  2. Create a dockerfile that copies node_port.py into the streaming session manager container:

Note

The code snippets below include a variable for $APP_VERSION. This variable can be replaced manually with the latest release version number, or set using an environment variable (e.g., export APP_VERSION=1.8.0). The latest container versions are listed at the top of this page for quick reference.

# Pass the release version in with --build-arg (or edit the tag below manually)
ARG APP_VERSION
FROM nvcr.io/nvidia/omniverse/kit-appstreaming-manager:$APP_VERSION

# Create the resolvers directory
RUN mkdir -p /resolvers

# Copy node_port.py into /resolvers
COPY node_port.py /resolvers/

# Create an empty __init__.py to make it a package
RUN touch /resolvers/__init__.py
  3. Build and push the container, replacing the registry, name and version:

    docker build --build-arg APP_VERSION=$APP_VERSION -t {my-registry}/{kit-appstreaming-manager-node-port}:{1.0.0} .
    docker push {my-registry}/{kit-appstreaming-manager-node-port}:{1.0.0}
    
  4. Create or update the values file to use the new container and instruct it to use the new resolver class (this is a partial values file with just the required fields changed):

    streaming:
       image:
          # -- Image repository.
          repository: "my-registry/kit-appstreaming-manager-node-port"
          # -- Image pull policy.
          pullPolicy: Always
          # -- Image tag.
          tag: 1.0.0
    
       serviceConfig:
          # -- CSP customization manager
          backend_csp_cls: "resolvers.node_port:NodePortResolver"
    
          # -- CSP Customization manager arguments
          backend_csp_args: {}
    
  5. Apply the Helm chart, for example:
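
For example, assuming the streaming session manager was installed as a Helm release (release name, chart reference, and namespace are placeholders for your deployment):

    helm upgrade --install {my-release} {chart-reference} \
        --namespace {namespace} \
        -f values.yaml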

At this point, the NodePortResolver will be loaded and called to resolve the routes based on NodePort and HostIP.