6. Example: Reactivity Using Deciders

6.1. Learning Objectives

This tutorial shows how to leverage decider networks to make our state machines more reactive.

In Example: State Machines, we saw a simple state machine that made the Franka robot peck at the floor while avoiding obstacle locations. However, it chose its peck target on entry, at the beginning of the state machine, so the target remained fixed throughout. If the user moved an obstacle over the peck target, the robot would get stuck, trying to avoid the obstacle while failing to reach the target. Here we show how to leverage decider networks to monitor for that situation and switch targets dynamically when necessary.

Prerequisites

Scripts throughout this tutorial are referenced relative to standalone_examples/cortex.

6.2. Running the example

This tutorial is based on the example franka/peck_decider_network.py.

To run the example:

./cortex launch  # Launches the default franka blocks belief world.
./cortex activate franka/peck_decider_network.py  # Run the example.

The Franka robot will peck at the ground while avoiding the blocks. You can move the blocks around to see how that affects where the robot chooses to peck. We saw a similar demo in the tutorial Example: State Machines; however, there the system would hang if a block moved into the way of a peck target. In this version, using decider networks, we can easily monitor for that situation and switch targets whenever a block overlaps the current target.

Try moving a block into the path of a target. A new target will be chosen immediately.

6.3. The Code

Example: franka/peck_decider_network.py

import numpy as np
from omni.isaac.cortex.df import (
        DfNetwork, DfDecider, DfDecision, DfAction, DfBindableState, DfStateSequence, DfTimedDeciderState,
        DfStateMachineDecider, DfSetLockState, DfWriteContextState
)
from omni.isaac.cortex.dfb import DfToolsContext, DfLift, DfCloseGripper, make_go_home
import omni.isaac.cortex.math_util as math_util
from omni.isaac.cortex.motion_commander import MotionCommand, ApproachParams, PosePq

def sample_target_p():
    # Sample a random target point on the floor within the reachable workspace.
    min_x = .3
    max_x = .7
    min_y = -.4
    max_y = .4

    pt = np.zeros(3)
    pt[0] = (max_x-min_x) * np.random.random_sample() + min_x
    pt[1] = (max_y-min_y) * np.random.random_sample() + min_y
    pt[2] = .01

    return pt

def make_target_rotation(target_p):
    # Orient the end-effector's z-axis downward, suggesting the x-axis point
    # back toward the robot's base (opposite the direction to the target).
    return math_util.matrix_to_quat(math_util.make_rotation_matrix(
        az_dominant=np.array([0., 0., -1.]),
        ax_suggestion=-target_p))

class PeckContext(DfToolsContext):
    def __init__(self, tools):
        super().__init__(tools)

        self.blocks = []
        for _, block in self.tools.objects.items():
            self.blocks.append(block)

        self.is_done = True
        self.active_target_p = None

        # Monitors are called once per cycle with the context as argument.
        self.monitors = [
            PeckContext.monitor_active_target_p
        ]

    def monitor_active_target_p(self):
        if self.active_target_p is not None and self.is_near_obs(self.active_target_p):
            self.is_done = True

    def set_is_done(self):
        self.is_done = True

    def is_near_obs(self, p):
        for _, obs in self.tools.obstacles.items():
            obs_p, _ = obs.get_world_pose()
            if np.linalg.norm(obs_p - p) < .2:
                return True
        return False

    def sample_target_p_away_from_obs(self):
        target_p = sample_target_p()
        while self.is_near_obs(target_p):
            target_p = sample_target_p()
        return target_p

    def choose_next_target(self):
        self.active_target_p = self.sample_target_p_away_from_obs()

class PeckState(DfBindableState):
    def enter(self):
        target_p = self.context.active_target_p
        target_q = make_target_rotation(target_p)
        self.target = PosePq(target_p, target_q)
        self.approach_params = ApproachParams(direction=np.array([0., 0., -.1]), std_dev=.04)

    def step(self):
        # Send the command each cycle so exponential smoothing will converge.
        self.context.tools.commander.set_command(
                MotionCommand(self.target, approach_params=self.approach_params))
        target_dist = np.linalg.norm(self.context.tools.commander.get_fk_p() - self.target.p)

        if target_dist < .01:
            return None  # Exit
        return self  # Keep going

class ChooseTarget(DfAction):
    def step(self):
        self.context.is_done = False
        self.context.choose_next_target()

class Dispatch(DfDecider):
    """ The top-level decider.

    If the current peck task is done, then it will choose a target.  Otherwise, it executes the peck
    behavior. The peck behavior is a sequential state machine which 1. closes the gripper, 2. pecks,
    3. lifts the end-effector slightly, 4. writes to the context that it's done.

    This behavior by itself is equivalent to the state machine variant in peck_state_machine.py.
    However, the context is also continually monitoring the situation and if it sees that its
    current target is blocked, it'll set the context.is_done flag to True, triggering this Dispatch
    decider to choose a new target.
    """

    def enter(self):
        self.add_child("choose_target", ChooseTarget())
        self.add_child("peck", DfStateMachineDecider(DfStateSequence([
            DfCloseGripper(width=.0),
            PeckState(),
            DfTimedDeciderState(DfLift(height=.05), activity_duration=.25),
            DfWriteContextState(lambda context: context.set_is_done())])))

    def decide(self):
        if self.context.is_done:
            return DfDecision("choose_target")
        else:
            return DfDecision("peck")


def build_behavior(tools):
    tools.enable_obstacles()
    tools.commander.set_target_full_pose()
    return DfNetwork(decider=Dispatch(), context=PeckContext(tools))

This example uses a top-level Dispatch decider to choose what to do. It chooses simply between choose_target and peck: when the is_done flag in the context object is set, it chooses a new target; otherwise, it pecks.
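
To make that routing concrete, here is a minimal, self-contained sketch of the dispatch pattern in plain Python. The Mini* names are hypothetical stand-ins; in the example above, the omni.isaac.cortex.df machinery is what actually drives the decide/step cycle.

class MiniContext:
    def __init__(self):
        self.is_done = True        # Start "done" so a target is chosen first.
        self.active_target_p = None

class MiniDispatch:
    def decide(self, context):
        if context.is_done:
            return "choose_target"  # Flag is set: pick a fresh target.
        return "peck"               # Otherwise keep pecking the current one.

context = MiniContext()
dispatch = MiniDispatch()
for tick in range(3):
    choice = dispatch.decide(context)
    if choice == "choose_target":
        context.active_target_p = (0.5, 0.0, 0.01)  # Stand-in for sampling.
        context.is_done = False
    print(tick, choice, context.active_target_p)

The first tick routes to choose_target; every subsequent tick routes to peck until the peck sequence (or a context monitor) sets is_done again.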

The PeckState class is similar to the class of the same name in Example: State Machines. However, it now gets its target from the context’s active target member self.context.active_target_p, and we add one more state to the peck behavior’s DfStateSequence that writes “is done” to the context when the sequence finishes, triggering the Dispatch decider to choose a new target.
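
The effect of that final DfWriteContextState entry can be pictured with a small stand-in. The MiniWriteContextState class below is hypothetical; as used in the example, the real class simply applies the supplied function to the context when the sequence reaches it.

class MiniWriteContextState:
    def __init__(self, fn):
        self.fn = fn

    def step(self, context):
        self.fn(context)  # e.g. context.set_is_done()
        return None       # Returning None ends the state, as in PeckState.

class MiniPeckContext:
    def __init__(self):
        self.is_done = False

    def set_is_done(self):
        self.is_done = True

context = MiniPeckContext()
MiniWriteContextState(lambda c: c.set_is_done()).step(context)
print(context.is_done)  # True -- Dispatch will now choose a new target.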

Importantly, we solve the reactivity problem with the context monitor PeckContext.monitor_active_target_p, which is called each cycle with the context as its argument. If it finds the active target too near an obstacle, it sets self.is_done to True, triggering the Dispatch decider to choose a new target.
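
Conceptually, the framework runs every entry of self.monitors once per decision cycle, passing in the context. The sketch below is illustrative only (the actual df implementation may differ in detail), but it shows why listing the unbound method PeckContext.monitor_active_target_p works:

import numpy as np

class SketchContext:
    def __init__(self):
        self.is_done = False
        self.active_target_p = np.array([0.5, 0.0, 0.01])
        self.obstacle_p = np.array([0.55, 0.0, 0.01])
        self.monitors = [SketchContext.monitor_active_target_p]

    def monitor_active_target_p(self):
        # Mirrors PeckContext: flag "done" when an obstacle crowds the target.
        if np.linalg.norm(self.obstacle_p - self.active_target_p) < .2:
            self.is_done = True

context = SketchContext()
for monitor in context.monitors:  # What the framework does each cycle.
    monitor(context)
print(context.is_done)  # True: the obstacle overlaps the active target.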