9.9. Reinforcement Learning using Stable Baselines

9.9.1. Learning Objectives

This tutorial shows how to train a policy/controller in Omniverse Isaac Sim that drives the Jetbot to a randomly placed goal cube using ground truth observations such as positions, orientations, and velocities. After this tutorial, you will be able to use Stable Baselines with custom environments in Omniverse Isaac Sim.

10-15 Minute Tutorial, Not Including Training Time

9.9.2. Getting Started

Prerequisites

9.9.3. The Environment

This is a simple environment that showcases how to use Stable Baselines with Omniverse Isaac Sim. The environment consists of a ground plane, the Jetbot, and a goal cube. The goal is to train a policy that drives the Jetbot to a randomly placed goal cube using its ground truth observations.

  • The action space is [forward_velocity, angular_velocity], which a differential controller converts into commands for the Jetbot wheels.

  • The observation space is a 16-dimensional vector consisting of jetbot_world_position, jetbot_world_orientation, jetbot_linear_velocity, jetbot_angular_velocity, and goal_world_position.

  • At the beginning of each episode (i.e., on reset), the goal cube is placed at a random location.

  • The reward is the difference between the distance to the goal before executing the current action and the distance to the goal after it (i.e., did the Jetbot get closer to or further from the goal?), as sketched below.

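The snippet below is a minimal sketch of this distance-difference reward; the variable names follow the environment code shown later in this tutorial.

    import numpy as np

    # Sketch of the dense reward described above: positive when the Jetbot moved
    # closer to the goal during the step, negative when it moved further away.
    def distance_reward(previous_jetbot_position, current_jetbot_position, goal_world_position):
        previous_dist_to_goal = np.linalg.norm(goal_world_position - previous_jetbot_position)
        current_dist_to_goal = np.linalg.norm(goal_world_position - current_jetbot_position)
        return previous_dist_to_goal - current_dist_to_goal
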
9.9.4. Train the Policy

  1. Install the dependencies in the Isaac Sim Python environment.

    ./python.sh -m pip install stable-baselines3 tensorboard
    
  2. Train the policy by running the following script.

    ./python.sh standalone_examples/api/omni.isaac.jetbot/stable_baselines_example/train.py
    
  3. Launch TensorBoard to monitor how the policy training progresses.

    ./python.sh -m tensorboard.main --logdir ./
    
../_images/isaac_sim_tensorboard.png

9.9.4.1. Code Overview

from env import JetBotEnv
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3.common.callbacks import CheckpointCallback
import torch as th

# Log directory for the tensorboard files used to visualize training, and for the final policy
log_dir = "./mlp_policy"
# set headless to False to visualize the policy while training
my_env = JetBotEnv(headless=True)


# Policy params: an MLP policy is used because the observations are a 16-dim state vector rather than images
policy_kwargs = dict(activation_fn=th.nn.Tanh, net_arch=[16, dict(pi=[128, 128, 128], vf=[128, 128, 128])])
policy = MlpPolicy
total_timesteps = 500000

# Saves a checkpoint policy in the same log directory
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path=log_dir, name_prefix="jetbot_policy_checkpoint")
# PPO algorithm params
model = PPO(
    policy,
    my_env,
    policy_kwargs=policy_kwargs,
    verbose=1,
    n_steps=2560,
    batch_size=64,
    learning_rate=0.000125,
    gamma=0.9,
    ent_coef=7.5e-08,
    clip_range=0.3,
    n_epochs=5,
    device="cuda",
    gae_lambda=1.0,
    max_grad_norm=0.9,
    vf_coef=0.95,
    tensorboard_log=log_dir,
)
model.learn(total_timesteps=total_timesteps, callback=[checkpoint_callback])

model.save(log_dir + "/jetbot_policy")  # Saves the final policy

my_env.close()  # Closes the environment
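
If training is interrupted, it can be resumed from one of the checkpoints written by the CheckpointCallback above. The snippet below is a minimal sketch; the exact checkpoint file name is an assumption based on the name_prefix and save_freq settings used in this example.

    from env import JetBotEnv
    from stable_baselines3 import PPO

    # Minimal sketch: resume training from a checkpoint saved by CheckpointCallback.
    # The checkpoint file name below is an assumption based on name_prefix and save_freq.
    my_env = JetBotEnv(headless=True)
    model = PPO.load("./mlp_policy/jetbot_policy_checkpoint_10000_steps.zip", env=my_env)
    model.learn(total_timesteps=100000)
    model.save("./mlp_policy/jetbot_policy_resumed")
    my_env.close()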

9.9.5. Visualize the Trained Policy

  1. Visualize the policy by running the following script.

    ./python.sh standalone_examples/api/omni.isaac.jetbot/stable_baselines_example/eval.py
    
  • Policy After 100K steps

../_images/isaac_sim_100k_policy.gif
  • Policy After 300K steps

../_images/isaac_sim_300k.gif
  • Policy After 500K steps

../_images/isaac_sim_trained_policy.gif

9.9.5.1. Code Overview

from env import JetBotEnv
from stable_baselines3 import PPO

# Choose the policy path to visualize
policy_path = "./mlp_policy/jetbot_policy.zip"

my_env = JetBotEnv(headless=False)
model = PPO.load(policy_path)

for _ in range(20):
    obs = my_env.reset()
    done = False
    while not done:
        actions, _ = model.predict(observation=obs, deterministic=True)
        obs, reward, done, info = my_env.step(actions)
        my_env.render()

my_env.close()
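
Beyond watching the robot drive, the trained policy can also be scored numerically with Stable Baselines3's built-in evaluation helper. The snippet below is a minimal sketch; the number of evaluation episodes is arbitrary.

    from env import JetBotEnv
    from stable_baselines3 import PPO
    from stable_baselines3.common.evaluation import evaluate_policy

    # Minimal sketch: report the mean episode reward of the trained policy over a few rollouts.
    my_env = JetBotEnv(headless=True)
    model = PPO.load("./mlp_policy/jetbot_policy.zip")
    mean_reward, std_reward = evaluate_policy(model, my_env, n_eval_episodes=10, deterministic=True)
    print(f"mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
    my_env.close()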

9.9.6. Create a Custom Environment

Have a look at the environment code at standalone_examples/api/omni.isaac.jetbot/stable_baselines_example/env.py as a starting point for creating similar environments of your own.

import gym
from gym import spaces
import numpy as np
import math
import carb


class JetBotEnv(gym.Env):
    metadata = {"render.modes": ["human"]}

    def __init__(
        self,
        skip_frame=1,
        physics_dt=1.0 / 60.0,
        rendering_dt=1.0 / 60.0,
        max_episode_length=256,
        seed=0,
        headless=True,
    ) -> None:
        from omni.isaac.kit import SimulationApp

        self.headless = headless
        self._simulation_app = SimulationApp({"headless": self.headless, "anti_aliasing": 0})
        self._skip_frame = skip_frame
        self._dt = physics_dt * self._skip_frame
        self._max_episode_length = max_episode_length
        self._steps_after_reset = int(rendering_dt / physics_dt)
        from omni.isaac.core import World
        from omni.isaac.wheeled_robots.robots import WheeledRobot
        from omni.isaac.wheeled_robots.controllers.differential_controller import DifferentialController
        from omni.isaac.core.objects import VisualCuboid
        from omni.isaac.core.utils.nucleus import get_assets_root_path

        self._my_world = World(physics_dt=physics_dt, rendering_dt=rendering_dt, stage_units_in_meters=1.0)
        self._my_world.scene.add_default_ground_plane()
        assets_root_path = get_assets_root_path()
        if assets_root_path is None:
            carb.log_error("Could not find Isaac Sim assets folder")
            return
        jetbot_asset_path = assets_root_path + "/Isaac/Robots/Jetbot/jetbot.usd"
        self.jetbot = self._my_world.scene.add(
            WheeledRobot(
                prim_path="/jetbot",
                name="my_jetbot",
                wheel_dof_names=["left_wheel_joint", "right_wheel_joint"],
                create_robot=True,
                usd_path=jetbot_asset_path,
                position=np.array([0, 0.0, 0.03]),
                orientation=np.array([1.0, 0.0, 0.0, 0.0]),
            )
        )
        self.jetbot_controller = DifferentialController(name="simple_control", wheel_radius=0.0325, wheel_base=0.1125)
        self.goal = self._my_world.scene.add(
            VisualCuboid(
                prim_path="/new_cube_1",
                name="visual_cube",
                position=np.array([0.60, 0.30, 0.05]),
                size=0.1,
                color=np.array([1.0, 0, 0]),
            )
        )
        self.seed(seed)
        self.reward_range = (-float("inf"), float("inf"))
        gym.Env.__init__(self)
        self.action_space = spaces.Box(low=-1, high=1.0, shape=(2,), dtype=np.float32)
        # 16-dim state observation: position, orientation, linear/angular velocity, goal position
        self.observation_space = spaces.Box(low=-float("inf"), high=float("inf"), shape=(16,), dtype=np.float32)

        self.max_velocity = 1
        self.max_angular_velocity = math.pi
        self.reset_counter = 0
        return

    def get_dt(self):
        return self._dt

    def step(self, action):
        previous_jetbot_position, _ = self.jetbot.get_world_pose()
        # action: forward velocity, angular velocity on [-1, 1]
        raw_forward = action[0]
        raw_angular = action[1]

        # we want to force the jetbot to always drive forward
        # so we transform to [0, 1]. we also scale by our max velocity
        forward = (raw_forward + 1.0) / 2.0
        forward_velocity = forward * self.max_velocity

        # we scale the angular, but leave it on [-1, 1] so the
        # jetbot can remain an ambiturner.
        angular_velocity = raw_angular * self.max_angular_velocity

        # we apply our actions to the jetbot
        for i in range(self._skip_frame):
            self.jetbot.apply_wheel_actions(
                self.jetbot_controller.forward(command=[forward_velocity, angular_velocity])
            )
            self._my_world.step(render=False)

        observations = self.get_observations()
        info = {}
        done = False
        if self._my_world.current_time_step_index - self._steps_after_reset >= self._max_episode_length:
            done = True
        goal_world_position, _ = self.goal.get_world_pose()
        current_jetbot_position, _ = self.jetbot.get_world_pose()
        previous_dist_to_goal = np.linalg.norm(goal_world_position - previous_jetbot_position)
        current_dist_to_goal = np.linalg.norm(goal_world_position - current_jetbot_position)
        reward = previous_dist_to_goal - current_dist_to_goal
        if current_dist_to_goal < 0.1:
            done = True
        return observations, reward, done, info

    def reset(self):
        self._my_world.reset()
        self.reset_counter = 0
        # randomize goal location in circle around robot
        alpha = 2 * math.pi * np.random.rand()
        r = 1.00 * math.sqrt(np.random.rand()) + 0.20
        self.goal.set_world_pose(np.array([math.sin(alpha) * r, math.cos(alpha) * r, 0.05]))
        observations = self.get_observations()
        return observations

    def get_observations(self):
        self._my_world.render()
        jetbot_world_position, jetbot_world_orientation = self.jetbot.get_world_pose()
        jetbot_linear_velocity = self.jetbot.get_linear_velocity()
        jetbot_angular_velocity = self.jetbot.get_angular_velocity()
        goal_world_position, _ = self.goal.get_world_pose()
        return np.concatenate(
            [
                jetbot_world_position,
                jetbot_world_orientation,
                jetbot_linear_velocity,
                jetbot_angular_velocity,
                goal_world_position,
            ]
        )

    def render(self, mode="human"):
        return

    def close(self):
        self._simulation_app.close()
        return

    def seed(self, seed=None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        np.random.seed(seed)
        return [seed]

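When building a similar environment of your own, it can help to validate it against the Gym interface before training on it. The snippet below is a minimal sketch using Stable Baselines3's environment checker; run it through ./python.sh so that the Isaac Sim modules are available.

    from env import JetBotEnv
    from stable_baselines3.common.env_checker import check_env

    # Minimal sketch: verify that a custom environment follows the Gym API
    # (spaces, reset/step signatures, observation shapes) before training on it.
    my_env = JetBotEnv(headless=True)
    check_env(my_env, warn=True)
    my_env.close()
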
9.9.7. Summary

This tutorial covered the following topics:

  1. Using Stable Baselines with Omniverse Isaac Sim.

  2. Training a PPO policy that drives the Jetbot to a random goal cube.

  3. Visualizing a trained policy.

  4. Launching TensorBoard to visualize training progress.