Customizing Endpoint Resolution#
The sample APIs provide an option for developers to customize how the streaming endpoints are resolved.
Latest Container Release: 1.8.0
By default, the resolver class is Generic; the sample also includes an AWS target-group-specific resolver class (AWS). Both can be found within the container at the following path:
/usr/local/lib/python3.11/site-packages/nv/svc/streaming/_csp.py
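To inspect the shipped resolvers you can, for example, read the file from a running session manager pod. This is a sketch: the deployment name is illustrative, while the namespace is the default used by the resolvers.

kubectl -n omni-streaming exec deploy/kit-appstreaming-manager -- \
  cat /usr/local/lib/python3.11/site-packages/nv/svc/streaming/_csp.py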
The class and arguments to load at startup are managed by the following settings:
backend.csp.cls
backend.csp.args
These can be overridden via the following settings in the Helm values file:
streaming.serviceConfig.backend_csp_cls
streaming.serviceConfig.backend_csp_args
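For example, to keep the built-in Generic resolver but enable WSS and set a base domain, the values overrides might look like the following sketch. The "module.path:ClassName" format matches the custom resolver example later on this page, the argument names come from the Generic constructor shown below, and the domain is illustrative:

streaming:
  serviceConfig:
    # -- Resolver class to load, as "python.module.path:ClassName"
    backend_csp_cls: "nv.svc.streaming._csp:Generic"
    # -- Keyword arguments passed to the resolver's constructor
    backend_csp_args:
      enable_wss: true
      base_domain: "streaming.example.com"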
Below is the Generic class defined in _csp.py. It inherits from the _CSP class and is always passed a configured Kubernetes client that uses the service account defined in the Helm chart of the streaming session manager. An example of this client being used can be seen in the _fetch_resources function inside the _CSP class.
class Generic(_CSP):
    """Generic and default CSP manager."""

    def __init__(
        self,
        k8s_client: KubernetesClient,
        enable_wss: bool = False,
        hostname_annotation_key: str = "external-dns.alpha.kubernetes.io/hostname",
        service_annotations_location: str = "streamingKit.service.annotations",
        base_domain: str = ""
    ) -> Any:
        """Initialize."""
        super().__init__(k8s_client)
        self._enable_wss = enable_wss
        self._service_annotations_location = service_annotations_location
        self._hostname_annotation_key = hostname_annotation_key
        self._base_domain = base_domain

    async def on_create(self, profile_data: Dict, settings: Dict) -> Dict:
        """Process CSP customisations on stream creation."""
        if not self._enable_wss:
            return {}

        values = profile_data.get("settings", {}).get("values", {})
        keys = self._service_annotations_location.split(".")
        data = values
        for key in keys:
            data = data.get(key, {})

        prefix = self._generate_random_dns_prefix()
        data[self._hostname_annotation_key] = f"{prefix}.{self._base_domain}"
        settings[self._service_annotations_location] = data

        logging.debug(f"Generated settings {settings}")

        return settings

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""
        services = await self._fetch_resources("service", selectors={"sessionId": session_id})
        routes, status = await self._extract_routes(services)
        return routes, status

    def _extract_ports(self, service_spec: Dict) -> Tuple[Dict, bool]:
        """Extract the port information from the service specification.

        Args:
            service_spec (Dict): The service specification dictionary.

        Returns:
            Dict: A dictionary containing the route information.
        """
        ports = service_spec.spec.ports

        routes = []
        for port in ports:
            routes.append(
                {
                    "source_port": port.port,
                    "description": port.name,
                    "protocol": port.protocol,
                    "destination_port": port.node_port
                }
            )

        status = True if routes else False

        logging.debug(f"Extracted port mappings {routes}, readiness status {status}")
        return {"routes": routes}, status

    async def _extract_routes(self, services):
        routes = {}
        statuses = []

        for service in services:
            ports, port_ready = self._extract_ports(service)

            entries = []
            lb_ready = False
            if self._enable_wss:
                hostname, lb_ready = await self._extract_hostname(service)
                entries.append(hostname)
            else:
                ips, lb_ready = await self._extract_lb_ips(service)
                entries.extend(ips)

            statuses.append(all([port_ready, lb_ready]))
            if not lb_ready:
                continue

            for entry in entries:
                routes[entry] = ports

        status = all(statuses) if statuses else False
        logging.debug(f"Extracted routes {routes}, readiness status {status}")
        return routes, status

    async def _extract_lb_ips(self, service) -> Tuple[List, bool]:
        ips = []
        hostname = None

        ingress = service.status.load_balancer.ingress
        if not ingress:
            return ips, False

        for entry in ingress:
            if entry.ip:
                ips.append(entry.ip)
            hostname = entry.hostname

        if not ips and hostname:
            logging.debug("No IPs were found attached to the service, trying to resolve hostname")
            ips = await self._resolve_hostname(hostname)

        status = True if ips else False

        logging.debug(f"Extracted IPs {ips}, readiness status {status}")
        return ips, status

    async def _extract_hostname(self, service) -> Tuple[str, bool]:
        annotations = service.metadata.annotations
        hostname = None

        try:
            hostname = annotations[self._hostname_annotation_key]
        except KeyError:
            logging.error(f"Hostname field `{self._hostname_annotation_key}` not found in annotations")
            raise HostnameNotFoundError("Unable to find hostname")

        ready = False
        try:
            ips = await self._resolve_hostname(hostname)
            if ips:
                ready = True
        except aiodns.error.DNSError as exc:
            logging.warning(f"Unable to resolve {hostname}: {exc}")

        return hostname, ready
The _CSP base class, which both the Generic and AWS resolvers inherit from, is defined in the same file:
class _CSP(object):
    """Custom CSP behaviour base class."""

    def __init__(self, k8s_client: KubernetesClient) -> None:
        """Initialize."""
        self._k8s_client = k8s_client
        self._default_namespace = "omni-streaming"

    def _generate_random_dns_prefix(self, length=6):
        letters = string.ascii_lowercase
        return ''.join(random.choice(letters) for _ in range(length))

    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
        """Process CSP customisations on stream creation."""
        return {}

    async def on_delete(self, data: dict) -> None:
        """Proccess CSP customisations on stream termination."""
        pass

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""
        return {}, False

    async def _resolve_hostname(self, hostname):
        ips = []

        resolver = aiodns.DNSResolver()
        try:
            ipv4_records = await resolver.query(hostname, 'A')
        except aiodns.error.DNSError as exc:
            logging.warning(f"Failed to resolve DNS for {hostname}. The domain might not have propagated yet if this is a new stream. {exc}")
            return []

        ips.extend([record.host for record in ipv4_records])
        return ips

    async def _fetch_resources(self, resource_type, selectors=None, args: Dict = None):
        api_class = self._get_k8s_api_class(resource_type)

        args = args or {}
        async with self._k8s_client as api_client:
            api_instance = api_class(api_client.api_http)
            func_name = args.pop('func_name', f"list_namespaced_{resource_type}")
            func = getattr(api_instance, func_name)

            if selectors:
                args['label_selector'] = ",".join(f"{k}={v}" for k, v in selectors.items())

            res = await func(
                namespace=self._default_namespace,
                **args
            )
            return res['items'] if isinstance(res, dict) else res.items

    def _get_k8s_api_class(self, resource_type):
        resource_to_api_class = {
            'pod': CoreV1Api,
            'service': CoreV1Api,
            'deployment': AppsV1Api,
            'targetgroupbinding': CustomObjectsApi,
        }
        return resource_to_api_class.get(resource_type, CoreV1Api)

    async def _tcp_port_ready(self, host: str, port: int) -> bool:
        try:
            logging.debug(f"Testing stream readiness on {host}:{port}")
            reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=2)
            writer.close()
            await writer.wait_closed()
            logging.debug(f"Stream connection on {host}:{port} successful")
            return True
        except asyncio.TimeoutError:
            logging.info(f"Failed to connect to {host}:{port}. Connection not ready.")
        except Exception as exc:
            logging.error(f"Failed to connect to {host}:{port}: {exc}")

        return False
The AWS resolver, also defined in _csp.py, requests NLB listener and target group allocations from an external management service during stream creation and resolves endpoints from the resulting TargetGroupBinding resources:
class AWS(_CSP):
    """AWS customisations."""

    def __init__(
        self,
        k8s_client: KubernetesClient,
        nlb_mgmt_svc: str = "",
        enable_wss: bool = False
    ) -> Any:
        """Initialize AWS class."""
        super().__init__(k8s_client=k8s_client)
        self._nlb_mgmt_svc_url = nlb_mgmt_svc

        self._port_locations = {
            "media": "streamingKit.service.mediaPort",
            "signaling": "streamingKit.service.signalingPort"
        }

        self._targetgroup_arn_locations = {
            "media": "streamingKit.aws.targetgroups.media",
            "signaling": "streamingKit.aws.targetgroups.signaling",
        }

        self._listeners_arn_locations = {
            "media": "streamingKit.aws.listeners.media",
            "signaling": "streamingKit.aws.listeners.signaling",
        }

        self._nlb_location = "streamingKit.aws.nlb"
        self._alias_location = "streamingKit.aws.alias"
        self._enable_wss = enable_wss

    def _lookup_nested_dict(self, nested_dict, key_string):
        keys = key_string.split('.')
        value = nested_dict
        for key in keys:
            value = value[key]
        return value

    async def on_create(self, profile_data: dict, settings: dict) -> Dict:
        """Process AWS customisations on stream creation."""
        values = profile_data["settings"]["values"]

        ports = {}
        for port_name, location in self._port_locations.items():
            ports[port_name] = self._lookup_nested_dict(values, location)

        allocations = []
        default_protocol = "TLS" if self._enable_wss else "TCP"

        for name in ports.keys():
            allocations.append(
                {
                    "name": name,
                    "protocol": "UDP" if name == "media" else default_protocol
                }
            )

        url = f"{self._nlb_mgmt_svc_url}/allocation"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json={"allocations": allocations}) as resp:
                if resp.status not in [200]:
                    detail = await resp.text()
                    error_msg = f"Failed request to {url}: {resp.status}, {detail}"
                    logging.error(error_msg)
                    raise _APIError(status_code=resp.status, details=error_msg)

                arns = await resp.json()

        settings[self._nlb_location] = arns["loadbalancer"]["dnsName"]
        settings[self._alias_location] = arns["loadbalancer"].get("alias", "")

        for key, location in self._targetgroup_arn_locations.items():
            listener_arn = arns['allocations'][key]['listenerArn']
            listener_port = arns['allocations'][key]['listenerPort']
            listener_protocol = arns['allocations'][key]['listenerProtocol']
            settings[location] = arns["allocations"][key]["targetGroupArn"]
            settings[self._listeners_arn_locations[key]] = f"{listener_arn}@{listener_port}@{listener_protocol}@{key}"

        logging.debug(f"Generated settings {settings}")
        return settings

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""

        args = {
            'func_name': 'list_namespaced_custom_object',
            'group': 'elbv2.k8s.aws',
            'version': 'v1beta1',
            'plural': 'targetgroupbindings',
        }

        tgbs = await self._fetch_resources(
            "targetgroupbinding",
            selectors={"sessionId": session_id},
            args=args
        )

        routes, status = await self._extract_routes(tgbs)

        return routes, status

    async def _extract_routes(self, tgbs: List) -> Tuple[Dict, bool]:
        routes = []
        statuses = []

        hostnames = []

        for tgb in tgbs:
            annotations = tgb["metadata"]["annotations"]
            listener = annotations.get('nvidia.com/omniverse.listener', '')
            nlb_hostname = annotations.get('nvidia.com/omniverse.nlb', '')
            alias = annotations.get('nvidia.com/omniverse.alias', '')

            hostname = alias if self._enable_wss else nlb_hostname

            if not listener or not hostname:
                statuses.append(False)
                logging.error(f"Invalid targetgroupbinding was found. No listener annotation was found {tgb}")
                continue

            listener_arn, port, protocol, name = listener.split("@")
            hostnames.append(hostname)
            routes.append({
                "source_port": int(port),
                "description": name,
                "protocol": protocol,
                "destination_port": -1
            })
            status = await self._tcp_port_ready(hostname, port) if protocol.lower() == "tcp" else True
            statuses.append(status)

        status = all(statuses) if statuses else False

        assembled_routes = {}

        ips = []
        if not self._enable_wss:
            for hostname in hostnames:
                ips.extend(await self._resolve_hostname(hostname))
            hostnames = ips

        for hostname in hostnames:
            assembled_routes[hostname] = {"routes": routes}

        logging.debug(f"Extracted routes {assembled_routes}, readiness status {status}")
        return assembled_routes, status
Creating a custom CSP resolver#
Because the class and its arguments are loaded dynamically, developers can provide their own class and arguments, as long as the Python file containing the class is inside the container and part of a Python package (i.e., an __init__.py file is present).
There are three functions that allow custom behavior:

on_create
Allows customizing the chart values that are passed to the rmcp service before the session gets instantiated.
Takes as arguments the chart values that come from resolving the profile; this gives access to the resolved values and settings, including the overrides specified within a version.
Returns an updated settings dictionary that is used to update the chart values.

on_delete
Allows additional actions to be taken when a session is terminated.
Takes as an argument a dictionary which currently only contains the session_id.
Returns nothing.

resolve_endpoints
Determines the routes passed back to the web client, and whether the session is ready to connect to.
Takes as an argument the session_id.
Returns a tuple of a dictionary with routes and a boolean indicating whether the session is ready. If the boolean is false, a 202 will be returned to the client.
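As a minimal sketch, a custom resolver that implements all three hooks might look like the following. It simply defers to the built-in Generic behavior; the class name and logging statements are illustrative, and the import path matches the example in the next section:

import logging
from typing import Dict, Tuple

from nv.svc.streaming._csp import Generic

class MyResolver(Generic):

    async def on_create(self, profile_data: Dict, settings: Dict) -> Dict:
        """Adjust chart values before the session is instantiated."""
        settings = await super().on_create(profile_data, settings)
        logging.debug(f"on_create produced settings {settings}")
        return settings

    async def on_delete(self, data: Dict) -> None:
        """Run additional cleanup for data['session_id'] when a session is terminated."""
        logging.debug(f"on_delete called for session {data.get('session_id')}")

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Return the routes for the session and whether it is ready; False results in a 202."""
        return await super().resolve_endpoints(session_id)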
Example custom endpoint resolver#
Here is an example of creating a custom endpoint resolver and deploying it to an Omniverse Kit App Streaming instance.
from typing import Tuple, Dict, List

from nv.svc.streaming._csp import Generic

class NodePortResolver(Generic):

    async def resolve_endpoints(self, session_id: str) -> Tuple[Dict, bool]:
        """Resolve the endpoint to connect to for a given session."""
        services = await self._fetch_resources("service", selectors={"sessionId": session_id})
        pods = await self._fetch_resources("pod", selectors={"sessionId": session_id})
        routes, status = await self._extract_routes(services, pods)
        return routes, status

    def _extract_ports(self, service_spec: Dict) -> Tuple[Dict, bool]:
        """Extract the port information from the service specification.

        Args:
            service_spec (Dict): The service specification dictionary.

        Returns:
            Dict: A dictionary containing the route information.
        """
        ports = service_spec.spec.ports

        routes = []
        for port in ports:
            routes.append(
                {
                    "source_port": port.node_port,
                    "description": port.name,
                    "protocol": port.protocol,
                    "destination_port": 0  # this field is no longer used.
                }
            )

        status = True if routes else False

        return {"routes": routes}, status

    async def _extract_routes(self, services, pods):
        routes = {}
        statuses = []

        for service in services:
            ports, port_ready = self._extract_ports(service)
            ips, ip_ready = await self._extract_pod_node_ips(pods)
            statuses.append(port_ready and ip_ready)

            if not ip_ready:
                continue

            for ip in ips:
                routes[ip] = ports

        status = all(statuses) if statuses else False
        return routes, status

    async def _extract_pod_node_ips(self, pods) -> Tuple[List[str], bool]:
        """Extract the host IPs of the nodes hosting the pods of the stream."""
        node_ips = set()
        for pod in pods:  # _fetch_resources already returns the list of pod objects
            if pod.status.host_ip:
                node_ips.add(pod.status.host_ip)

        return list(node_ips), bool(node_ips)
Create a Dockerfile that copies node_port.py into the streaming session manager container:
Note
The code snippets below include a $APP_VERSION variable. This variable can be replaced manually with the latest release version number, or set using an environment variable (e.g., export APP_VERSION=). The latest container versions are listed at the top of this page for quick reference.
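For example, using the latest container release listed at the top of this page:

export APP_VERSION=1.8.0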
FROM nvcr.io/nvidia/omniverse/kit-appstreaming-manager:$APP_VERSION

# Create the resolvers directory
RUN mkdir -p /resolvers

# Copy node_port.py into /resolvers
COPY node_port.py /resolvers/

# Create an empty __init__.py to make it a package
RUN touch /resolvers/__init__.py
Build and push the container, replacing the registry, name and version:
docker build -t {my-registry}/{kit-appstreaming-manager-node-port}:{1.0.0} .
docker push {my-registry}/{kit-appstreaming-manager-node-port}:{1.0.0}
Create or update the values file to use the new container and instruct it to use the new resolver class (this is a partial values file with just the required fields changed):
streaming:
  image:
    # -- Image repository.
    repository: "my-registry/kit-appstreaming-manager-node-port"
    # -- Image pull policy.
    pullPolicy: Always
    # -- Image tag.
    tag: 1.0.0
  serviceConfig:
    # -- CSP customization manager
    backend_csp_cls: "resolvers.node_port:NodePortResolver"
    # -- CSP customization manager arguments
    backend_csp_args: {}
Apply the helm chart.
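For example, assuming the chart was installed as a release named kit-appstreaming-manager and the values above were saved to custom-values.yaml (the release name, chart reference, and namespace here are illustrative):

helm upgrade --install kit-appstreaming-manager <chart-reference> \
  --namespace omni-streaming \
  -f custom-values.yaml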
At this point, the NodePortResolver will be loaded and called to resolve the routes based on NodePort and HostIP.