6. Example: Reactivity Using Deciders
6.1. Learning Objectives
We’ll see here how to leverage decider networks to make our state machines more reactive.
In Example: State Machines we saw a simple state machine that made the Franka robot peck at the floor while avoiding obstacle locations. However, it chose its peck target at the beginning of the state machine, on entry, so the target remained fixed throughout. If the user moved an obstacle over the peck target, the robot would get stuck, trying to avoid the obstacle while still failing to reach the target. Here we show how to leverage decider networks to monitor for that situation and switch targets dynamically when necessary.
Prerequisites:

- Completed Scripting Behaviors for an overview of how to set up and run behaviors.
- Completed Example: State Machines for background on setting up state machines and the example we'll be augmenting here.

Scripts throughout this tutorial are referenced relative to
6.2. Running the example
This tutorial is based on the example franka/peck_decider_network.py.
To run the example:
```
./cortex launch                                   # Launches the default franka blocks belief world.
./cortex activate franka/peck_decider_network.py  # Run the example.
```
The Franka robot will peck at the ground while avoiding the blocks. You can move the blocks around to see how that affects where the robot chooses to peck. We saw a similar demo in the tutorial Example: State Machines; however, there the system would hang if a block moved into the way of a peck target. In this version, using decider networks, we can easily monitor for that situation and switch targets as needed whenever a block overlaps a target.
Try moving a block into the path of a target. A new target will be chosen immediately.
6.3. The Code
```python
import numpy as np

from omni.isaac.cortex.df import (
    DfNetwork,
    DfDecider,
    DfDecision,
    DfAction,
    DfBindableState,
    DfStateSequence,
    DfTimedDeciderState,
    DfStateMachineDecider,
    DfSetLockState,
    DfWriteContextState,
)
from omni.isaac.cortex.dfb import DfToolsContext, DfLift, DfCloseGripper, make_go_home
import omni.isaac.cortex.math_util as math_util
from omni.isaac.cortex.motion_commander import MotionCommand, ApproachParams, PosePq


def sample_target_p():
    min_x = 0.3
    max_x = 0.7
    min_y = -0.4
    max_y = 0.4

    pt = np.zeros(3)
    pt[0] = (max_x - min_x) * np.random.random_sample() + min_x
    pt[1] = (max_y - min_y) * np.random.random_sample() + min_y
    pt[2] = 0.01

    return pt


def make_target_rotation(target_p):
    return math_util.matrix_to_quat(
        math_util.make_rotation_matrix(az_dominant=np.array([0.0, 0.0, -1.0]), ax_suggestion=-target_p)
    )


class PeckContext(DfToolsContext):
    def __init__(self, tools):
        super().__init__(tools)

        self.blocks = []
        for _, block in self.tools.objects.items():
            self.blocks.append(block)

        self.is_done = True
        self.active_target_p = None
        self.monitors = [PeckContext.monitor_active_target_p]

    def monitor_active_target_p(self):
        if self.active_target_p is not None and self.is_near_obs(self.active_target_p):
            self.is_done = True

    def set_is_done(self):
        self.is_done = True

    def is_near_obs(self, p):
        for _, obs in self.tools.obstacles.items():
            obs_p, _ = obs.get_world_pose()
            if np.linalg.norm(obs_p - p) < 0.2:
                return True
        return False

    def sample_target_p_away_from_obs(self):
        target_p = sample_target_p()
        while self.is_near_obs(target_p):
            target_p = sample_target_p()
        return target_p

    def choose_next_target(self):
        self.active_target_p = self.sample_target_p_away_from_obs()


class PeckState(DfBindableState):
    def enter(self):
        target_p = self.context.active_target_p
        target_q = make_target_rotation(target_p)
        self.target = PosePq(target_p, target_q)
        self.approach_params = ApproachParams(direction=np.array([0.0, 0.0, -0.1]), std_dev=0.04)

    def step(self):
        # Send the command each cycle so exponential smoothing will converge.
        self.context.tools.commander.set_command(
            MotionCommand(self.target, approach_params=self.approach_params)
        )
        target_dist = np.linalg.norm(self.context.tools.commander.get_fk_p() - self.target.p)
        if target_dist < 0.01:
            return None  # Exit
        return self  # Keep going


class ChooseTarget(DfAction):
    def step(self):
        self.context.is_done = False
        self.context.choose_next_target()


class Dispatch(DfDecider):
    """The top-level decider. If the current peck task is done, it chooses a new target.
    Otherwise, it executes the peck behavior.

    The peck behavior is a sequential state machine which 1. closes the gripper, 2. pecks,
    3. lifts the end-effector slightly, and 4. writes to the context that it's done. This
    behavior by itself is equivalent to the state machine variant in peck_state_machine.py.
    However, the context is also continually monitoring the situation, and if it sees that
    the current target is blocked, it sets the context's is_done flag to True, triggering
    this Dispatch decider to choose a new target.
    """

    def enter(self):
        self.add_child("choose_target", ChooseTarget())
        self.add_child(
            "peck",
            DfStateMachineDecider(
                DfStateSequence(
                    [
                        DfCloseGripper(width=0.0),
                        PeckState(),
                        DfTimedDeciderState(DfLift(height=0.05), activity_duration=0.25),
                        DfWriteContextState(lambda context: context.set_is_done()),
                    ]
                )
            ),
        )

    def decide(self):
        if self.context.is_done:
            return DfDecision("choose_target")
        else:
            return DfDecision("peck")


def build_behavior(tools):
    tools.enable_obstacles()
    tools.commander.set_target_full_pose()
    return DfNetwork(decider=Dispatch(), context=PeckContext(tools))
```
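The target sampling is plain NumPy and easy to reason about in isolation. Below is a framework-free sketch of the same rejection-sampling idea; it assumes nothing from Isaac Sim, and the stand-in obstacle list (a list of NumPy positions) replaces tools.obstacles for illustration.

```python
import numpy as np

def sample_target_p(rng):
    # Sample a peck point in the rectangle [0.3, 0.7] x [-0.4, 0.4], just above the floor.
    pt = np.zeros(3)
    pt[0] = rng.uniform(0.3, 0.7)
    pt[1] = rng.uniform(-0.4, 0.4)
    pt[2] = 0.01
    return pt

def is_near_obs(p, obstacle_positions, radius=0.2):
    # True if p is within `radius` of any obstacle center.
    return any(np.linalg.norm(obs_p - p) < radius for obs_p in obstacle_positions)

def sample_target_p_away_from_obs(obstacle_positions, rng):
    # Rejection sampling: redraw until the sample clears every obstacle.
    target_p = sample_target_p(rng)
    while is_near_obs(target_p, obstacle_positions):
        target_p = sample_target_p(rng)
    return target_p

rng = np.random.default_rng(0)
obstacles = [np.array([0.5, 0.0, 0.0])]
target = sample_target_p_away_from_obs(obstacles, rng)
assert not is_near_obs(target, obstacles)
```

Rejection sampling is fine here because the obstacle-free region is a large fraction of the sampling rectangle, so the loop terminates quickly in practice.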
This example uses a top-level Dispatch decider to choose what to do. It simply chooses between two children: if the is_done flag in the context object is set, it chooses a new target; otherwise, it pecks.
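To make that routing concrete, here is a minimal, framework-free sketch of the flag-check pattern. Decision and MiniDispatch are hypothetical stand-ins for the df classes, and the context is reduced to a plain dict.

```python
class Decision:
    """Stand-in for DfDecision: just names the chosen child."""
    def __init__(self, name):
        self.name = name

class MiniDispatch:
    """Stand-in for the Dispatch decider: routes on the context's is_done flag."""
    def decide(self, context):
        # When the current peck is finished (or a monitor has invalidated
        # the target), pick a new one; otherwise keep pecking.
        if context["is_done"]:
            return Decision("choose_target")
        return Decision("peck")

dispatch = MiniDispatch()
assert dispatch.decide({"is_done": True}).name == "choose_target"
assert dispatch.decide({"is_done": False}).name == "peck"
```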
The PeckState class is similar to the class of the same name in Example: State Machines. However, it now gets its target information from the context's active target member self.context.active_target_p, and we add one more state to the peck behavior's DfStateSequence to write "is done" to the context when finished, prompting the Dispatch decider to choose a new target.
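The role of that final write state can be illustrated without the df machinery. In this hedged sketch, each state is a plain function run to completion in order, and the last one flips the flag, standing in for DfWriteContextState(lambda context: context.set_is_done()).

```python
def close_gripper(context):
    # Stand-in for DfCloseGripper(width=0.0).
    context["gripper_closed"] = True

def peck(context):
    # Stand-in for PeckState: pretend one peck completes.
    context["pecks"] = context.get("pecks", 0) + 1

def lift(context):
    # Stand-in for DfTimedDeciderState(DfLift(height=0.05), activity_duration=0.25).
    context["lifted"] = True

def write_is_done(context):
    # Stand-in for the final DfWriteContextState: flip the flag Dispatch watches.
    context["is_done"] = True

sequence = [close_gripper, peck, lift, write_is_done]

context = {"is_done": False}
for state in sequence:  # run each state to completion, in order
    state(context)
assert context["is_done"]
```

Because the write happens as the sequence's last state, Dispatch only sees is_done become True once the whole peck cycle has finished (or, separately, when a monitor sets it early).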
Importantly, we solve the reactivity problem using a context monitor, PeckContext.monitor_active_target_p. That monitor watches the active target, and if it finds that the target has come too near an obstacle, it sets is_done to True, triggering the Dispatch decider to choose a new target.
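Putting monitor and dispatch together, the reactive loop can be sketched without Isaac Sim. MiniPeckContext and tick below are hypothetical stand-ins for illustration; they mimic the behavior described above, where the context monitors run each decision cycle before the decider is queried, so a newly blocked target flips is_done and the very next decision switches to retargeting.

```python
import numpy as np

class MiniPeckContext:
    """Framework-free stand-in for PeckContext (hypothetical, for illustration)."""
    def __init__(self, obstacles):
        self.obstacles = obstacles
        self.active_target_p = None
        self.is_done = True  # no target yet, so choose one first

    def is_near_obs(self, p, radius=0.2):
        return any(np.linalg.norm(obs_p - p) < radius for obs_p in self.obstacles)

    def monitor_active_target_p(self):
        # If an obstacle has moved over the active target, abandon it.
        if self.active_target_p is not None and self.is_near_obs(self.active_target_p):
            self.is_done = True

def tick(context):
    # One decision cycle: run the monitors, then dispatch.
    context.monitor_active_target_p()
    if context.is_done:
        context.is_done = False
        context.active_target_p = np.array([0.6, 0.3, 0.01])  # pretend resample
        return "choose_target"
    return "peck"

ctx = MiniPeckContext(obstacles=[np.array([0.0, 0.0, 0.0])])
assert tick(ctx) == "choose_target"  # no target yet -> choose one
assert tick(ctx) == "peck"           # target clear of obstacles -> peck
ctx.obstacles[0] = ctx.active_target_p.copy()  # obstacle moves over the target
assert tick(ctx) == "choose_target"  # monitor flips is_done -> retarget
```

The key property is that no state in the peck sequence needs to check for blockage itself; the monitor runs every cycle regardless of which state is active, which is what makes the behavior reactive.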