5. Adding Multiple Robots¶
5.1. Learning Objectives¶
This tutorial integrates two different types of robot into the same simulation. It details how to build program logic to switch between subtasks. After this tutorial, you will have experience building more complex simulations of robots interacting.
15-20 Minute Tutorial
5.2. Getting Started¶
Prerequisites
Review Adding a Manipulator Robot prior to beginning this tutorial.
Begin with the source code open from the previous tutorial, Adding a Manipulator Robot.
5.3. Creating the Scene¶
Begin by adding the Jetbot, Franka Panda, and the Cube from the previous tutorials to the scene. Use a subtask in the task to simplify the code.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 from omni.isaac.examples.base_sample import BaseSample from omni.isaac.franka.tasks import PickPlace from omni.isaac.wheeled_robots.robots import WheeledRobot from omni.isaac.core.utils.nucleus import get_assets_root_path from omni.isaac.core.tasks import BaseTask import numpy as np class RobotsPlaying(BaseTask): def __init__( self, name ): super().__init__(name=name, offset=None) self._jetbot_goal_position = np.array([130, 30, 0]) # Add a subtask of pick and place instead # of writing the same task again # we just need to add a jetbot and change the positions of the assets and # the cube target position self._pick_place_task = PickPlace(cube_initial_position=np.array([0.1, 0.3, 0.05]), target_position=np.array([0.7, -0.3, 0.0515 / 2.0])) return def set_up_scene(self, scene): super().set_up_scene(scene) self._pick_place_task.set_up_scene(scene) assets_root_path = get_assets_root_path() jetbot_asset_path = assets_root_path + "/Isaac/Robots/Jetbot/jetbot.usd" self._jetbot = scene.add( WheeledRobot( prim_path="/World/Fancy_Robot", name="fancy_robot", wheel_dof_names=["left_wheel_joint", "right_wheel_joint"], create_robot=True, usd_path=jetbot_asset_path, position=np.array([0, 0.3, 0])) ) pick_place_params = self._pick_place_task.get_params() self._franka = scene.get_object(pick_place_params["robot_name"]["value"]) # Changes Franka's default position # so that it is set at this position after reset self._franka.set_world_pose(position=np.array([1.0, 0, 0])) self._franka.set_default_state(position=np.array([1.0, 0, 0])) return def get_observations(self): current_jetbot_position, current_jetbot_orientation = self._jetbot.get_world_pose() # Observations needed to drive the jetbot to push the cube observations= { self._jetbot.name: { "position": current_jetbot_position, "orientation": current_jetbot_orientation, "goal_position": self._jetbot_goal_position } } return observations def get_params(self): # To avoid hard coding names..etc. pick_place_params = self._pick_place_task.get_params() params_representation = pick_place_params params_representation["jetbot_name"] = {"value": self._jetbot.name, "modifiable": False} params_representation["franka_name"] = pick_place_params["robot_name"] return params_representation def post_reset(self): self._franka.gripper.set_joint_positions(self._franka.gripper.joint_opened_positions) class HelloWorld(BaseSample): def __init__(self) -> None: super().__init__() return def setup_scene(self): world = self.get_world() # Add task here world.add_task(RobotsPlaying(name="awesome_task")) return
5.4. Integrating a Second Robot¶
Next, drive the Jetbot to push the cube to the Franka.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 from omni.isaac.examples.base_sample import BaseSample from omni.isaac.franka.tasks import PickPlace from omni.isaac.wheeled_robots.robots import WheeledRobot from omni.isaac.core.utils.nucleus import get_assets_root_path from omni.isaac.motion_generation import WheelBasePoseController from omni.isaac.wheeled_robots.controllers.differential_controller import DifferentialController from omni.isaac.core.tasks import BaseTask import numpy as np class RobotsPlaying(BaseTask): def __init__( self, name ): super().__init__(name=name, offset=None) self._jetbot_goal_position = np.array([1.3, 0.3, 0]) self._pick_place_task = PickPlace(cube_initial_position=np.array([0.1, 0.3, 0.05]), target_position=np.array([0.7, -0.3, 0.0515 / 2.0])) return def set_up_scene(self, scene): super().set_up_scene(scene) self._pick_place_task.set_up_scene(scene) assets_root_path = get_assets_root_path() jetbot_asset_path = assets_root_path + "/Isaac/Robots/Jetbot/jetbot.usd" self._jetbot = scene.add( WheeledRobot( prim_path="/World/Fancy_Jetbot", name="fancy_jetbot", wheel_dof_names=["left_wheel_joint", "right_wheel_joint"], create_robot=True, usd_path=jetbot_asset_path, position=np.array([0, 0.3, 0]), ) ) pick_place_params = self._pick_place_task.get_params() self._franka = scene.get_object(pick_place_params["robot_name"]["value"]) self._franka.set_world_pose(position=np.array([1.0, 0, 0])) self._franka.set_default_state(position=np.array([1.0, 0, 0])) return def get_observations(self): current_jetbot_position, current_jetbot_orientation = self._jetbot.get_world_pose() observations= { self._jetbot.name: { "position": current_jetbot_position, "orientation": current_jetbot_orientation, "goal_position": self._jetbot_goal_position } } return observations def get_params(self): pick_place_params = self._pick_place_task.get_params() params_representation = pick_place_params params_representation["jetbot_name"] = {"value": self._jetbot.name, "modifiable": False} params_representation["franka_name"] = pick_place_params["robot_name"] return params_representation def post_reset(self): self._franka.gripper.set_joint_positions(self._franka.gripper.joint_opened_positions) class HelloWorld(BaseSample): def __init__(self) -> None: super().__init__() return def setup_scene(self): world = self.get_world() world.add_task(RobotsPlaying(name="awesome_task")) return async def setup_post_load(self): self._world = self.get_world() task_params = self._world.get_task("awesome_task").get_params() self._jetbot = self._world.scene.get_object(task_params["jetbot_name"]["value"]) self._cube_name = task_params["cube_name"]["value"] self._jetbot_controller = WheelBasePoseController(name="cool_controller", open_loop_wheel_controller= DifferentialController(name="simple_control", wheel_radius=0.03, wheel_base=0.1125)) self._world.add_physics_callback("sim_step", callback_fn=self.physics_step) await self._world.play_async() return async def setup_post_reset(self): self._jetbot_controller.reset() await self._world.play_async() return def physics_step(self, step_size): current_observations = self._world.get_observations() self._jetbot.apply_action( self._jetbot_controller.forward( start_position=current_observations[self._jetbot.name]["position"], start_orientation=current_observations[self._jetbot.name]["orientation"], goal_position=current_observations[self._jetbot.name]["goal_position"])) return
5.5. Adding Task Logic¶
Next, drive Jetbot backwards after it delivers the cube to give space to Franka to pick up the cube.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 from omni.isaac.examples.base_sample import BaseSample from omni.isaac.franka.tasks import PickPlace from omni.isaac.wheeled_robots.robots import WheeledRobot from omni.isaac.core.utils.nucleus import get_assets_root_path from omni.isaac.motion_generation import WheelBasePoseController from omni.isaac.wheeled_robots.controllers.differential_controller import DifferentialController from omni.isaac.core.tasks import BaseTask from omni.isaac.core.utils.types import ArticulationAction import numpy as np class RobotsPlaying(BaseTask): def __init__( self, name ): super().__init__(name=name, offset=None) self._jetbot_goal_position = np.array([1.3, 0.3, 0]) # Add task logic to signal to the robots which task is active self._task_event = 0 self._pick_place_task = PickPlace(cube_initial_position=np.array([0.1, 0.3, 0.05]), target_position=np.array([0.7, -0.3, 0.0515 / 2.0])) return def set_up_scene(self, scene): super().set_up_scene(scene) self._pick_place_task.set_up_scene(scene) assets_root_path = get_assets_root_path() jetbot_asset_path = assets_root_path + "/Isaac/Robots/Jetbot/jetbot.usd" self._jetbot = scene.add( WheeledRobot( prim_path="/World/Fancy_Jetbot", name="fancy_jetbot", wheel_dof_names=["left_wheel_joint", "right_wheel_joint"], create_robot=True, usd_path=jetbot_asset_path, position=np.array([0, 0.3, 0]), ) ) pick_place_params = self._pick_place_task.get_params() self._franka = scene.get_object(pick_place_params["robot_name"]["value"]) self._franka.set_world_pose(position=np.array([1.0, 0, 0])) self._franka.set_default_state(position=np.array([1.0, 0, 0])) return def get_observations(self): current_jetbot_position, current_jetbot_orientation = self._jetbot.get_world_pose() observations= { "task_event": self._task_event, self._jetbot.name: { "position": current_jetbot_position, "orientation": current_jetbot_orientation, "goal_position": self._jetbot_goal_position } } return observations def get_params(self): pick_place_params = self._pick_place_task.get_params() params_representation = pick_place_params params_representation["jetbot_name"] = {"value": self._jetbot.name, "modifiable": False} params_representation["franka_name"] = pick_place_params["robot_name"] return params_representation def pre_step(self, control_index, simulation_time): if self._task_event == 0: current_jetbot_position, _ = self._jetbot.get_world_pose() if np.mean(np.abs(current_jetbot_position[:2] - self._jetbot_goal_position[:2])) < 0.04: self._task_event += 1 self._cube_arrive_step_index = control_index elif self._task_event == 1: # Jetbot has 200 time steps to back off from Franka if control_index - self._cube_arrive_step_index == 200: self._task_event += 1 return def post_reset(self): self._franka.gripper.set_joint_positions(self._franka.gripper.joint_opened_positions) self._task_event = 0 return class HelloWorld(BaseSample): def __init__(self) -> None: super().__init__() return def setup_scene(self): world = self.get_world() world.add_task(RobotsPlaying(name="awesome_task")) return async def setup_post_load(self): self._world = self.get_world() task_params = self._world.get_task("awesome_task").get_params() self._jetbot = self._world.scene.get_object(task_params["jetbot_name"]["value"]) self._cube_name = task_params["cube_name"]["value"] self._jetbot_controller = WheelBasePoseController(name="cool_controller", open_loop_wheel_controller= DifferentialController(name="simple_control", wheel_radius=0.03, wheel_base=0.1125)) self._world.add_physics_callback("sim_step", callback_fn=self.physics_step) await self._world.play_async() return async def setup_post_reset(self): self._jetbot_controller.reset() await self._world.play_async() return def physics_step(self, step_size): current_observations = self._world.get_observations() if current_observations["task_event"] == 0: self._jetbot.apply_wheel_actions( self._jetbot_controller.forward( start_position=current_observations[self._jetbot.name]["position"], start_orientation=current_observations[self._jetbot.name]["orientation"], goal_position=current_observations[self._jetbot.name]["goal_position"])) elif current_observations["task_event"] == 1: # Go backwards self._jetbot.apply_wheel_actions(ArticulationAction(joint_velocities=[-8, -8])) elif current_observations["task_event"] == 2: # Apply zero velocity to override the velocity applied before. # Note: target joint positions and target joint velocities will stay active unless changed self._jetbot.apply_wheel_actions(ArticulationAction(joint_velocities=[0.0, 0.0])) return
5.6. Robot Handover¶
Now, it’s the Franka’s turn to pick it up.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 from omni.isaac.examples.base_sample import BaseSample from omni.isaac.franka.tasks import PickPlace from omni.isaac.wheeled_robots.robots import WheeledRobot from omni.isaac.core.utils.nucleus import get_assets_root_path from omni.isaac.motion_generation import WheelBasePoseController from omni.isaac.franka.controllers import PickPlaceController from omni.isaac.wheeled_robots.controllers.differential_controller import DifferentialController from omni.isaac.core.tasks import BaseTask from omni.isaac.core.utils.types import ArticulationAction import numpy as np class RobotsPlaying(BaseTask): def __init__( self, name ): super().__init__(name=name, offset=None) self._jetbot_goal_position = np.array([1.3, 0.3, 0]) self._task_event = 0 self._pick_place_task = PickPlace(cube_initial_position=np.array([0.1, 0.3, 0.05]), target_position=np.array([0.7, -0.3, 0.0515 / 2.0])) return def set_up_scene(self, scene): super().set_up_scene(scene) self._pick_place_task.set_up_scene(scene) assets_root_path = get_assets_root_path() jetbot_asset_path = assets_root_path + "/Isaac/Robots/Jetbot/jetbot.usd" self._jetbot = scene.add( WheeledRobot( prim_path="/World/Fancy_Jetbot", name="fancy_jetbot", wheel_dof_names=["left_wheel_joint", "right_wheel_joint"], create_robot=True, usd_path=jetbot_asset_path, position=np.array([0, 0.3, 0]), ) ) pick_place_params = self._pick_place_task.get_params() self._franka = scene.get_object(pick_place_params["robot_name"]["value"]) self._franka.set_world_pose(position=np.array([1.0, 0, 0])) self._franka.set_default_state(position=np.array([1.0, 0, 0])) return def get_observations(self): current_jetbot_position, current_jetbot_orientation = self._jetbot.get_world_pose() observations= { "task_event": self._task_event, self._jetbot.name: { "position": current_jetbot_position, "orientation": current_jetbot_orientation, "goal_position": self._jetbot_goal_position } } # add the subtask's observations as well observations.update(self._pick_place_task.get_observations()) return observations def get_params(self): pick_place_params = self._pick_place_task.get_params() params_representation = pick_place_params params_representation["jetbot_name"] = {"value": self._jetbot.name, "modifiable": False} params_representation["franka_name"] = pick_place_params["robot_name"] return params_representation def pre_step(self, control_index, simulation_time): if self._task_event == 0: current_jetbot_position, _ = self._jetbot.get_world_pose() if np.mean(np.abs(current_jetbot_position[:2] - self._jetbot_goal_position[:2])) < 0.04: self._task_event += 1 self._cube_arrive_step_index = control_index elif self._task_event == 1: if control_index - self._cube_arrive_step_index == 200: self._task_event += 1 return def post_reset(self): self._franka.gripper.set_joint_positions(self._franka.gripper.joint_opened_positions) self._task_event = 0 return class HelloWorld(BaseSample): def __init__(self) -> None: super().__init__() return def setup_scene(self): world = self.get_world() world.add_task(RobotsPlaying(name="awesome_task")) return async def setup_post_load(self): self._world = self.get_world() task_params = self._world.get_task("awesome_task").get_params() # We need franka later to apply to it actions self._franka = self._world.scene.get_object(task_params["franka_name"]["value"]) self._jetbot = self._world.scene.get_object(task_params["jetbot_name"]["value"]) # We need the cube later on for the pick place controller self._cube_name = task_params["cube_name"]["value"] # Add Franka Controller self._franka_controller = PickPlaceController(name="pick_place_controller", gripper=self._franka.gripper, robot_articulation=self._franka) self._jetbot_controller = WheelBasePoseController(name="cool_controller", open_loop_wheel_controller= DifferentialController(name="simple_control", wheel_radius=0.03, wheel_base=0.1125)) self._world.add_physics_callback("sim_step", callback_fn=self.physics_step) await self._world.play_async() return async def setup_post_reset(self): self._franka_controller.reset() self._jetbot_controller.reset() await self._world.play_async() return def physics_step(self, step_size): current_observations = self._world.get_observations() if current_observations["task_event"] == 0: self._jetbot.apply_wheel_actions( self._jetbot_controller.forward( start_position=current_observations[self._jetbot.name]["position"], start_orientation=current_observations[self._jetbot.name]["orientation"], goal_position=current_observations[self._jetbot.name]["goal_position"])) elif current_observations["task_event"] == 1: self._jetbot.apply_wheel_actions(ArticulationAction(joint_velocities=[-8, -8])) elif current_observations["task_event"] == 2: self._jetbot.apply_wheel_actions(ArticulationAction(joint_velocities=[0.0, 0.0])) # Pick up the block actions = self._franka_controller.forward( picking_position=current_observations[self._cube_name]["position"], placing_position=current_observations[self._cube_name]["target_position"], current_joint_positions=current_observations[self._franka.name]["joint_positions"]) self._franka.apply_action(actions) # Pause once the controller is done if self._franka_controller.is_done(): self._world.pause() return
5.7. Summary¶
This tutorial covered the following topics:
Adding multiple robots to the scene
Controlling robot behavior with program states in the task structure.
5.7.1. Next Steps¶
Continue on to the next tutorial in our Essential Tutorials series, Multiple Tasks, to learn how to add multiple tasks and manage them.