Sensor Simulation

Multi-Camera

The script below is an example of accessing synthetic data from multiple cameras in a scene. It redundantly demonstrates how to access the data either through a custom replicator writer or directly through the built-in annotators.

  1from omni.isaac.kit import SimulationApp
  2
  3simulation_app = SimulationApp({"headless": False})
  4
  5import os
  6
  7import numpy as np
  8import omni.kit
  9import omni.replicator.core as rep
 10from omni.replicator.core import AnnotatorRegistry, Writer
 11from PIL import Image
 12from pxr import Sdf, UsdGeom
 13
 14NUM_FRAMES = 5
 15
 16# Save rgb image to file
 17def save_rgb(rgb_data, file_name):
 18    rgb_img = Image.fromarray(rgb_data, "RGBA")
 19    rgb_img.save(file_name + ".png")
 20
 21
 22# Randomize cube color every frame using a replicator randomizer
 23def cube_color_randomizer():
 24    cube_prims = rep.get.prims(path_pattern="Cube")
 25    with cube_prims:
 26        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
 27    return cube_prims.node
 28
 29
 30# Access data through a custom replicator writer
 31class MyWriter(Writer):
 32    def __init__(self, rgb: bool = True):
 33        self._frame_id = 0
 34        if rgb:
 35            self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))
 36        # Create writer output directory
 37        self.file_path = os.path.join(os.getcwd(), "_out_writer", "")
 38        print(f"Writing writer data to {self.file_path}")
 39        dir = os.path.dirname(self.file_path)
 40        os.makedirs(dir, exist_ok=True)
 41
 42    def write(self, data):
 43        for annotator in data.keys():
 44            annotator_split = annotator.split("-")
 45            if len(annotator_split) > 1:
 46                render_product_name = annotator_split[-1]
 47            if annotator.startswith("rgb"):
 48                save_rgb(data[annotator], f"{self.file_path}/{render_product_name}_frame_{self._frame_id}")
 49        self._frame_id += 1
 50
 51
 52rep.WriterRegistry.register(MyWriter)
 53
 54# Create a new stage and add a dome light
 55omni.usd.get_context().new_stage()
 56stage = omni.usd.get_context().get_stage()
 57dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
 58dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(1000.0)
 59
 60# Create cube
 61cube_prim = stage.DefinePrim("/World/Cube", "Cube")
 62UsdGeom.Xformable(cube_prim).AddTranslateOp().Set((0.0, 5.0, 1.0))
 63
 64# Run a few frames to ensure all materials are properly loaded
 65for _ in range(5):
 66    simulation_app.update()
 67
 68# Register cube color randomizer to trigger on every frame
 69rep.randomizer.register(cube_color_randomizer)
 70with rep.trigger.on_frame():
 71    rep.randomizer.cube_color_randomizer()
 72
 73# Create cameras
 74camera_prim1 = stage.DefinePrim("/World/Camera1", "Camera")
 75UsdGeom.Xformable(camera_prim1).AddTranslateOp().Set((0.0, 10.0, 20.0))
 76UsdGeom.Xformable(camera_prim1).AddRotateXYZOp().Set((-15.0, 0.0, 0.0))
 77
 78camera_prim2 = stage.DefinePrim("/World/Camera2", "Camera")
 79UsdGeom.Xformable(camera_prim2).AddTranslateOp().Set((-10.0, 15.0, 15.0))
 80UsdGeom.Xformable(camera_prim2).AddRotateXYZOp().Set((-45.0, 0.0, 45.0))
 81
 82# Create render products
 83rp1 = rep.create.render_product(str(camera_prim1.GetPrimPath()), resolution=(320, 320), name="rp1")
 84rp2 = rep.create.render_product(str(camera_prim2.GetPrimPath()), resolution=(640, 640), name="rp2")
 85rp3 = rep.create.render_product("/OmniverseKit_Persp", (1024, 1024), name="rp3")
 86
 87# Acess the data through a custom writer
 88writer = rep.WriterRegistry.get("MyWriter")
 89writer.initialize(rgb=True)
 90writer.attach([rp1, rp2, rp3])
 91
 92# Acess the data through annotators
 93rgb_annotators = []
 94for rp in [rp1, rp2, rp3]:
 95    rgb = rep.AnnotatorRegistry.get_annotator("rgb")
 96    rgb.attach(rp)
 97    rgb_annotators.append(rgb)
 98
 99# Create annotator output directory
100file_path = os.path.join(os.getcwd(), "_out_annot", "")
101print(f"Writing annotator data to {file_path}")
102dir = os.path.dirname(file_path)
103os.makedirs(dir, exist_ok=True)
104
105for i in range(NUM_FRAMES):
106    # Improve the rendering quality by rendering multiple subframes per captured frame
107    rep.orchestrator.step(rt_subframes=8)
108    # Get annotator data after each replicator process step
109    for j, rgb_annot in enumerate(rgb_annotators):
110        save_rgb(rgb_annot.get_data(), f"{dir}/rp{j}_step_{i}")
111
112simulation_app.close()

Get synthetic data at custom events in simulations

The script below will use annotators and writers (redundantly) to access synthetic data at specific timepoints during a simulation.

 1from omni.isaac.kit import SimulationApp
 2
 3simulation_app = SimulationApp(launch_config={"renderer": "RayTracedLighting", "headless": False})
 4
 5import json
 6import os
 7
 8import carb.settings
 9import numpy as np
10import omni
11import omni.replicator.core as rep
12from omni.isaac.core import World
13from omni.isaac.core.objects import DynamicCuboid
14from omni.isaac.core.utils.semantics import add_update_semantics
15from PIL import Image
16
17
18# Util function to save rgb annotator data
19def write_rgb_data(rgb_data, file_path):
20    rgb_img = Image.fromarray(rgb_data, "RGBA")
21    rgb_img.save(file_path + ".png")
22
23
24# Util function to save semantic segmentation annotator data
25def write_sem_data(sem_data, file_path):
26    id_to_labels = sem_data["info"]["idToLabels"]
27    with open(file_path + ".json", "w") as f:
28        json.dump(id_to_labels, f)
29    sem_image_data = np.frombuffer(sem_data["data"], dtype=np.uint8).reshape(*sem_data["data"].shape, -1)
30    sem_img = Image.fromarray(sem_image_data, "RGBA")
31    sem_img.save(file_path + ".png")
32
33
34# Create a new stage with the default ground plane
35omni.usd.get_context().new_stage()
36
37# Setup the simulation world
38world = World()
39world.scene.add_default_ground_plane()
40world.reset()
41
42# Setting capture on play to False will prevent the replicator from capturing data each frame
43carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
44
45# Run a few frames to ensure all materials are properly loaded
46for _ in range(5):
47    simulation_app.update()
48
49# Create a camera and render product to collect the data from
50cam = rep.create.camera(position=(5, 5, 5), look_at=(0, 0, 0))
51rp = rep.create.render_product(cam, (512, 512))
52
53# Set the output directory for the data
54out_dir = os.getcwd() + "/_out_sim_event"
55os.makedirs(out_dir, exist_ok=True)
56print(f"Outputting data to {out_dir}..")
57
58# Example of using a writer to save the data
59writer = rep.WriterRegistry.get("BasicWriter")
60writer.initialize(
61    output_dir=f"{out_dir}/writer", rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True
62)
63writer.attach(rp)
64
65# Run a preview to ensure the replicator graph is initialized
66rep.orchestrator.preview()
67
68# Example of accesing the data directly from annotators
69rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
70rgb_annot.attach(rp)
71sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True})
72sem_annot.attach(rp)
73
74# Spawn and drop a few cubes, capture data when they stop moving
75for i in range(5):
76    cuboid = world.scene.add(DynamicCuboid(prim_path=f"/World/Cuboid_{i}", name=f"Cuboid_{i}", position=(0, 0, 10 + i)))
77    add_update_semantics(cuboid.prim, "Cuboid")
78
79    for s in range(500):
80        world.step(render=False)
81        vel = np.linalg.norm(cuboid.get_linear_velocity())
82        if vel < 0.1:
83            print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..")
84            # Tigger the writer and update the annotators with new data
85            rep.orchestrator.step(rt_subframes=4, pause_timeline=False)
86            write_rgb_data(rgb_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_rgb")
87            write_sem_data(sem_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_sem")
88            break
89
90simulation_app.close()

Get Camera Parameters

The below script show how to get the camera parameters associated with a viewport.

 1import omni
 2from omni.syntheticdata import helpers
 3import math
 4
 5stage = omni.usd.get_context().get_stage()
 6viewport_api = omni.kit.viewport.utility.get_active_viewport()
 7# Set viewport resolution, changes will occur on next frame
 8viewport_api.set_texture_resolution((512, 512))
 9# get resolution
10(width, height) = viewport_api.get_texture_resolution()
11aspect_ratio = width / height
12# get camera prim attached to viewport
13camera = stage.GetPrimAtPath(viewport_api.get_active_camera())
14focal_length = camera.GetAttribute("focalLength").Get()
15horiz_aperture = camera.GetAttribute("horizontalAperture").Get()
16vert_aperture = camera.GetAttribute("verticalAperture").Get()
17# Pixels are square so we can also do:
18# vert_aperture = height / width * horiz_aperture
19near, far = camera.GetAttribute("clippingRange").Get()
20fov = 2 * math.atan(horiz_aperture / (2 * focal_length))
21# helper to compute projection matrix
22proj_mat = helpers.get_projection_matrix(fov, aspect_ratio, near, far)
23
24# compute focal point and center
25focal_x = height * focal_length / vert_aperture
26focal_y = width * focal_length / horiz_aperture
27center_x = height * 0.5
28center_y = width * 0.5