8. Lula RMPflow

8.1. Learning Objectives

This tutorial shows how the RMPflow class in the Motion Generation extension may be used to generate smooth motions to reach task-space targets while avoiding dynamic obstacles. This tutorial first shows how RmpFlow can be directly instantiated and used to generate motions, then shows how RmpFlow can be loaded and used simply on supported robots , and finally, demonstrates how built-in debugging features can improve easy of use and integration.

8.2. Getting Started

Prerequisites

This tutorial includes multiple variations of a standalone script that loads a Franka robot and has it follow a target. Each of these variations may be saved as a Python file and run as demonstrated in the Standalone Application tutorial.

8.3. Generating Motions with an RMPflow Instance

RMPflow is used heavily throughout Omniverse Isaac Sim for controlling robot manipulators. As documented in RMPflow Configuration, there are three configuration files needed to directly instantiate the RmpFlow class directly. Once these configuration files are loaded and an end effector target has been specified, actions can be computed to move the robot to the desired target.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp({"headless": False})

from omni.isaac.franka.tasks import FollowTarget
from omni.isaac.motion_generation.lula import RmpFlow
from omni.isaac.motion_generation import ArticulationMotionPolicy
from omni.isaac.core import World

from omni.isaac.core.utils.extensions import get_extension_path_from_name
import os

my_world = World(stage_units_in_meters=1.0)
my_task = FollowTarget(name="follow_target_task")
my_world.add_task(my_task)
my_world.reset()
task_params = my_world.get_task("follow_target_task").get_params()
franka_name = task_params["robot_name"]["value"]
target_name = task_params["target_name"]["value"]
my_franka = my_world.scene.get_object(franka_name)

# RMPflow config files for supported robots are stored in the motion_generation extension under "/motion_policy_configs"
mg_extension_path = get_extension_path_from_name("omni.isaac.motion_generation")
rmp_config_dir = os.path.join(mg_extension_path, "motion_policy_configs")

#Initialize an RmpFlow object
rmpflow = RmpFlow(
    robot_description_path = rmp_config_dir + "/franka/rmpflow/robot_descriptor.yaml",
    urdf_path = rmp_config_dir + "/franka/lula_franka_gen.urdf",
    rmpflow_config_path = rmp_config_dir + "/franka/rmpflow/franka_rmpflow_common.yaml",
    end_effector_frame_name = "right_gripper",
    maximum_substep_size = 0.00334
)
#Use the ArticulationMotionPolicy wrapper object to connect rmpflow to the Franka robot articulation.
physics_dt = 1/60.
articulation_rmpflow = ArticulationMotionPolicy(my_franka,rmpflow,physics_dt)

articulation_controller = my_franka.get_articulation_controller()
while simulation_app.is_running():
    my_world.step(render=True)
    if my_world.is_playing():
        if my_world.current_time_step_index == 0:
            my_world.reset()
        observations = my_world.get_observations()

        #Set rmpflow target to be the current position of the target cube.
        rmpflow.set_end_effector_target(
            target_position=observations[target_name]["position"],
            target_orientation=observations[target_name]["orientation"],
        )

        actions = articulation_rmpflow.get_next_articulation_action()
        articulation_controller.apply_action(actions)

simulation_app.close()

RMPflow is an implementation of the Motion Policy interface. Any MotionPolicy can be passed to an Articulation Motion Policy in order to start moving a robot on the USD stage. On line 27, an instance of RmpFlow is instantiated with the required configuration information. The ArticulationMotionPolicy created on line 36 acts as a translational layer between RmpFlow and the simulated Franka robot Articulation. The user may interact with RmpFlow directly to communicate the world state, set an end effector target, or modify internal settings. On each frame, an end effector target is passed directly to the RmpFlow object (line 47). The ArticulationMotionPolicy is used on line 52 to compute an action that can be directly consumed by the Franka Articulation.

../_images/isaac_sim_rmpflow_basic_target.gif

8.3.1. World State

As a Motion Policy, RmpFlow is capable of dynamic collision avoidance while navigating the end effector to a target. The world state may be changing over time while RmpFlow is navigating to its target. Objects created with the omni.isaac.core.objects package (see Inputs: World State) can be registered with RmpFlow and the policy will automatically avoid collisions with these obstacles. RmpFlow is triggered to query the current state of all tracked objects whenever RmpFlow.update_world() is called.

RmpFlow may also be informed about a change in the robot base pose on a given frame by calling RmpFlow.set_robot_base_pose(). As object positions are queried in world coordinates, it is critical to use this function if the base of the robot is moved within the USD stage.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp({"headless": False})

from omni.isaac.franka.tasks import FollowTarget
from omni.isaac.motion_generation.lula import RmpFlow
from omni.isaac.motion_generation import ArticulationMotionPolicy
from omni.isaac.core import World
from omni.isaac.core.objects import sphere
import numpy as np

from omni.isaac.core.utils.extensions import get_extension_path_from_name
import os

my_world = World(stage_units_in_meters=1.0)
my_task = FollowTarget(name="follow_target_task")
my_world.add_task(my_task)
my_world.reset()
task_params = my_world.get_task("follow_target_task").get_params()
franka_name = task_params["robot_name"]["value"]
target_name = task_params["target_name"]["value"]
my_franka = my_world.scene.get_object(franka_name)

# RMPflow config files for supported robots are stored in the motion_generation extension under "/motion_policy_configs"
mg_extension_path = get_extension_path_from_name("omni.isaac.motion_generation")
rmp_config_dir = os.path.join(mg_extension_path, "motion_policy_configs")

#Initialize an RmpFlow object
rmpflow = RmpFlow(
    robot_description_path = rmp_config_dir + "/franka/rmpflow/robot_descriptor.yaml",
    urdf_path = rmp_config_dir + "/franka/lula_franka_gen.urdf",
    rmpflow_config_path = rmp_config_dir + "/franka/rmpflow/franka_rmpflow_common.yaml",
    end_effector_frame_name = "right_gripper",
    maximum_substep_size = 0.00334
)
#Use the ArticulationMotionPolicy wrapper object to connect rmpflow to the Franka robot articulation.
physics_dt = 1/60.
articulation_rmpflow = ArticulationMotionPolicy(my_franka,rmpflow,physics_dt)

#create and add an obstacle from omni.isaac.core.objects
sphere_obstacle = sphere.FixedSphere("/sphere_obstacle",radius=.03,position=np.array([0.1, 0.0, 0.5]),color=np.array([0.,0.,1.]))
rmpflow.add_obstacle(sphere_obstacle)

articulation_controller = my_franka.get_articulation_controller()
while simulation_app.is_running():
    my_world.step(render=True)
    if my_world.is_playing():
        if my_world.current_time_step_index == 0:
            my_world.reset()

        observations = my_world.get_observations()

        #Set rmpflow target to be the current position of the target cube.
        rmpflow.set_end_effector_target(
            target_position=observations[target_name]["position"],
            target_orientation=observations[target_name]["orientation"],
        )
        #Track any movements of the sphere obstacle
        rmpflow.update_world()

        #Track any movements of the robot base
        robot_base_translation,robot_base_orientation = my_franka.get_world_pose()
        rmpflow.set_robot_base_pose(robot_base_translation,robot_base_orientation)

        actions = articulation_rmpflow.get_next_articulation_action()
        articulation_controller.apply_action(actions)

simulation_app.close()

In lines 41 and 42, a sphere is created and registered as an obstacle with RmpFlow. On each frame, RmpFlow.update_world() is called on line 59. This triggers RmpFlow to query the current position of the sphere to account for any movement.

On lines 62-63, the current position of the robot base is queried and passed to RmpFlow. This step is separated from other world state because it is often unnecessary (when the robot base never moves from the origin), or this step may require extra consideration (e.g. RmpFlow is controlling an arm that is mounted on a moving base).

../_images/isaac_sim_rmpflow_world_aware.gif

8.4. Loading RMPflow for Supported Robots

In the previous sections, we see that RmpFlow requires five arguments to be initialized. Three of these arguments are file paths to required configuration files. The end_effector_frame_name argument specifies what frame on the robot (from the frames found in the referenced URDF file) should be considered the end effector. The maximum_substep_size argument specifies a maximum step-size when internally performing Euler Integration.

For manipulators in the Omniverse Isaac Sim library, appropriate config information for loading RmpFlow can be found in the omni.isaac.motion_generation extension. This information is indexed by robot name, and can be accessed simply. The following script allows the user to load different robots from the Omniverse Isaac Sim library in a simple follow-target scenario with an obstacle.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp({"headless": False})

from omni.isaac.motion_generation.lula import RmpFlow
from omni.isaac.motion_generation import ArticulationMotionPolicy
from omni.isaac.core.robots import Robot
from omni.isaac.core.objects import cuboid
from omni.isaac.core import World
from omni.isaac.core.utils.stage import add_reference_to_stage
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.motion_generation.interface_config_loader import (
    get_supported_robot_policy_pairs,
    load_supported_motion_policy_config,
)
import numpy as np
from pprint import pprint
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "-v",
    "--verbose",
    action="store_true",
    default=True,
    help="Print useful runtime information such as the list of supported robots",
)
parser.add_argument(
    "--robot-name",
    type=str,
    default="Cobotta_Pro_900",
    help="Key to use to access RMPflow config files for a specific robot.",
)
parser.add_argument(
    "--usd-path",
    type=str,
    default="/Isaac/Robots/Denso/cobotta_pro_900.usd",
    help="Path to supported robot on Nucleus Server",
)
parser.add_argument("--add-orientation-target", action="store_true", default=False, help="Add orientation target")
args = parser.parse_args()

robot_name = args.robot_name
usd_path = get_assets_root_path() + args.usd_path
prim_path = "/my_robot"

add_reference_to_stage(usd_path=usd_path, prim_path=prim_path)

my_world = World(stage_units_in_meters=1.0)

robot = my_world.scene.add(Robot(prim_path=prim_path, name=robot_name))

if args.verbose:
    print("Names of supported robots with provided RMPflow config")
    print("\t", list(get_supported_robot_policy_pairs().keys()))
    print()

# The load_supported_motion_policy_config() function is currently the simplest way to load supported robots.
# In the future, Isaac Sim will provide a centralized registry of robots with Lula robot description files
# and RMP configuration files stored alongside the robot USD.
rmp_config = load_supported_motion_policy_config(robot_name, "RMPflow")

if args.verbose:
    print(
        f"Successfully referenced RMPflow config for {robot_name}.  Using the following parameters to initialize RmpFlow class:"
    )
    pprint(rmp_config)
    print()

# Initialize an RmpFlow object
rmpflow = RmpFlow(**rmp_config)

physics_dt = 1 / 60.0
articulation_rmpflow = ArticulationMotionPolicy(robot, rmpflow, physics_dt)

articulation_controller = robot.get_articulation_controller()

# Make a target to follow
target_cube = cuboid.VisualCuboid(
    "/World/target", position=np.array([0.5, 0, 0.5]), color=np.array([1.0, 0, 0]), size=0.1
)

# Make an obstacle to avoid
obstacle = cuboid.VisualCuboid(
    "/World/obstacle", position=np.array([0.8, 0, 0.5]), color=np.array([0, 1.0, 0]), size=0.1
)
rmpflow.add_obstacle(obstacle)

my_world.reset()
while simulation_app.is_running():
    my_world.step(render=True)
    if my_world.is_playing():
        if my_world.current_time_step_index == 0:
            my_world.reset()

        # Set rmpflow target to be the current position of the target cube.
        if args.add_orientation_target:
            target_orientation = target_cube.get_world_pose()[1]
        else:
            target_orientation = None

        rmpflow.set_end_effector_target(
            target_position=target_cube.get_world_pose()[0], target_orientation=target_orientation
        )

        # Query the current obstacle position
        rmpflow.update_world()

        actions = articulation_rmpflow.get_next_articulation_action()
        articulation_controller.apply_action(actions)

simulation_app.close()
On lines 20-40, command line arguments are specified:

-v,–verbose: If True, prints out useful runtime information such as the list of supported robot names that map to RMPflow config files. Defaults to ‘True’

–robot-name: Name of robot that maps to the stored RMPflow config. Defaults to “Cobotta_Pro_900”

–usd-path: Path to robot USD asset on the Nucleus Server. Defaults to “/Isaac/Robots/Denso/cobotta_pro_900.usd”. The typical location of a specific robot is under “/Isaac/Robots/{manufacturer_name}/{robot_name}/{robot_name}.usd”

–add-orientation-target: Add the orientation of the target cube to the RMPflow target. Defaults to False.

On line 61, the configuration information is loaded using the robot_name argument, and on line 71, the configuration information is passed to RmpFlow.

This script is verbose by default, and running with the default command line arguments produces the following output:

Names of supported robots with provided RMPflow config [‘Franka’, ‘UR3’, ‘UR3e’, ‘UR5’, ‘UR5e’, ‘UR10’, ‘UR10e’, ‘UR16e’, ‘Rizon4’, ‘DofBot’, ‘Cobotta_Pro_900’, ‘Cobotta_Pro_1300’, ‘RS007L’, ‘RS007N’, ‘RS013N’, ‘RS025N’, ‘RS080N’, ‘FestoCobot’]

The names of supported robots are suitable for the –robot-name argument, and each must correctly correspond to the robot USD path. In a future release, configuration data for supported robots will be centralized such that only a single argument will be required. The specific method of accessing supported robot RMPflow configs provided here will then be deprecated.

A copy of this script can be found in the Omniverse Isaac Sim standalone examples. It can be used to load the Franka robot with

python.sh standalone_examples/api/omni.isaac.manipulators/rmpflow_supported_robots/supported_robot_follow_target_example.py --robot-name Franka --usd-path "/Isaac/Robots/Franka/franka.usd"
../_images/isaac_sim_rmpflow_target_with_obstacle.gif

8.5. Debugging Features

The RmpFlow class has contains debugging features that are not generally available in the Motion Policy interface. These debugging features make it simple to decouple the simulator from the RmpFlow algorithm to help diagnose any undesirable behaviors that are encountered (RMPflow Debugging Features).

RmpFlow uses collision spheres internally to avoid collisions with external objects. These spheres can be visualized with the RmpFlow.visualize_collision_spheres() function. This helps to determine whether RmpFlow has a reasonable representation of the simulated robot.

The visualization can be used alongside a flag RmpFlow.set_ignore_state_updates(True) to ignore state updates from the robot Articulation and instead assume that robot joint targets returned by RmpFlow are always perfectly achieved. This causes RmpFlow to compute a robot path over time that is independent of the simulated robot Articulation. At each timestep, RmpFlow returns joint targets that are passed to the robot Articulation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp({"headless": False})

from omni.isaac.franka.tasks import FollowTarget
from omni.isaac.franka.controllers import RMPFlowController
from omni.isaac.core import World
import numpy as np

my_world = World(stage_units_in_meters=1.0)
my_task = FollowTarget(name="follow_target_task")
my_world.add_task(my_task)
my_world.reset()
task_params = my_world.get_task("follow_target_task").get_params()
franka_name = task_params["robot_name"]["value"]
target_name = task_params["target_name"]["value"]
my_franka = my_world.scene.get_object(franka_name)

rmpflow_controller = RMPFlowController(name="target_follower_controller", robot_articulation=my_franka)

#The underlying RmpFlow instance is retrieved from the RMPFlowController
rmpflow = rmpflow_controller.get_motion_policy()

#Set RmpFlow to perform an internal rollout of the robot joint state, ignoring updates from the simulator
rmpflow.set_ignore_state_updates(True)

#Set RmpFlow to visualize where it believes the robot to be
rmpflow.visualize_collision_spheres()

articulation_controller = my_franka.get_articulation_controller()

bad_proportional_gains = articulation_controller.get_gains()[0]/50
articulation_controller.set_gains(kps = bad_proportional_gains)

while simulation_app.is_running():
    my_world.step(render=True)
    if my_world.is_playing():
        if my_world.current_time_step_index == 0:
            my_world.reset()
        observations = my_world.get_observations()

        #This includes an internal call to RmpFlow.update_world()
        actions = rmpflow_controller.forward(
            target_end_effector_position=observations[target_name]["position"],
            target_end_effector_orientation=observations[target_name]["orientation"],
        )

        #The robot base pose will not be automatically tracked in this example
        #But it could be using the rmpflow object.

        articulation_controller.apply_action(actions)

simulation_app.close()

The RmpFlow instance is retrieved from the omni.isaac.franka.RMPFlowController object on line 22. This allows RmpFlow-specific functions to be used, as on lines 25 and 28. The visualization can be very helpful to distinguish between behaviors that are coming from the simulator, and behaviors that are coming from RmpFlow.

In the video below, the franka robot is given weak proportional gains (lines 32-33). Using the debugging visualization, it is easy to determine that RmpFlow is producing reasonable motions, but the simulated robot is simply not able to follow the motions. As the target moves towards the edge of the workspace, the Franka robot Articulation gets worse at gravity compensation and drops significantly below the target position set by RmpFlow.

../_images/isaac_sim_rmpflow_visualization.gif

8.6. Summary

This tutorial reviews using the RmpFlow class in order to generate reactive motions in response to a dynamic environment. The RmpFlow class can be used to generate motions directly alongside an Articulation Motion Policy, or it may be used more simply on supported robots by using a Motion Policy Controller class that has been implemented to use RmpFlow on a specific robot such as the Franka.

This tutorial reviewed four of the main features of RmpFlow:

  1. Navigating the robot through an environment to a target position and orientation.

  2. Adapting to a dynamic world on every frame.

  3. Adapting to a change in the robot’s position on the USD stage.

  4. Using visualization to decouple the simulated robot Articulation from the RMPflow algorithm for quick and easy debugging.

8.6.1. Further Learning

To understand the motivation behind the structure and usage of RmpFlow in Omniverse Isaac Sim, reference the Motion Generation page.