Usage Examples#
Work with Different Endpoint Types#
import omni.sensors.net
# Create different types of endpoints
ip_endpoint = omni.sensors.net.IP4Endpoint(port=8080)
print(f"IP4 endpoint port: {ip_endpoint.port}")
# Create an IP endpoint
ip_ep = omni.sensors.net.Endpoint(
egress_timestamp=1234567890,
type=omni.sensors.net.EndpointType.E_IP
)
ip_ep.ip = ip_endpoint
# Create a CAN endpoint
can_ep = omni.sensors.net.Endpoint(
egress_timestamp=1234567891,
type=omni.sensors.net.EndpointType.E_CAN
)
can_ep.can = 0x123 # CAN ID
# Create a custom endpoint
custom_ep = omni.sensors.net.Endpoint(
egress_timestamp=1234567892,
type=omni.sensors.net.EndpointType.E_CUSTOM
)
# Print endpoint information
print(f"IP endpoint type: {ip_ep.type.name} (value: {ip_ep.type.value})")
print(f"CAN endpoint type: {can_ep.type.name} (value: {can_ep.type.value})")
print(f"Custom endpoint type: {custom_ep.type.name} (value: {custom_ep.type.value})")
# Check endpoint types
if ip_ep.type == omni.sensors.net.EndpointType.E_IP:
print("This is an IP endpoint")
Create Network Factory and Channel#
# Warning: This code snippet is not working. Please fix it.
import omni.sensors.net
import carb
# Get the network factory interface using carb acquire_interface
factory = carb.acquire_interface(omni.sensors.net.INetworkFactory)
# Create a UDP channel configuration
udp_config = {
"backend": "udp",
"port": 8080,
"broadcast": True
}
# Create a UDP channel using the factory
channel = factory.make_channel(udp_config)
# Check if the channel is connected
if channel.connected:
print("Channel is connected and ready to use")
Send and Receive Data with Channel#
# Warning: This code snippet is not working. Please fix it.
import omni.sensors.net
import carb
# Get the network factory and create a channel
factory = carb.acquire_interface(omni.sensors.net.INetworkFactory)
channel = factory.make_channel({"backend": "udp", "port": 8080})
# Create an IP4 endpoint for sending data
ip_endpoint = omni.sensors.net.IP4Endpoint(port=9090)
endpoint = omni.sensors.net.Endpoint(
egress_timestamp=1234567890,
type=omni.sensors.net.EndpointType.E_IP
)
endpoint.ip = ip_endpoint
# Define a reception consumer callback
def on_data_received(data: bytes, sender_endpoint: omni.sensors.net.Endpoint):
print(f"Received data: {data.decode('utf-8')}")
print(f"From endpoint type: {sender_endpoint.type.name}")
# Add the reception consumer to the channel
consumer = channel.add_reception_consumer(on_data_received)
# Send data through the channel
message = b"Hello from Omniverse!"
success = channel.send(message, endpoint)
if success:
print("Message sent successfully")
# Remove the consumer when done
channel.remove_reception_consumer(consumer)
Create Server with Connection Consumer#
# Warning: This code snippet is not working. Please fix it.
import omni.sensors.net
import carb
# Get the network factory interface
factory = carb.acquire_interface(omni.sensors.net.INetworkFactory)
# Create a TCP server configuration
server_config = {
"backend": "tcp",
"port": 8888,
"bind_address": "localhost"
}
# Create a server using the factory
server = factory.make_server(server_config)
# Define a connection consumer class
class MyConnectionConsumer(omni.sensors.net.IConnectionConsumer):
def on_connect(self, channel: omni.sensors.net.IChannel, endpoint: omni.sensors.net.Endpoint):
print(f"New client connected from endpoint type: {endpoint.type.name}")
# Define data reception for the new connection
def handle_client_data(data: bytes, sender: omni.sensors.net.Endpoint):
print(f"Received from client: {data.decode('utf-8')}")
# Echo the message back
channel.send(b"Echo: " + data, sender)
# Add reception consumer to the client channel
channel.add_reception_consumer(handle_client_data)
# Create and add the connection consumer to the server
connection_consumer = MyConnectionConsumer()
server.add_connection_consumer(connection_consumer)
print("Server is running and waiting for connections...")