7. Example: Designing Context Monitors

7.1. Learning Objectives

We’ll see here an example of a common reactive behavior design pattern.

Often it’s useful to carefully design the logical state monitors in the context object to model all aspects of the task as observable changes to the logical state. (Think of logical state simply as variables set in the context object at the beginning of each tick by the monitors.) When complete logical state is represented and monitored each tick, decisions made by the deciders become easy, and these decisions can be straightforwardly reactive.

For instance, in this example the robot needs to peck at any block that moves. We use monitors to monitor the blocks, note which has moved (additionally extracting peck target information), and even note whether the end-effector has pecked it. Given that information, we’ll see the decision logic becomes in the Dispatch decider’s decide() method becomes very simple.

In contrast to the behaviors of Example: State Machines and Example: Reactivity Using Deciders, this behavior is implemented entirely as a decider network (no internal state machines) operating on this monitored logical state.

Scripts throughout this tutorial are referenced relative to standalone_examples/cortex.

Prerequisites

7.2. Running the example

This tutorial is based on the example franka/peck_game.py

To run the example:

./cortex launch  # Launches the default franka blocks belief world.
./cortex activate franka/peck_game.py  # Run the example.

Every time a block is moved, the robot will move to peck it. Play around with moving blocks and see how the robot reacts. Each time a new block is moved, the robot will stop whatever it’s doing and move to peck it, including aborting a current peck. If a block is moved continuously, the robot will track the changing target location.

7.3. Problem Decomposition

If we could know which block is currently active, and whether the end-effector is close to a block, then the decisions are simple.

  1. If there’s an active block, peck at it.

  2. If there’s no active block, but the end-effector is close to a block, then lift the end-effector a bit.

  3. If neither of those are true, just go home.

Note, most of the logic is in the context object’s monitors. In particular, the activity status of the block is entirely governed by monitors. One monitor observes whether a block has moved, and if so, activates it. But then another monitor additionally keeps track of the end-effector and if the end-effector gets close enough to the target point on top of the active block, the monitor deactivates the block. All logical state of the problem is traced and updated by context’s monitors at the beginning of each tick.

Given those monitors, the actions just need to move the end-effector toward an active block, and coax the system toward a logical state transition observed by the appropriate monitor.

7.4. The Code

Example: franka/peck_game.py

import numpy as np
import time

from omni.isaac.cortex.df import DfNetwork, DfDecider, DfDecision, DfAction
from omni.isaac.cortex.dfb import DfToolsContext, DfLift, DfCloseGripper, make_go_home, tick_action
import omni.isaac.cortex.math_util as math_util
from omni.isaac.cortex.motion_commander import MotionCommand, ApproachParams, PosePq

class PeckContext(DfToolsContext):
    def __init__(self, tools):
        super().__init__(tools)

        self.blocks = []
        for _,block in self.tools.objects.items():
            self.blocks.append(block)

        self.block_positions = self.get_latest_block_positions()
        self.active_block = None
        self.active_target_p = None
        self.is_eff_close_to_block = None

        self.time_at_last_diagnostics_print = None

        self.monitors = [
            PeckContext.monitor_block_movement,
            PeckContext.monitor_active_target_p,
            PeckContext.monitor_active_block,
            PeckContext.monitor_eff_block_proximity,
            PeckContext.monitor_diagnostics,
        ]

    @property
    def has_active_block(self):
        return self.active_block is not None

    def clear_active_block(self):
        self.active_block = None
        self.active_target_p = None

    def get_latest_block_positions(self):
        block_positions = []
        for block in self.blocks:
            block_p,_ = block.get_world_pose()
            block_positions.append(block_p)
        return block_positions

    def monitor_block_movement(self):
        block_positions = self.get_latest_block_positions()
        for i in range(len(block_positions)):
            if np.linalg.norm(block_positions[i] - self.block_positions[i]) > .01:
                self.block_positions[i] = block_positions[i]
                self.active_block = self.blocks[i]

    def monitor_active_target_p(self):
        if self.active_block is not None:
            p,_ = self.active_block.get_world_pose()
            self.active_target_p = p + np.array([0.,0.,.03])

    def monitor_active_block(self):
        if self.active_target_p is not None:
            eff_p = self.tools.commander.get_fk_p()
            dist = np.linalg.norm(eff_p - self.active_target_p)
            if np.linalg.norm(eff_p - self.active_target_p) < .01:
                self.clear_active_block()

    def monitor_eff_block_proximity(self):
        self.is_eff_close_to_block = False

        eff_p = self.tools.commander.get_fk_p()
        for block in self.blocks:
            block_p,_ = block.get_world_pose()
            if np.linalg.norm(eff_p - block_p) < .07:
                self.is_eff_close_to_block = True
                return

    def monitor_diagnostics(self):
        now = time.time()
        if self.time_at_last_diagnostics_print is None or (now -
                self.time_at_last_diagnostics_print) >= 1.:
            if self.active_block is not None:
                print("active block:", self.active_block.name)
            self.time_at_last_diagnostics_print = now

class PeckAction(DfAction):
    def enter(self):
        self.block = self.context.active_block
        self.context.tools.commander.disable_obstacle(self.block)

    def step(self):
        target_p = self.context.active_target_p
        target_q = math_util.matrix_to_quat(math_util.make_rotation_matrix(
            az_dominant=np.array([0., 0., -1.]),
            ax_suggestion=-target_p))
        target = PosePq(target_p, target_q)
        approach_params = ApproachParams(direction=np.array([0.,0.,-.1]), std_dev=.04)

        # Send the command each cycle so exponential smoothing will converge.
        self.context.tools.commander.set_command(
                MotionCommand(target, approach_params=approach_params))
        target_dist = np.linalg.norm(self.context.tools.commander.get_fk_p() - target.p)

    def exit(self):
        self.context.tools.commander.enable_obstacle(self.block)

class Dispatch(DfDecider):
    def enter(self):
        self.add_child("peck", PeckAction())
        self.add_child("lift", DfLift(height=.1))
        self.add_child("go_home", make_go_home())

    def decide(self):
        if self.context.has_active_block:
            return DfDecision("peck")
        if self.context.is_eff_close_to_block:
            return DfDecision("lift")

        # If we aren't doing anything else, always just go home.
        return DfDecision("go_home")

def build_behavior(tools):
    tools.enable_obstacles()
    tools.commander.set_target_full_pose()
    tick_action(tools, DfCloseGripper()) # Close the gripper
    return DfNetwork(decider=Dispatch(), context=PeckContext(tools))

Notice how much of the logic is written into the PeckContext monitors. The monitors:

  1. Monitor block movement, setting the active block if it noticed movement.

  2. Monitor and record the active target point.

  3. Monitor monitor the active block and deactivates it if it finds the end-effector’s reached the target point (i.e. the peck has occurred).

  4. Monitor whether the end-effector is close to a block (used to know whether we have to lift away from it before returning home).

With these monitors, the decider network itself ends up being very simple and concise. The logic is entirely written into the PeckAction and the Dispatch decider. The simplicity of the decide() method comes from the work the monitors have already done to observe the logical state. It’s able to directly implement the simple logic described in the problem decomposition above.

Note also the monitors monitor task progress independent of whether PeckAction is actually used. If anything makes the end-effector reach the peck target, it’ll notice and trigger the logical state transition deactivating the block. E.g. if there were a different control interface, such as virtual reality, and a user were controlling the end-effector, that user could take the place of PeckAction entirely and send the end-effector to the target manually. The rest of the system would remain the same and monitor the state as before.