10. Reinforcement Learning using Stable Baselines

10.1. Learning Objectives

This tutorial shows how to train a policy/ controller for jetbot in Omniverse Isaac Sim to drive towards a random goal cube using its camera. After this tutorial, you can use stable baselines using custom environments in Omniverse Isaac Sim.

10-15 Minute Tutorial, Not Including Training time

10.2. Getting Started

Prerequisites

  • Please review the Required Tutorial series, prior to beginning this tutorial.

10.3. The Environment

This is a simple environment to showcase how to use stable baselines with Omniverse Isaac Sim. The environment consists of a ground plane, jetbot and a goal cube.

The goal is to train a policy to drive the jetbot to a random goal cube using it’s camera input.

  • The action space is velocity targets for the jetbot wheels.

  • The observation space is a 128 X 128 RGB egocentric image from the jetbot’s camera.

  • At the beginning of each episode (i.e: on reset) the goal cube is randomized.

  • The reward is the difference between the previous distance to goal before executing the current action and the current distance to the goal (i.e is the jetbot is getting closer or further to the goal?)

10.4. Train the Policy

  • Navigate to the standalone example located at standalone_examples/api/omni.isaac.jetbot/stable_baselines_example

    cd ~/.local/share/ov/pkg/isaac_sim-2021.2.1/
    cd standalone_examples/api/omni.isaac.jetbot/stable_baselines_example
    
  • Train the policy by running the following script

    ~/.local/share/ov/pkg/isaac_sim-2021.2.1/python.sh train.py
    
  • Launch tensorboard to look at how the policy training progresses.

    ~/.local/share/ov/pkg/isaac_sim-2021.2.1/python.sh ~/.local/share/ov/pkg/isaac_sim-2021.2.1/tensorboard --logdir ./
    
../_images/isaac_sim_tensorboard.png

10.4.1. Code Overview

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from env import JetBotEnv
from stable_baselines3 import PPO
from stable_baselines3.ppo import CnnPolicy
from stable_baselines3.common.callbacks import CheckpointCallback
import torch as th

# Log directory of the tensorboard files to visualize the training and for the final policy as well
log_dir = "./cnn_policy"
# set headles to False to visualize the policy while training
my_env = JetBotEnv(headless=True)


policy_kwargs = dict(activation_fn=th.nn.Tanh, net_arch=[16, dict(pi=[64, 32], vf=[64, 32])]) # Policy params
policy = CnnPolicy
total_timesteps = 500000

# Saves a checkpoint policy in the same log directory
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path=log_dir, name_prefix="jetbot_policy_checkpoint")
# PPO algorithm params
model = PPO(
    policy,
    my_env,
    policy_kwargs=policy_kwargs,
    verbose=1,
    n_steps=10000,
    batch_size=1000,
    learning_rate=0.00025,
    gamma=0.9995,
    device="cuda",
    ent_coef=0,
    vf_coef=0.5,
    max_grad_norm=10,
    tensorboard_log=log_dir,
)
model.learn(total_timesteps=total_timesteps, callback=[checkpoint_callback])

model.save(log_dir + "/jetbot_policy") # Saves the final policy

my_env.close() # Closes the environment

10.5. Visualize the Trained Policy

  • Navigate to the standalone example located at standalone_examples/api/omni.isaac.jetbot/stable_baselines_example

    cd ~/.local/share/ov/pkg/isaac_sim-2021.2.1/
    cd standalone_examples/api/omni.isaac.jetbot/stable_baselines_example
    
  • Visualize the policy by running the following script

    ~/.local/share/ov/pkg/isaac_sim-2021.2.1/python.sh eval.py
    
  • Policy After 100K steps

../_images/isaac_sim_100k_policy.gif
  • Policy After 300K steps

../_images/isaac_sim_300k.gif
  • Policy After 500K steps

../_images/isaac_sim_trained_policy.gif

10.5.1. Code Overview

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from env import JetBotEnv
from stable_baselines3 import PPO

# Choose the policy path to visualize
policy_path = "./cnn_policy/jetbot_policy.zip"

my_env = JetBotEnv(headless=False)
model = PPO.load(policy_path)

for _ in range(20):
    obs = my_env.reset()
    done = False
    while not done:
        actions, _ = model.predict(observation=obs, deterministic=True)
        obs, reward, done, info = my_env.step(actions)
        my_env.render()

my_env.close()

10.6. Create a Custom Environment

Have a look at the environment code at standalone_examples/api/omni.isaac.jetbot/stable_baselines_example/env.py to create similar environments.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import gym
from gym import spaces
import numpy as np
import math


class JetBotEnv(gym.Env):
    metadata = {"render.modes": ["human"]}

    def __init__(
        self,
        skip_frame=1,
        physics_dt=1.0 / 60.0,
        rendering_dt=1.0 / 60.0,
        max_episode_length=1000,
        seed=0,
        headless=True,
    ) -> None:
        from omni.isaac.kit import SimulationApp

        self.headless = headless
        self._simulation_app = SimulationApp({"headless": self.headless, "anti_aliasing": 0})
        self._skip_frame = skip_frame
        self._dt = physics_dt * self._skip_frame
        self._max_episode_length = max_episode_length
        self._steps_after_reset = int(rendering_dt / physics_dt)
        from omni.isaac.core import World
        from omni.isaac.jetbot import Jetbot
        from omni.isaac.core.objects import VisualCuboid

        self._my_world = World(physics_dt=physics_dt, rendering_dt=rendering_dt, stage_units_in_meters=0.01)
        self._my_world.scene.add_default_ground_plane()
        self.jetbot = self._my_world.scene.add(
            Jetbot(
                prim_path="/jetbot",
                name="my_jetbot",
                position=np.array([0, 0.0, 2.0]),
                orientation=np.array([1.0, 0.0, 0.0, 0.0]),
            )
        )
        self.goal = self._my_world.scene.add(
            VisualCuboid(
                prim_path="/new_cube_1",
                name="visual_cube",
                position=np.array([60, 30, 2.5]),
                size=np.array([5, 5, 5]),
                color=np.array([1.0, 0, 0]),
            )
        )
        self.seed(seed)
        self.sd_helper = None
        self.viewport_window = None
        self._set_camera()
        self.reward_range = (-float("inf"), float("inf"))
        gym.Env.__init__(self)
        self.action_space = spaces.Box(low=-10.0, high=10.0, shape=(2,), dtype=np.float32)
        self.observation_space = spaces.Box(low=0, high=255, shape=(128, 128, 3), dtype=np.uint8)
        return

    def get_dt(self):
        return self._dt

    def step(self, action):
        previous_jetbot_position, _ = self.jetbot.get_world_pose()
        for i in range(self._skip_frame):
            from omni.isaac.core.utils.types import ArticulationAction

            self.jetbot.apply_wheel_actions(ArticulationAction(joint_velocities=action * 10.0))
            self._my_world.step(render=False)
        observations = self.get_observations()
        info = {}
        done = False
        if self._my_world.current_time_step_index - self._steps_after_reset >= self._max_episode_length:
            done = True
        goal_world_position, _ = self.goal.get_world_pose()
        current_jetbot_position, _ = self.jetbot.get_world_pose()
        previous_dist_to_goal = np.linalg.norm(goal_world_position - previous_jetbot_position)
        current_dist_to_goal = np.linalg.norm(goal_world_position - current_jetbot_position)
        reward = previous_dist_to_goal - current_dist_to_goal
        return observations, reward, done, info

    def reset(self):
        self._my_world.reset()
        # randomize goal location in circle around robot
        alpha = 2 * math.pi * np.random.rand()
        r = 100 * math.sqrt(np.random.rand()) + 20
        self.goal.set_world_pose(np.array([math.sin(alpha) * r, math.cos(alpha) * r, 2.5]))
        observations = self.get_observations()
        return observations

    def get_observations(self):
        self._my_world.render()
        # wait_for_sensor_data is recommended when capturing multiple sensors, in this case we can set it to zero as we only need RGB
        gt = self.sd_helper.get_groundtruth(
            ["rgb"], self.viewport_window, verify_sensor_init=False, wait_for_sensor_data=0
        )
        return gt["rgb"][:, :, :3]

    def render(self, mode="human"):
        return

    def close(self):
        self._simulation_app.close()
        return

    def seed(self, seed=None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        np.random.seed(seed)
        return [seed]

    def _set_camera(self):
        import omni.kit
        from omni.isaac.synthetic_utils import SyntheticDataHelper

        camera_path = "/jetbot/chassis/rgb_camera/jetbot_camera"
        if self.headless:
            viewport_handle = omni.kit.viewport.get_viewport_interface()
            viewport_handle.get_viewport_window().set_active_camera(str(camera_path))
            viewport_window = viewport_handle.get_viewport_window()
            self.viewport_window = viewport_window
            viewport_window.set_texture_resolution(128, 128)
        else:
            viewport_handle = omni.kit.viewport.get_viewport_interface().create_instance()
            new_viewport_name = omni.kit.viewport.get_viewport_interface().get_viewport_window_name(viewport_handle)
            viewport_window = omni.kit.viewport.get_viewport_interface().get_viewport_window(viewport_handle)
            viewport_window.set_active_camera(camera_path)
            viewport_window.set_texture_resolution(128, 128)
            viewport_window.set_window_pos(1000, 400)
            viewport_window.set_window_size(420, 420)
            self.viewport_window = viewport_window
        self.sd_helper = SyntheticDataHelper()
        self.sd_helper.initialize(sensor_names=["rgb"], viewport=self.viewport_window)
        self._my_world.render()
        self.sd_helper.get_groundtruth(["rgb"], self.viewport_window)
        return

10.7. Summary

This tutorial covered the following topics:

  1. Using stable baselines with Isaac Sim.

  2. Training a PPO policy for jetbot to drive to a random cube.

  3. Visualize a trained policy.

  4. Launch tensorboard to visualize the training progress.