Usage Examples#

Work with Different Endpoint Types#

import omni.sensors.net

# Create different types of endpoints
ip_endpoint = omni.sensors.net.IP4Endpoint(port=8080)
print(f"IP4 endpoint port: {ip_endpoint.port}")

# Create an IP endpoint
ip_ep = omni.sensors.net.Endpoint(
    egress_timestamp=1234567890,
    type=omni.sensors.net.EndpointType.E_IP
)
ip_ep.ip = ip_endpoint

# Create a CAN endpoint
can_ep = omni.sensors.net.Endpoint(
    egress_timestamp=1234567891,
    type=omni.sensors.net.EndpointType.E_CAN
)
can_ep.can = 0x123  # CAN ID

# Create a custom endpoint
custom_ep = omni.sensors.net.Endpoint(
    egress_timestamp=1234567892,
    type=omni.sensors.net.EndpointType.E_CUSTOM
)

# Print endpoint information
print(f"IP endpoint type: {ip_ep.type.name} (value: {ip_ep.type.value})")
print(f"CAN endpoint type: {can_ep.type.name} (value: {can_ep.type.value})")
print(f"Custom endpoint type: {custom_ep.type.name} (value: {custom_ep.type.value})")

# Check endpoint types
if ip_ep.type == omni.sensors.net.EndpointType.E_IP:
    print("This is an IP endpoint")

Create Network Factory and Channel#

# Warning: This code snippet is not working. Please fix it.
import omni.sensors.net
import carb

# Get the network factory interface using carb acquire_interface
factory = carb.acquire_interface(omni.sensors.net.INetworkFactory)

# Create a UDP channel configuration
udp_config = {
    "backend": "udp",
    "port": 8080,
    "broadcast": True
}

# Create a UDP channel using the factory
channel = factory.make_channel(udp_config)

# Check if the channel is connected
if channel.connected:
    print("Channel is connected and ready to use")

Send and Receive Data with Channel#

# Warning: This code snippet is not working. Please fix it.
import omni.sensors.net
import carb

# Get the network factory and create a channel
factory = carb.acquire_interface(omni.sensors.net.INetworkFactory)
channel = factory.make_channel({"backend": "udp", "port": 8080})

# Create an IP4 endpoint for sending data
ip_endpoint = omni.sensors.net.IP4Endpoint(port=9090)
endpoint = omni.sensors.net.Endpoint(
    egress_timestamp=1234567890,
    type=omni.sensors.net.EndpointType.E_IP
)
endpoint.ip = ip_endpoint

# Define a reception consumer callback
def on_data_received(data: bytes, sender_endpoint: omni.sensors.net.Endpoint):
    print(f"Received data: {data.decode('utf-8')}")
    print(f"From endpoint type: {sender_endpoint.type.name}")

# Add the reception consumer to the channel
consumer = channel.add_reception_consumer(on_data_received)

# Send data through the channel
message = b"Hello from Omniverse!"
success = channel.send(message, endpoint)
if success:
    print("Message sent successfully")

# Remove the consumer when done
channel.remove_reception_consumer(consumer)

Create Server with Connection Consumer#

# Warning: This code snippet is not working. Please fix it.
import omni.sensors.net
import carb

# Get the network factory interface
factory = carb.acquire_interface(omni.sensors.net.INetworkFactory)

# Create a TCP server configuration
server_config = {
    "backend": "tcp",
    "port": 8888,
    "bind_address": "localhost"
}

# Create a server using the factory
server = factory.make_server(server_config)

# Define a connection consumer class
class MyConnectionConsumer(omni.sensors.net.IConnectionConsumer):
    def on_connect(self, channel: omni.sensors.net.IChannel, endpoint: omni.sensors.net.Endpoint):
        print(f"New client connected from endpoint type: {endpoint.type.name}")
        
        # Define data reception for the new connection
        def handle_client_data(data: bytes, sender: omni.sensors.net.Endpoint):
            print(f"Received from client: {data.decode('utf-8')}")
            # Echo the message back
            channel.send(b"Echo: " + data, sender)
        
        # Add reception consumer to the client channel
        channel.add_reception_consumer(handle_client_data)

# Create and add the connection consumer to the server
connection_consumer = MyConnectionConsumer()
server.add_connection_consumer(connection_consumer)

print("Server is running and waiting for connections...")