Data Augmentation
Example of using Isaac Sim and Replicator to capture augmented synthetic data.
Learning Objectives
This tutorial provides examples of how to use omni.replicator augmentations on annotators or writers. The examples show how to augment rgb and depth annotator data using warp (GPU) or numpy (CPU) kernels/filters. Warp is particularly advantageous for parallelizable tasks, especially if the data already resides in GPU memory, because it avoids memory copies from GPU to CPU.
To get the most out of the tutorial, familiarity with omni.replicator, annotators, writers, and warp is recommended.
Add Noise to Camera is a similar tutorial using augmentation to publish noisy camera images using ROS.
Scenario
The depicted figure showcases the augmentations used throughout the examples. The first image is an illustrative example that switches the red and blue channels of the image. The second image is a composed augmentation that converts the rgb data to hsv, adds gaussian noise, and converts back to rgb. The third and fourth images are the results of applying gaussian noise filters with different sigma values to the depth data.
For the example scenario, a red cube is spawned with a camera looking at it from above. A replicator randomization graph is created for the cube, which triggers a random rotation on every frame capture.
Implementation
The tutorial is split into two parts: the first example shows how to augment annotators directly, and the second shows how to augment writers. Both examples can be run as Standalone Applications or from the UI using the Script Editor.
Annotator Augmentation
The annotator example will output rgb images with the red and blue channels switched, and two depth images with different gaussian noise levels (saved as grayscale pngs). The example can switch between using warp or numpy augmentations.
To be able to run the augmentation functions one needs to enable scripting in the settings:
Enable Scripting
carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)
For illustrative purposes, the rgb data is augmented with a function that switches the red and blue channels, implemented both with numpy (CPU) and as a warp (GPU) kernel:
RGB to BGR using Warp and Numpy
def rgb_to_bgr_np(data_in):
    data_in[:, :, [0, 2]] = data_in[:, :, [2, 0]]
    return data_in


@wp.kernel
def rgb_to_bgr_wp(data_in: wp.array3d(dtype=wp.uint8), data_out: wp.array3d(dtype=wp.uint8)):
    i, j = wp.tid()
    data_out[i, j, 0] = data_in[i, j, 2]
    data_out[i, j, 1] = data_in[i, j, 1]
    data_out[i, j, 2] = data_in[i, j, 0]
    data_out[i, j, 3] = data_in[i, j, 3]
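As a quick check of what the numpy variant does, the following sketch applies the same fancy-indexing swap to a tiny RGBA array outside of Isaac Sim. The 1x2 test image is an illustrative assumption. Note that the function modifies its input in place, so a copy is passed here to keep the original for comparison.

```python
import numpy as np


def rgb_to_bgr_np(data_in):
    # Swap channel 0 (red) and channel 2 (blue) in place; green and alpha are untouched
    data_in[:, :, [0, 2]] = data_in[:, :, [2, 0]]
    return data_in


# Hypothetical 1x2 RGBA image: one pure red pixel, one pure blue pixel
rgba = np.array([[[255, 0, 0, 255], [0, 0, 255, 128]]], dtype=np.uint8)
swapped = rgb_to_bgr_np(rgba.copy())

print(swapped[0, 0])  # the red pixel now has its value in the blue channel
print(swapped[0, 1])  # the blue pixel now has its value in the red channel
```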
For the depth data we use gaussian noise filters. Note that the functions are registered in the AnnotatorRegistry for later access:
Depth Gaussian Noise using Warp and Numpy
def gaussian_noise_depth_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    return data_in + np.random.randn(*data_in.shape) * sigma


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_np", rep.annotators.Augmentation.from_function(gaussian_noise_depth_np, sigma=0.1, seed=None)
)


@wp.kernel
def gaussian_noise_depth_wp(
    data_in: wp.array2d(dtype=wp.float32), data_out: wp.array2d(dtype=wp.float32), sigma: float, seed: int
):
    i, j = wp.tid()
    state = wp.rand_init(seed, wp.tid())
    data_out[i, j] = data_in[i, j] + sigma * wp.randn(state)


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_wp", rep.annotators.Augmentation.from_function(gaussian_noise_depth_wp, sigma=0.1, seed=None)
)
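To get a feel for the sigma and seed parameters, the numpy noise function can be tried on its own, without Replicator. The sketch below uses a hypothetical flat depth map and fixed seeds (both assumptions for illustration). It shows that the same seed reproduces the same noise and that the deviation scales linearly with sigma. It also shows that adding float64 noise to float32 depth data promotes the result to float64 in plain numpy.

```python
import numpy as np


def gaussian_noise_depth_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    return data_in + np.random.randn(*data_in.shape) * sigma


# Hypothetical flat depth map: every pixel is 5 units from the camera
depth = np.full((64, 64), 5.0, dtype=np.float32)

noisy_a = gaussian_noise_depth_np(depth, sigma=0.1, seed=42)
noisy_b = gaussian_noise_depth_np(depth, sigma=0.1, seed=42)
noisy_c = gaussian_noise_depth_np(depth, sigma=0.5, seed=42)

# Same seed and sigma: identical output
same_seed_matches = np.array_equal(noisy_a, noisy_b)
# Same seed, different sigma: the underlying noise is identical, so the spread scales with sigma
spread_ratio = np.std(noisy_c - depth) / np.std(noisy_a - depth)
# float32 input plus float64 noise is promoted to float64
result_dtype = noisy_a.dtype

print(same_seed_matches, round(float(spread_ratio), 3), result_dtype)
```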
Create the augmentations (warp or numpy) once using the function directly and once from the registry:
Augmentations using Warp or Numpy
rgb_to_bgr_augm = None
gn_depth_augm = None
if USE_WARP:
    rgb_to_bgr_augm = rep.annotators.Augmentation.from_function(rgb_to_bgr_wp)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_wp")
else:
    rgb_to_bgr_augm = rep.annotators.Augmentation.from_function(rgb_to_bgr_np)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_np")
One can also register a new annotator together with its augmentation:
Register Augmented Annotator
rep.annotators.register(
    name="rgb_to_bgr_augm",
    annotator=rep.annotators.augment(
        source_annotator=rep.AnnotatorRegistry.get_annotator("rgb"),
        augmentation=rgb_to_bgr_augm,
    ),
)
Finally create the augmented annotators (1x rgb, 2x depth) and attach them to a render product to generate data:
Annotator Augmentation
rgb_to_bgr_annot = rep.AnnotatorRegistry.get_annotator("rgb_to_bgr_augm")
depth_annot_1 = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
depth_annot_1.augment(gn_depth_augm)
depth_annot_2 = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
depth_annot_2.augment(gn_depth_augm, sigma=0.5)
rgb_to_bgr_annot.attach(rp)
depth_annot_1.attach(rp)
depth_annot_2.attach(rp)
[..]
await rep.orchestrator.step_async()
rgb_data = rgb_to_bgr_annot.get_data()
depth_data_1 = depth_annot_1.get_data()
depth_data_2 = depth_annot_2.get_data()
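In the snippet above, both depth annotators share one registered augmentation, and the second call overrides the default sigma=0.1 with sigma=0.5. The following sketch mimics that "defaults plus per-use override" pattern in plain Python. SimpleAugmentation, with_overrides, and the dict-based registry are hypothetical stand-ins invented for illustration; they are not how Replicator implements its registry.

```python
import numpy as np


class SimpleAugmentation:
    """Hypothetical stand-in that binds default keyword arguments to a function."""

    def __init__(self, fn, **defaults):
        self.fn = fn
        self.defaults = defaults

    def with_overrides(self, **overrides):
        # Return a new object so the shared registry entry is not modified
        return SimpleAugmentation(self.fn, **{**self.defaults, **overrides})

    def __call__(self, data_in):
        return self.fn(data_in, **self.defaults)


def gaussian_noise_depth_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    return data_in + np.random.randn(*data_in.shape) * sigma


# Hypothetical registry: a plain dict keyed by augmentation name
registry = {"gn_depth_np": SimpleAugmentation(gaussian_noise_depth_np, sigma=0.1, seed=0)}

augm_default = registry["gn_depth_np"]  # uses the registered default sigma=0.1
augm_strong = augm_default.with_overrides(sigma=0.5)  # same function and seed, larger sigma

depth = np.zeros((32, 32), dtype=np.float32)
noise_default = augm_default(depth)
noise_strong = augm_strong(depth)
```

Because the override returns a new object, the entry stored in the registry keeps its original defaults and can be reused for other annotators.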
The example can be run as a standalone application using the following commands in the terminal:
./python.sh standalone_examples/replicator/augmentation/annotator_augmentation.py
Optionally the following arguments can be used to change the default behavior:
--use_warp – flag to use warp (GPU) instead of numpy (CPU) for the augmentation functions (default: False)
--num_frames – the number of frames to be captured (default: 25)
./python.sh standalone_examples/replicator/augmentation/annotator_augmentation.py --use_warp --num_frames 25
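The listing in this tutorial sets NUM_FRAMES and USE_WARP as constants. As a rough sketch of how the two command-line flags could be mapped onto those values, the following uses argparse from the standard library. The parse_args helper and the use of parse_known_args are assumptions made for illustration, and the shipped standalone script may handle its arguments differently.

```python
import argparse


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Augmentation example options (illustrative)")
    parser.add_argument("--use_warp", action="store_true", help="Use warp (GPU) instead of numpy (CPU)")
    parser.add_argument("--num_frames", type=int, default=25, help="Number of frames to capture")
    # parse_known_args ignores any extra arguments that a host application may append
    args, _unknown = parser.parse_known_args(argv)
    return args


# Equivalent to passing: --use_warp --num_frames 10
args = parse_args(["--use_warp", "--num_frames", "10"])
USE_WARP = args.use_warp
NUM_FRAMES = args.num_frames
```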
import asyncio
import os
import time
import carb.settings
import numpy as np
import omni
import omni.replicator.core as rep
import warp as wp
from omni.isaac.nucleus import get_assets_root_path
from omni.isaac.core.utils.stage import open_stage
from PIL import Image
NUM_FRAMES = 25
USE_WARP = False
ENV_URL = "/Isaac/Environments/Grid/default_environment.usd"
# Enable scripts
carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)
# Illustrative augmentation switching red and blue channels in rgb data using numpy (CPU) and warp (GPU)
def rgb_to_bgr_np(data_in):
    data_in[:, :, [0, 2]] = data_in[:, :, [2, 0]]
    return data_in


@wp.kernel
def rgb_to_bgr_wp(data_in: wp.array3d(dtype=wp.uint8), data_out: wp.array3d(dtype=wp.uint8)):
    i, j = wp.tid()
    data_out[i, j, 0] = data_in[i, j, 2]
    data_out[i, j, 1] = data_in[i, j, 1]
    data_out[i, j, 2] = data_in[i, j, 0]
    data_out[i, j, 3] = data_in[i, j, 3]
# Gaussian noise augmentation on depth data in numpy (CPU) and warp (GPU)
def gaussian_noise_depth_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    return data_in + np.random.randn(*data_in.shape) * sigma


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_np", rep.annotators.Augmentation.from_function(gaussian_noise_depth_np, sigma=0.1, seed=None)
)


@wp.kernel
def gaussian_noise_depth_wp(
    data_in: wp.array2d(dtype=wp.float32), data_out: wp.array2d(dtype=wp.float32), sigma: float, seed: int
):
    i, j = wp.tid()
    state = wp.rand_init(seed, wp.tid())
    data_out[i, j] = data_in[i, j] + sigma * wp.randn(state)


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_wp", rep.annotators.Augmentation.from_function(gaussian_noise_depth_wp, sigma=0.1, seed=None)
)
# Helper functions for writing images from annotator data
def write_rgb(data, path):
    rgb_img = Image.fromarray(data, mode="RGBA")
    rgb_img.save(path + ".png")


def write_depth(data, path):
    # Convert to numpy (if warp), handle any inf/nan values, normalize, and convert from float32 to an 8-bit int array
    if isinstance(data, wp.array):
        data = data.numpy()
    # Replace any -inf and inf values with nan, then calculate the mean value and replace nan with the mean
    data[np.isinf(data)] = np.nan
    data = np.nan_to_num(data, nan=np.nanmean(data), copy=False)
    normalized_array = (data - np.min(data)) / (np.max(data) - np.min(data))
    integer_array = (normalized_array * 255).astype(np.uint8)
    depth_img = Image.fromarray(integer_array, mode="L")
    depth_img.save(path + ".png")
# Setup the environment
assets_root_path = get_assets_root_path()
open_stage(assets_root_path + ENV_URL)
# Disable capture on play and async rendering
carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
carb.settings.get_settings().set("/omni/replicator/asyncRendering", False)
carb.settings.get_settings().set("/app/asyncRendering", False)
# Create a red cube and a render product from a camera looking at the cube from the top
red_mat = rep.create.material_omnipbr(diffuse=(1, 0, 0))
red_cube = rep.create.cube(position=(0, 0, 0.71), material=red_mat)
cam = rep.create.camera(position=(0, 0, 5), look_at=(0, 0, 0))
rp = rep.create.render_product(cam, (512, 512))
# Get the local augmentations, either from function or from the registry
rgb_to_bgr_augm = None
gn_depth_augm = None
if USE_WARP:
    rgb_to_bgr_augm = rep.annotators.Augmentation.from_function(rgb_to_bgr_wp)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_wp")
else:
    rgb_to_bgr_augm = rep.annotators.Augmentation.from_function(rgb_to_bgr_np)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_np")
# Output directories
out_dir = os.path.join(os.getcwd(), "_out_augm_annot")
print(f"Writing data to: {out_dir}")
os.makedirs(out_dir, exist_ok=True)
# Register the annotator together with its augmentation
rep.annotators.register(
    name="rgb_to_bgr_augm",
    annotator=rep.annotators.augment(
        source_annotator=rep.AnnotatorRegistry.get_annotator("rgb"),
        augmentation=rgb_to_bgr_augm,
    ),
)
rgb_to_bgr_annot = rep.AnnotatorRegistry.get_annotator("rgb_to_bgr_augm")
depth_annot_1 = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
depth_annot_1.augment(gn_depth_augm)
depth_annot_2 = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
depth_annot_2.augment(gn_depth_augm, sigma=0.5)
rgb_to_bgr_annot.attach(rp)
depth_annot_1.attach(rp)
depth_annot_2.attach(rp)
# Generate a replicator graph to rotate the cube every capture frame
with rep.trigger.on_frame():
    with red_cube:
        rep.randomizer.rotation()
# Evaluate the graph
rep.orchestrator.preview()
# The orchestrator.step() function will trigger the randomization graph and feed annotators with new data
async def run_example_async(num_frames):
    # Wait a few frames for the new stage to fully load
    for _ in range(10):
        await omni.kit.app.get_app().next_update_async()

    # Measure the duration of capturing the data
    start_time = time.time()
    for i in range(num_frames):
        await rep.orchestrator.step_async()
        rgb_data = rgb_to_bgr_annot.get_data()
        depth_data_1 = depth_annot_1.get_data()
        depth_data_2 = depth_annot_2.get_data()
        write_rgb(rgb_data, os.path.join(out_dir, f"annot_rgb_{i}"))
        write_depth(depth_data_1, os.path.join(out_dir, f"annot_depth_1_{i}"))
        write_depth(depth_data_2, os.path.join(out_dir, f"annot_depth_2_{i}"))
    return start_time


def on_task_done(task):
    start_time = task.result()
    duration = time.time() - start_time
    backend = "warp" if USE_WARP else "numpy"
    print(
        f"The duration for capturing {NUM_FRAMES} frames using '{backend}' was: {duration:.4f} seconds, "
        f"with an average of {duration / NUM_FRAMES:.4f} seconds per frame."
    )


task = asyncio.ensure_future(run_example_async(NUM_FRAMES))
task.add_done_callback(on_task_done)
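The write_depth helper in the listing above normalizes the depth values to the 0..255 range before saving them as a grayscale image. The sketch below isolates that normalization step so it can be tried on small arrays. The depth_to_uint8 name and the all-black fallbacks for flat or fully non-finite images are assumptions added for illustration, since a constant depth map would otherwise lead to a division by zero.

```python
import numpy as np


def depth_to_uint8(data):
    """Normalize a float depth map to 0..255, tolerating inf values and flat images."""
    data = np.array(data, dtype=np.float32)  # work on a copy, leave the caller's array untouched
    data[np.isinf(data)] = np.nan
    if np.all(np.isnan(data)):
        # Nothing finite to normalize against (assumed fallback: all black)
        return np.zeros(data.shape, dtype=np.uint8)
    # Replace nan entries with the mean of the finite values
    data = np.nan_to_num(data, nan=np.nanmean(data))
    value_range = np.max(data) - np.min(data)
    if value_range == 0:
        # Flat image: avoid dividing by zero (assumed fallback: all black)
        return np.zeros(data.shape, dtype=np.uint8)
    normalized = (data - np.min(data)) / value_range
    return (normalized * 255).astype(np.uint8)


# Hypothetical 2x2 depth map with one pixel that hit nothing (inf)
depth = np.array([[1.0, 2.0], [np.inf, 3.0]], dtype=np.float32)
img = depth_to_uint8(depth)
flat_img = depth_to_uint8(np.full((2, 2), 5.0, dtype=np.float32))
```

Here the inf pixel is replaced by the mean of the finite values (2.0) before normalization, so it ends up mid-gray rather than saturating the image.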
Writer Augmentation
The writer example will output gaussian noise augmented rgb and depth annotator data from a writer.
To be able to run the augmentation functions one needs to enable scripting in the settings:
Enable Scripting
carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)
For the rgb (LdrColor) annotator of the writer, we provide gaussian noise functions using numpy (CPU) and warp (GPU) kernels. The noise is applied to the RGB channels of the RGBA data; the alpha channel is left unchanged.
RGB Gaussian Noise using Warp and Numpy
def gaussian_noise_rgb_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    data_in[:, :, 0] = data_in[:, :, 0] + np.random.randn(*data_in.shape[:-1]) * sigma
    data_in[:, :, 1] = data_in[:, :, 1] + np.random.randn(*data_in.shape[:-1]) * sigma
    data_in[:, :, 2] = data_in[:, :, 2] + np.random.randn(*data_in.shape[:-1]) * sigma
    return data_in


@wp.kernel
def gaussian_noise_rgb_wp(
    data_in: wp.array3d(dtype=wp.uint8), data_out: wp.array3d(dtype=wp.uint8), sigma: float, seed: int
):
    i, j = wp.tid()
    state = wp.rand_init(seed, wp.tid())
    data_out[i, j, 0] = wp.uint8(wp.int32(data_in[i, j, 0]) + wp.int32(sigma * wp.randn(state)))
    data_out[i, j, 1] = wp.uint8(wp.int32(data_in[i, j, 1]) + wp.int32(sigma * wp.randn(state)))
    data_out[i, j, 2] = wp.uint8(wp.int32(data_in[i, j, 2]) + wp.int32(sigma * wp.randn(state)))
    data_out[i, j, 3] = data_in[i, j, 3]
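Because the image data is stored as 8-bit unsigned integers, noise added to values near 0 or 255 can leave the representable range when it is cast back. One possible variation, sketched below, does the arithmetic in floating point and clips the result to 0..255 before converting back to uint8. The function name gaussian_noise_rgb_clipped, the use of np.random.default_rng, and the test image are assumptions for illustration and are not part of the tutorial's example script.

```python
import numpy as np


def gaussian_noise_rgb_clipped(data_in, sigma: float, seed=None):
    """Add gaussian noise to the RGB channels of an RGBA uint8 image, clipping to 0..255."""
    rng = np.random.default_rng(seed)
    out = data_in.copy()
    rgb = data_in[:, :, :3].astype(np.float64)
    # Do the arithmetic in floating point so intermediate values may leave the uint8 range
    noisy = rgb + rng.normal(0.0, sigma, size=rgb.shape)
    # Round, clip back into the valid range, and only then convert to uint8
    out[:, :, :3] = np.clip(np.rint(noisy), 0, 255).astype(np.uint8)
    return out  # the alpha channel is copied through unchanged


# Hypothetical image with values at both ends of the range, where wraparound would be most visible
rgba = np.zeros((4, 4, 4), dtype=np.uint8)
rgba[:2, :, :3] = 255
rgba[:, :, 3] = 200
noisy_rgba = gaussian_noise_rgb_clipped(rgba, sigma=6.0, seed=0)
```

With clipping, bright pixels stay bright and dark pixels stay dark; the trade-off is that noise at the extremes becomes one-sided.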
For the depth annotator of the writer, we provide gaussian noise functions using numpy (CPU) and warp (GPU) kernels, applied on the 2d array of float32 values. Note that the functions are registered in the AnnotatorRegistry for later access:
Depth Gaussian Noise using Warp and Numpy
def gaussian_noise_depth_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    return data_in + np.random.randn(*data_in.shape) * sigma


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_np", rep.annotators.Augmentation.from_function(gaussian_noise_depth_np, sigma=0.1, seed=None)
)


@wp.kernel
def gaussian_noise_depth_wp(
    data_in: wp.array2d(dtype=wp.float32), data_out: wp.array2d(dtype=wp.float32), sigma: float, seed: int
):
    i, j = wp.tid()
    state = wp.rand_init(seed, wp.tid())
    data_out[i, j] = data_in[i, j] + sigma * wp.randn(state)


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_wp", rep.annotators.Augmentation.from_function(gaussian_noise_depth_wp, sigma=0.1, seed=None)
)
Here we access default (rgb) augmentations from replicator:
Built-in Replicator Augmentations
rgb_to_hsv_augm = rep.annotators.Augmentation.from_function(rep.augmentations_default.aug_rgb_to_hsv)
hsv_to_rgb_augm = rep.annotators.Augmentation.from_function(rep.augmentations_default.aug_hsv_to_rgb)
Furthermore the custom augmentations are created (warp or numpy), once using the function directly and once from the registry:
Augmentations using Warp or Numpy
gn_rgb_augm = None
gn_depth_augm = None
if USE_WARP:
    gn_rgb_augm = rep.annotators.Augmentation.from_function(gaussian_noise_rgb_wp, sigma=6.0, seed=None)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_wp")
else:
    gn_rgb_augm = rep.annotators.Augmentation.from_function(gaussian_noise_rgb_np, sigma=6.0, seed=None)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_np")
Finally, the writer is created and initialized to use the rgb and depth (distance_to_camera) annotators. The built-in rgb annotator is replaced by a new augmented one by giving it the same name (name="rgb") and adding it to the writer (add_annotator). The augmented rgb annotator uses a composition that converts the data to hsv, adds gaussian noise, and converts back to rgb. The distance_to_camera annotator is augmented using the built-in augment_annotator function:
Writer Augmentation
writer = rep.WriterRegistry.get("BasicWriter")
writer.initialize(output_dir=out_dir, rgb=True, distance_to_camera=True)
augmented_rgb_annot = rep.annotators.get("rgb").augment_compose(
    [rgb_to_hsv_augm, gn_rgb_augm, hsv_to_rgb_augm], name="rgb"
)
writer.add_annotator(augmented_rgb_annot)
writer.augment_annotator("distance_to_camera", gn_depth_augm)
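The composed augmentation above is a chain in which the output of each step becomes the input of the next. The sketch below illustrates the same idea on a single pixel using only the Python standard library. The compose and make_value_noise helpers are hypothetical, and the noise is applied only to the value (brightness) component as a simplification; the sketch does not use or reproduce Replicator's augment_compose.

```python
import colorsys
import random


def compose(steps):
    """Return a function that applies each step in order, feeding each output to the next."""

    def run(value):
        for step in steps:
            value = step(value)
        return value

    return run


def rgb_to_hsv(pixel):
    return colorsys.rgb_to_hsv(*pixel)


def hsv_to_rgb(pixel):
    return colorsys.hsv_to_rgb(*pixel)


def make_value_noise(sigma, seed):
    rng = random.Random(seed)

    def add_noise(hsv):
        h, s, v = hsv
        # Perturb only the value (brightness) component and keep it in [0, 1]
        return h, s, min(1.0, max(0.0, v + rng.gauss(0.0, sigma)))

    return add_noise


pixel = (0.8, 0.2, 0.1)  # hypothetical RGB pixel with components in [0, 1]
round_trip = compose([rgb_to_hsv, hsv_to_rgb])(pixel)
augmented = compose([rgb_to_hsv, make_value_noise(0.05, seed=1), hsv_to_rgb])(pixel)
```

Without the noise step the chain is an identity up to floating-point error, which makes it easy to verify that the two conversions are inverses before inserting an augmentation between them.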
The example can be run as a standalone application using the following commands in the terminal:
./python.sh standalone_examples/replicator/augmentation/writer_augmentation.py
Optionally the following arguments can be used to change the default behavior:
--use_warp – flag to use warp (GPU) instead of numpy (CPU) for the augmentation functions (default: False)
--num_frames – the number of frames to be captured (default: 25)
./python.sh standalone_examples/replicator/augmentation/writer_augmentation.py --use_warp --num_frames 25
import asyncio
import os
import time
import carb.settings
import numpy as np
import omni
import omni.replicator.core as rep
import warp as wp
from omni.isaac.nucleus import get_assets_root_path
from omni.isaac.core.utils.stage import open_stage
NUM_FRAMES = 25
USE_WARP = False
ENV_URL = "/Isaac/Environments/Grid/default_environment.usd"
# Enable scripts
carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)
# Gaussian noise augmentation on rgba data in numpy (CPU) and warp (GPU)
def gaussian_noise_rgb_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    data_in[:, :, 0] = data_in[:, :, 0] + np.random.randn(*data_in.shape[:-1]) * sigma
    data_in[:, :, 1] = data_in[:, :, 1] + np.random.randn(*data_in.shape[:-1]) * sigma
    data_in[:, :, 2] = data_in[:, :, 2] + np.random.randn(*data_in.shape[:-1]) * sigma
    return data_in


@wp.kernel
def gaussian_noise_rgb_wp(
    data_in: wp.array3d(dtype=wp.uint8), data_out: wp.array3d(dtype=wp.uint8), sigma: float, seed: int
):
    i, j = wp.tid()
    state = wp.rand_init(seed, wp.tid())
    data_out[i, j, 0] = wp.uint8(wp.int32(data_in[i, j, 0]) + wp.int32(sigma * wp.randn(state)))
    data_out[i, j, 1] = wp.uint8(wp.int32(data_in[i, j, 1]) + wp.int32(sigma * wp.randn(state)))
    data_out[i, j, 2] = wp.uint8(wp.int32(data_in[i, j, 2]) + wp.int32(sigma * wp.randn(state)))
    data_out[i, j, 3] = data_in[i, j, 3]
# Gaussian noise augmentation on depth data in numpy (CPU) and warp (GPU)
def gaussian_noise_depth_np(data_in, sigma: float, seed: int):
    np.random.seed(seed)
    return data_in + np.random.randn(*data_in.shape) * sigma


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_np", rep.annotators.Augmentation.from_function(gaussian_noise_depth_np, sigma=0.1, seed=None)
)


@wp.kernel
def gaussian_noise_depth_wp(
    data_in: wp.array2d(dtype=wp.float32), data_out: wp.array2d(dtype=wp.float32), sigma: float, seed: int
):
    i, j = wp.tid()
    state = wp.rand_init(seed, wp.tid())
    data_out[i, j] = data_in[i, j] + sigma * wp.randn(state)


rep.AnnotatorRegistry.register_augmentation(
    "gn_depth_wp", rep.annotators.Augmentation.from_function(gaussian_noise_depth_wp, sigma=0.1, seed=None)
)
# Setup the environment
assets_root_path = get_assets_root_path()
open_stage(assets_root_path + ENV_URL)
# Disable capture on play and async rendering
carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
carb.settings.get_settings().set("/omni/replicator/asyncRendering", False)
carb.settings.get_settings().set("/app/asyncRendering", False)
# Create a red cube and a render product from a camera looking at the cube from the top
red_mat = rep.create.material_omnipbr(diffuse=(1, 0, 0))
red_cube = rep.create.cube(position=(0, 0, 0.71), material=red_mat)
cam = rep.create.camera(position=(0, 0, 5), look_at=(0, 0, 0))
rp = rep.create.render_product(cam, (512, 512))
# Access default augmentations from replicator
rgb_to_hsv_augm = rep.annotators.Augmentation.from_function(rep.augmentations_default.aug_rgb_to_hsv)
hsv_to_rgb_augm = rep.annotators.Augmentation.from_function(rep.augmentations_default.aug_hsv_to_rgb)
# Access the custom augmentations as functions or from the registry
gn_rgb_augm = None
gn_depth_augm = None
if USE_WARP:
    gn_rgb_augm = rep.annotators.Augmentation.from_function(gaussian_noise_rgb_wp, sigma=6.0, seed=None)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_wp")
else:
    gn_rgb_augm = rep.annotators.Augmentation.from_function(gaussian_noise_rgb_np, sigma=6.0, seed=None)
    gn_depth_augm = rep.AnnotatorRegistry.get_augmentation("gn_depth_np")
# Create a writer and apply the augmentations to its corresponding annotators
out_dir = os.path.join(os.getcwd(), "_out_augm_writer")
print(f"Writing data to: {out_dir}")
writer = rep.WriterRegistry.get("BasicWriter")
writer.initialize(output_dir=out_dir, rgb=True, distance_to_camera=True)
augmented_rgb_annot = rep.annotators.get("rgb").augment_compose(
    [rgb_to_hsv_augm, gn_rgb_augm, hsv_to_rgb_augm], name="rgb"
)
writer.add_annotator(augmented_rgb_annot)
writer.augment_annotator("distance_to_camera", gn_depth_augm)
# Attach render product to writer
writer.attach([rp])
# Generate a replicator graph randomizing the cube's rotation every frame
with rep.trigger.on_frame():
    with red_cube:
        rep.randomizer.rotation()
# Evaluate the graph
rep.orchestrator.preview()
# The `step()` function will trigger the randomization graph and the writers
async def run_example_async(num_frames):
    # Wait a few frames for the new stage to fully load
    for _ in range(10):
        await omni.kit.app.get_app().next_update_async()

    # Measure the duration of capturing the data
    start_time = time.time()
    for _ in range(num_frames):
        await rep.orchestrator.step_async()
    return start_time


def on_task_done(task):
    start_time = task.result()
    duration = time.time() - start_time
    backend = "warp" if USE_WARP else "numpy"
    print(
        f"The duration for capturing {NUM_FRAMES} frames using '{backend}' was: {duration:.4f} seconds, "
        f"with an average of {duration / NUM_FRAMES:.4f} seconds per frame."
    )


task = asyncio.ensure_future(run_example_async(NUM_FRAMES))
task.add_done_callback(on_task_done)