Customizing Endpoint Resolution#
The sample APIs provide an option for developers to customize how the streaming endpoints are resolved.
Latest Container Release: 1.11.0
By default, the resolver class is Generic; the sample also includes an AWS target-group-specific resolver class.
Both can be found within the container at the following path:
/usr/local/lib/python3.11/site-packages/nv/svc/streaming/_csp.py
The class and arguments to load at startup are managed by the following settings:
- backend.csp.cls
- backend.csp.args
These can be overridden via the following settings in the helm values file:
- streaming.serviceConfig.backend_csp_cls
- streaming.serviceConfig.backend_csp_args
Below is the Generic class defined in _csp.py. It inherits from the _CSP class. The class is always passed a configured Kubernetes client that uses the service account defined in the helm chart of the streaming session manager. An example of the client being used can be seen in the _fetch_resources function inside the _CSP class.
class Generic(_CSP):
    """Generic and default CSP manager.

    Resolves endpoints from the Kubernetes services labelled with the session
    id. When ``enable_wss`` is set, ``on_create`` also adds a hostname
    annotation built from a random prefix and ``base_domain``.
    """
    def __init__(
        self,
        k8s_client: KubernetesClient,
        enable_wss: bool = False,
        hostname_annotation_key: str = "external-dns.alpha.kubernetes.io/hostname",
        service_annotations_location: str = "streamingKit.service.annotations",
        base_domain: str = ""
    ) -> Any:
        """Initialize.

        Args:
            k8s_client: Kubernetes client forwarded to the ``_CSP`` base class.
            enable_wss: When False, ``on_create`` returns an empty dict and no
                hostname annotation is generated.
            hostname_annotation_key: Annotation key under which the generated
                hostname is stored.
            service_annotations_location: Dot-separated path of the service
                annotations inside the profile values; also used as the key
                written into ``settings``.
            base_domain: Domain appended to the random prefix to form the
                hostname.
        """
        # NOTE(review): `-> Any` is unusual for __init__; `_CSP.__init__` is annotated `-> None`.
        super().__init__(k8s_client)
        self._enable_wss = enable_wss
        self._service_annotations_location = service_annotations_location
        self._hostname_annotation_key = hostname_annotation_key
        self._base_domain = base_domain
    async def on_create(self, profile_data: Dict, settings: Dict) -> Dict:
        """Process CSP customisations on stream creation.

        Args:
            profile_data: Resolved profile; the annotations are read from
                ``profile_data["settings"]["values"]`` when present.
            settings: Settings dictionary that receives the annotations.

        Returns:
            An empty dict when WSS is disabled; otherwise ``settings`` with the
            annotations (including the generated hostname) stored under the
            annotations location key.
        """
        if not self._enable_wss:
            return {}
        values = profile_data.get("settings", {}).get("values", {})
        # Walk the dot-separated path; any missing key yields an empty dict.
        keys = self._service_annotations_location.split(".")
        data = values
        for key in keys:
            data = data.get(key, {})
        prefix = self._generate_random_dns_prefix()
        # NOTE(review): when the full path exists, `data` is the dict nested
        # inside `profile_data`, so this assignment also modifies `profile_data`.
        data[self._hostname_annotation_key] = f"{prefix}.{self._base_domain}"
        settings[self._service_annotations_location] = data
        logging.debug(f"Generated settings {settings}")
        return settings
    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session.

        Fetches the services selected by ``sessionId=<session_id>`` and passes
        them to ``_extract_routes``.

        Returns:
            The ``(routes, status)`` tuple produced by ``_extract_routes``.
        """
        services = await self._fetch_resources("service", selectors={"sessionId": session_id})
        routes, status = await self._extract_routes(services)
        return routes, status
121class Generic(_CSP):
122    """Generic and default CSP manager."""
123
124    def __init__(
125        self,
126        k8s_client: KubernetesClient,
127        enable_wss: bool = False,
128        hostname_annotation_key: str = "external-dns.alpha.kubernetes.io/hostname",
129        service_annotations_location: str = "streamingKit.service.annotations",
130        base_domain: str = ""
131    ) -> Any:
132        """Initialize."""
133        super().__init__(k8s_client)
134        self._enable_wss = enable_wss
135        self._service_annotations_location = service_annotations_location
136        self._hostname_annotation_key = hostname_annotation_key
137        self._base_domain = base_domain
138
139    async def on_create(self, profile_data: Dict, settings: Dict) -> Dict:
140        """Process CSP customisations on stream creation."""
141        if not self._enable_wss:
142            return {}
143
144        values = profile_data.get("settings", {}).get("values", {})
145        keys = self._service_annotations_location.split(".")
146        data = values
147        for key in keys:
148            data = data.get(key, {})
149
150        prefix = self._generate_random_dns_prefix()
151        data[self._hostname_annotation_key] = f"{prefix}.{self._base_domain}"
152        settings[self._service_annotations_location] = data
153
154        logging.debug(f"Generated settings {settings}")
155
156        return settings
157
158    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
159        """Resolve the endpoint to connect to for a given session."""
160        services = await self._fetch_resources("service", selectors={"sessionId": session_id})
161        routes, status = await self._extract_routes(services)
162        return routes, status
163
164    def _extract_ports(self, service_spec: Dict) -> Tuple[Dict, bool]:
165        """Extract the port information from the service specification.
166
167        Args:
168            service_spec: The service object whose ``spec.ports`` entries are read.
169
170        Returns:
171            Tuple[Dict, bool]: ``{"routes": [...]}`` and True if any port was found.
172        """
173        ports = service_spec.spec.ports
174
175        routes = []
176        for port in ports:
177            routes.append(
178                {
179                    "source_port": port.port,
180                    "description": port.name,
181                    "protocol": port.protocol,
182                    "destination_port": port.node_port
183                }
184            )
185
186        status = True if routes else False
187
188        logging.debug(f"Extracted port mappings {routes}, readiness status {status}")
189        return {"routes": routes}, status
190
191    async def _extract_routes(self, services):
192        routes = {}
193        statuses = []
194
195        for service in services:
196            ports, port_ready = self._extract_ports(service)
197
198            entries = []
199            lb_ready = False
200            if self._enable_wss:
201                hostname, lb_ready = await self._extract_hostname(service)
202                entries.append(hostname)
203            else:
204                ips, lb_ready = await self._extract_lb_ips(service)
205                entries.extend(ips)
206
207            statuses.append(all([port_ready, lb_ready]))
208            if not lb_ready:
209                continue
210
211            for entry in entries:
212                routes[entry] = ports
213
214        status = all(statuses) if statuses else False
215        logging.debug(f"Extracted routes {routes}, readiness status {status}")
216        return routes, status
217
218    async def _extract_lb_ips(self, service) -> Tuple[List, bool]:
219        ips = []
220        hostname = None
221
222        ingress = service.status.load_balancer.ingress
223        if not ingress:
224            return ips, False
225
226        for entry in ingress:
227            if entry.ip:
228                ips.append(entry.ip)
229            hostname = entry.hostname
230
231        if not ips and hostname:
232            logging.debug("No IPs were found attached to the service, trying to resolve hostname")
233            ips = await self._resolve_hostname(hostname)
234
235        status = True if ips else False
236
237        logging.debug(f"Extracted IPs {ips}, readiness status {status}")
238        return ips, status
239
240    async def _extract_hostname(self, service) -> Tuple[str, bool]:
241        annotations = service.metadata.annotations
242        hostname = None
243
244        try:
245            hostname = annotations[self._hostname_annotation_key]
246        except KeyError:
247            logging.error(f"Hostname field `{self._hostname_annotation_key}` not found in annotations")
248            raise HostnameNotFoundError("Unable to find hostname")
249
250        ready = False
251        try:
252            ips = await self._resolve_hostname(hostname)
253            if ips:
254                ready = True
255        except aiodns.error.DNSError as exc:
256            logging.warning(f"Unable to resolve {hostname}: {exc}")
257
258        return hostname, ready
class _CSP(object):
    """Custom CSP behaviour base class.

    Provides no-op defaults for ``on_create``, ``on_delete`` and
    ``resolve_endpoints`` plus a helper for listing Kubernetes resources.
    """
    def __init__(self, k8s_client: KubernetesClient) -> None:
        """Initialize.

        Args:
            k8s_client: Kubernetes client used by ``_fetch_resources``.
        """
        self._k8s_client = k8s_client
        # Namespace passed to every call made by `_fetch_resources`.
        self._default_namespace = "omni-streaming"
    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
        """Process CSP customisations on stream creation.

        The base implementation makes no changes and returns an empty dict.
        """
        return {}
    async def on_delete(self, data: dict) -> None:
        """Process CSP customisations on stream termination."""
        pass
    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session.

        The base implementation returns no routes and a False status.
        """
        return {}, False
    async def _fetch_resources(self, resource_type, selectors=None, args: Dict = None):
        """List resources of ``resource_type`` in the default namespace.

        Args:
            resource_type: Resource name; selects the API class and the default
                ``list_namespaced_<resource_type>`` method name.
            selectors: Optional label mapping, joined as comma-separated
                ``key=value`` pairs into the ``label_selector`` argument.
            args: Optional extra keyword arguments for the list call; a
                ``func_name`` entry overrides the method name.

        Returns:
            The ``items`` of the response, read as a dict key or an attribute.
        """
        api_class = self._get_k8s_api_class(resource_type)
        # NOTE(review): a caller-supplied `args` dict is reused as-is, so the
        # pop and the `label_selector` assignment below modify the caller's dict.
        args = args or {}
        async with self._k8s_client as api_client:
            api_instance = api_class(api_client.api_http)
            func_name = args.pop('func_name', f"list_namespaced_{resource_type}")
            func = getattr(api_instance, func_name)
            if selectors:
                args['label_selector'] = ",".join(f"{k}={v}" for k, v in selectors.items())
            res = await func(
                namespace=self._default_namespace,
                **args
            )
            return res['items'] if isinstance(res, dict) else res.items
 40class _CSP(object):
 41    """Custom CSP behaviour base class."""
 42
 43    def __init__(self, k8s_client: KubernetesClient) -> None:
 44        """Initialize."""
 45        self._k8s_client = k8s_client
 46        self._default_namespace = "omni-streaming"
 47
 48    def _generate_random_dns_prefix(self, length=6):
 49        letters = string.ascii_lowercase
 50        return ''.join(random.choice(letters) for _ in range(length))
 51
 52    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
 53        """Process CSP customisations on stream creation."""
 54        return {}
 55
 56    async def on_delete(self, data: dict) -> None:
 57        """Process CSP customisations on stream termination."""
 58        pass
 59
 60    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
 61        """Resolve the endpoint to connect to for a given session."""
 62        return {}, False
 63
 64    async def _resolve_hostname(self, hostname):
 65        ips = []
 66
 67        resolver = aiodns.DNSResolver()
 68        try:
 69            ipv4_records = await resolver.query(hostname, 'A')
 70        except aiodns.error.DNSError as exc:
 71            logging.warning(f"Failed to resolve DNS for {hostname}. The domain might not have propagated yet if this is a new stream. {exc}")
 72            return []
 73
 74        ips.extend([record.host for record in ipv4_records])
 75        return ips
 76
 77    async def _fetch_resources(self, resource_type, selectors=None, args: Dict = None):
 78        api_class = self._get_k8s_api_class(resource_type)
 79
 80        args = args or {}
 81        async with self._k8s_client as api_client:
 82            api_instance = api_class(api_client.api_http)
 83            func_name = args.pop('func_name', f"list_namespaced_{resource_type}")
 84            func = getattr(api_instance, func_name)
 85
 86            if selectors:
 87                args['label_selector'] = ",".join(f"{k}={v}" for k, v in selectors.items())
 88
 89            res = await func(
 90                namespace=self._default_namespace,
 91                **args
 92            )
 93            return res['items'] if isinstance(res, dict) else res.items
 94
 95    def _get_k8s_api_class(self, resource_type):
 96        resource_to_api_class = {
 97            'pod': CoreV1Api,
 98            'service': CoreV1Api,
 99            'deployment': AppsV1Api,
100            'targetgroupbinding': CustomObjectsApi,
101
102        }
103        return resource_to_api_class.get(resource_type, CoreV1Api)
104
105    async def _tcp_port_ready(self, host: str, port: int) -> bool:
106        try:
107            logging.debug(f"Testing stream readiness on {host}:{port}")
108            reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=2)
109            writer.close()
110            await writer.wait_closed()
111            logging.debug(f"Stream connection on {host}:{port} successful")
112            return True
113        except asyncio.TimeoutError:
114            logging.info(f"Failed to connect to {host}:{port}. Connection not ready.")
115        except Exception as exc:
116            logging.error(f"Failed to connect to {host}:{port}: {exc}")
117
118        return False
class AWS(_CSP):
    """AWS customisations.

    On stream creation, requests allocations from the service at
    ``nlb_mgmt_svc`` and stores the returned values in the settings. Endpoints
    are resolved from the session's ``targetgroupbinding`` objects.
    """
    def __init__(
        self,
        k8s_client: KubernetesClient,
        nlb_mgmt_svc: str = "",
        enable_wss: bool = False
    ) -> Any:
        """Initialize AWS class.

        Args:
            k8s_client: Kubernetes client forwarded to the ``_CSP`` base class.
            nlb_mgmt_svc: Base URL of the service that ``on_create`` posts
                allocation requests to.
            enable_wss: When True, non-media allocations request the ``TLS``
                protocol instead of ``TCP``.
        """
        super().__init__(k8s_client=k8s_client)
        self._nlb_mgmt_svc_url = nlb_mgmt_svc
        # Dot-separated paths of the port values inside the profile values.
        self._port_locations = {
            "media": "streamingKit.service.mediaPort",
            "signaling": "streamingKit.service.signalingPort"
        }
        # Settings keys that receive the target group ARN of each allocation.
        self._targetgroup_arn_locations = {
            "media": "streamingKit.aws.targetgroups.media",
            "signaling": "streamingKit.aws.targetgroups.signaling",
        }
        # Settings keys that receive the "arn@port@protocol@name" listener string.
        self._listeners_arn_locations = {
            "media": "streamingKit.aws.listeners.media",
            "signaling": "streamingKit.aws.listeners.signaling",
        }
        # Settings keys for the load balancer DNS name and its alias.
        self._nlb_location = "streamingKit.aws.nlb"
        self._alias_location = "streamingKit.aws.alias"
        self._enable_wss = enable_wss
    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
        """Process AWS customisations on stream creation.

        Args:
            profile_data: Resolved profile; port values are looked up in
                ``profile_data["settings"]["values"]``.
            settings: Settings dictionary that receives the allocation results.

        Returns:
            ``settings`` with the load balancer DNS name, alias, target group
            ARNs and listener strings added.

        Raises:
            _APIError: If the allocation request does not return status 200.
        """
        values = profile_data["settings"]["values"]
        ports = {}
        for port_name, location in self._port_locations.items():
            ports[port_name] = self._lookup_nested_dict(values, location)
        # NOTE(review): only the keys of `ports` are used below; the looked-up
        # values are not read again in this method.
        allocations = []
        default_protocol = "TLS" if self._enable_wss else "TCP"
        # "media" always requests UDP; every other port uses the default protocol.
        for name in ports.keys():
            allocations.append(
                {
                    "name": name,
                    "protocol": "UDP" if name == "media" else default_protocol
                }
            )
        url = f"{self._nlb_mgmt_svc_url}/allocation"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json={"allocations": allocations}) as resp:
                if resp.status not in [200]:
                    detail = await resp.text()
                    error_msg = f"Failed request to {url}: {resp.status}, {detail}"
                    logging.error(error_msg)
                    raise _APIError(status_code=resp.status, details=error_msg)
                arns = await resp.json()
        settings[self._nlb_location] = arns["loadbalancer"]["dnsName"]
        settings[self._alias_location] = arns["loadbalancer"].get("alias", "")
        for key, location in self._targetgroup_arn_locations.items():
            listener_arn = arns['allocations'][key]['listenerArn']
            listener_port = arns['allocations'][key]['listenerPort']
            listener_protocol = arns['allocations'][key]['listenerProtocol']
            settings[location] = arns["allocations"][key]["targetGroupArn"]
            # Listener details are packed into one "@"-separated string.
            settings[self._listeners_arn_locations[key]] = f"{listener_arn}@{listener_port}@{listener_protocol}@{key}"
        logging.debug(f"Generated settings {settings}")
        return settings
    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session.

        Lists the ``targetgroupbindings`` custom objects selected by
        ``sessionId=<session_id>`` and passes them to ``_extract_routes``.

        Returns:
            The ``(routes, status)`` tuple produced by ``_extract_routes``.
        """
        # `func_name` overrides the default list method; the remaining entries
        # are forwarded as keyword arguments to that call.
        args = {
            'func_name': 'list_namespaced_custom_object',
            'group': 'elbv2.k8s.aws',
            'version': 'v1beta1',
            'plural': 'targetgroupbindings',
        }
        tgbs = await self._fetch_resources(
            "targetgroupbinding",
            selectors={"sessionId": session_id},
            args=args
        )
        routes, status = await self._extract_routes(tgbs)
        return routes, status
261class AWS(_CSP):
262    """AWS customisations."""
263
264    def __init__(
265        self,
266        k8s_client: KubernetesClient,
267        nlb_mgmt_svc: str = "",
268        enable_wss: bool = False
269    ) -> Any:
270        """Initialize AWS class."""
271        super().__init__(k8s_client=k8s_client)
272        self._nlb_mgmt_svc_url = nlb_mgmt_svc
273
274        self._port_locations = {
275            "media": "streamingKit.service.mediaPort",
276            "signaling": "streamingKit.service.signalingPort"
277        }
278
279        self._targetgroup_arn_locations = {
280            "media": "streamingKit.aws.targetgroups.media",
281            "signaling": "streamingKit.aws.targetgroups.signaling",
282        }
283
284        self._listeners_arn_locations = {
285            "media": "streamingKit.aws.listeners.media",
286            "signaling": "streamingKit.aws.listeners.signaling",
287        }
288
289        self._nlb_location = "streamingKit.aws.nlb"
290        self._alias_location = "streamingKit.aws.alias"
291        self._enable_wss = enable_wss
292
293    def _lookup_nested_dict(self, nested_dict, key_string):
294        keys = key_string.split('.')
295        value = nested_dict
296        for key in keys:
297            value = value[key]
298        return value
299
300    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
301        """Request load-balancer allocations and record them in the settings."""
302        values = profile_data["settings"]["values"]
303
304        ports = {}
305        for port_name, location in self._port_locations.items():
306            ports[port_name] = self._lookup_nested_dict(values, location)
307
308        allocations = []
309        default_protocol = "TLS" if self._enable_wss else "TCP"
310
311        for name in ports.keys():
312            allocations.append(
313                {
314                    "name": name,
315                    "protocol": "UDP" if name == "media" else default_protocol
316                }
317            )
318
319        url = f"{self._nlb_mgmt_svc_url}/allocation"
320        async with aiohttp.ClientSession() as session:
321            async with session.post(url, json={"allocations": allocations}) as resp:
322                if resp.status not in [200]:
323                    detail = await resp.text()
324                    error_msg = f"Failed request to {url}: {resp.status}, {detail}"
325                    logging.error(error_msg)
326                    raise _APIError(status_code=resp.status, details=error_msg)
327
328                arns = await resp.json()
329
330        settings[self._nlb_location] = arns["loadbalancer"]["dnsName"]
331        settings[self._alias_location] = arns["loadbalancer"].get("alias", "")
332
333        for key, location in self._targetgroup_arn_locations.items():
334            listener_arn = arns['allocations'][key]['listenerArn']
335            listener_port = arns['allocations'][key]['listenerPort']
336            listener_protocol = arns['allocations'][key]['listenerProtocol']
337            settings[location] = arns["allocations"][key]["targetGroupArn"]
338            settings[self._listeners_arn_locations[key]] = f"{listener_arn}@{listener_port}@{listener_protocol}@{key}"
339
340        logging.debug(f"Generated settings {settings}")
341        return settings
342
343    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
344        """Resolve the endpoints for a session from its targetgroupbinding objects."""
345
346        args = {
347            'func_name': 'list_namespaced_custom_object',
348            'group': 'elbv2.k8s.aws',
349            'version': 'v1beta1',
350            'plural': 'targetgroupbindings',
351        }
352
353        tgbs = await self._fetch_resources(
354            "targetgroupbinding",
355            selectors={"sessionId": session_id},
356            args=args
357        )
358
359        routes, status = await self._extract_routes(tgbs)
360
361        return routes, status
362
363    async def _extract_routes(self, tgbs: List) -> Tuple[Dict, bool] :
364        routes = []
365        statuses = []
366
367        hostnames = []
368
369        for tgb in tgbs:
370            annotations = tgb["metadata"]["annotations"]
371            listener = annotations.get('nvidia.com/omniverse.listener', '')
372            nlb_hostname = annotations.get('nvidia.com/omniverse.nlb', '')
373            alias = annotations.get('nvidia.com/omniverse.alias', '')
374
375            hostname = alias if self._enable_wss else nlb_hostname
376
377            if not listener or not hostname:
378                statuses.append(False)
379                logging.error(f"Invalid targetgroupbinding was found. No listener annotation was found {tgb}")
380                continue
381
382            listener_arn, port, protocol, name = listener.split("@")
383            hostnames.append(hostname)
384            routes.append({
385                "source_port": int(port),
386                "description": name,
387                "protocol": protocol,
388                "destination_port": -1
389            })
390            status = await self._tcp_port_ready(hostname, port) if protocol.lower() == "tcp" else True
391            statuses.append(status)
392
393        status = all(statuses) if statuses else False
394
395        assembled_routes = {}
396
397        ips = []
398        if not self._enable_wss:
399            for hostname in hostnames:
400                ips.extend(await self._resolve_hostname(hostname))
401            hostnames = ips
402
403        for hostname in hostnames:
404            assembled_routes[hostname] = {"routes": routes}
405
406        logging.debug(f"Extracted routes {assembled_routes}, readiness status {status}")
407        return assembled_routes, status
Creating a custom CSP resolver#
Because the class and its arguments are loaded dynamically, a developer can provide their own class and arguments, as long as the Python file containing the class is inside the container and inside a Python package (i.e., an __init__.py file is present).
There are three functions that allow custom behavior:
- on_create: Allows customizing the chart values that are passed to the rmcp service before the session gets instantiated.
  - Takes as arguments the chart values that come from resolving the profile; this gives access to the resolved values and settings, and to the overrides specified within a version.
  - Returns an updated settings dictionary that is resolved to update the chart values.

- on_delete: Allows additional actions to be taken when a session is terminated.
  - Takes as an argument a dictionary which currently only contains the session_id.
  - Returns nothing.

- resolve_endpoints: Determines the routes passed back to the web client, and whether the session is ready to connect to.
  - Takes as an argument the session_id.
  - Returns a tuple of a dictionary with routes and a boolean indicating whether the session is ready. If the boolean is false, a 202 will be returned to the client.
 
 
Example custom endpoint resolver#
Here is an example of creating a custom endpoint resolver and deploying it to an Omniverse Kit App Streaming instance. Save the following resolver as node_port.py:
 1from typing import Tuple, Dict, List
 2
 3from nv.svc.streaming._csp import Generic
 4
 5class NodePortResolver(Generic):
 6
 7    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
 8        """Resolve the endpoint to connect to for a given session."""
 9        services = await self._fetch_resources("service", selectors={"sessionId": session_id})
10        pods = await self._fetch_resources("pod", selectors={"sessionId": session_id})
11        routes, status = await self._extract_routes(services, pods)
12        return routes, status
13
14    def _extract_ports(self, service_spec: Dict) -> Tuple[Dict, bool]:
15        """Extract the port information from the service specification.
16
17        Args:
18            service_spec (Dict): The service specification dictionary.
19
20        Returns:
21            Dict: A dictionary containing the route information.
22        """
23        ports = service_spec.spec.ports
24
25        routes = []
26        for port in ports:
27            routes.append(
28                {
29                    "source_port": port.node_port,
30                    "description": port.name,
31                    "protocol": port.protocol,
32                    "destination_port": 0 # this field is no longer used.
33                }
34            )
35
36        status = True if routes else False
37
38        return {"routes": routes}, status
39
40    async def _extract_routes(self, services, pods):
41        routes = {}
42        statuses = []
43
44        for service in services:
45            ports, port_ready = self._extract_ports(service)
46            ips, ip_ready = await self._extract_pod_node_ips(pods)
47            statuses.append(port_ready and ip_ready)
48
49            if not ip_ready:
50                continue
51
52            for ip in ips:
53                routes[ip] = ports
54
55        status = all(statuses) if statuses else False
56        return routes, status
57
58    async def _extract_pod_node_ips(self, pods) -> Tuple[List[str], bool]:
59        """Extract the host IPs of the nodes hosting the pods of the stream."""
60        node_ips = set()
61        for pod in pods.items:  # .items to iterate through pod list
62            if pod.status.host_ip:
63                node_ips.add(pod.status.host_ip)
64
65        return list(node_ips), bool(node_ips)
- Create a Dockerfile that copies node_port.py into the streaming session manager container:
Note
The code snippets below include a variable for $APP_VERSION. This variable can be edited manually using the latest release version number, or set using an environment variable (e.g., export APP_VERSION=). The latest container versions are listed at the top of this page for quick reference.
FROM nvcr.io/nvidia/omniverse/kit-appstreaming-manager:$APP_VERSION

# Create the resolvers directory
RUN mkdir -p /resolvers

# Copy node_port.py into /resolvers
COPY node_port.py /resolvers/

# Create an empty __init__.py to make it a package
RUN touch /resolvers/__init__.py
- Build and push the container, replacing the registry, name and version:

  docker build -t {my-registry}/{kit-appstreaming-manager-node-port}:{1.0.0} .
  docker push {my-registry}/{kit-appstreaming-manager-node-port}:{1.0.0}
- Create or update the values file to use the new container and instruct it to use the new resolver class (this is a partial values file with just the required fields changed):

  streaming:
    image:
      # -- Image repository.
      repository: "my-registry/kit-appstreaming-manager-node-port"
      # -- Image pull policy.
      pullPolicy: Always
      # -- Image tag.
      tag: 1.0.0
    serviceConfig:
      # -- CSP customization manager
      backend_csp_cls: "resolvers.node_port:NodePortResolver"
      # -- CSP Customization manager arguments
      backend_csp_args: {}
- Apply the helm chart. 
At this point, the NodePortResolver will be loaded and called to resolve the routes based on NodePort and HostIP.
