Decider networks
Note
Cortex is deprecated as of 4.2.0. Future versions will be independent of Isaac Sim.
This tutorial steps through the basics of decider networks and demonstrates the concepts with some simple examples. Decider networks, as a class of decision tools, include state machines, and we include some examples of how to construct state machines using our built in tooling.
In all command line examples, we use the abbreviation isaac_python
for the Isaac Sim python
script (<isaac_sim_root>/python.sh
on Linux and <isaac_sim_root>\python.bat
on Windows).
The command lines are written relative to the working directory
standalone_examples/api/omni.isaac.cortex
.
Each example will launch Isaac Sim without playing the simulation. Press play to run the simulation and behavior.
Related tutorials: Behavior Examples: Peck Games steps through scripting a series of simple games for the Franka robot, building off the concepts presented here. The tutorial emphsizes some of the limitations of state machines and illustrates how decider networks simplify the development of reactive behaviors. Likewise, Walkthrough: Franka Block Stacking walks through a complete demo of how decider networks and state machines are used to develop an interactive block stacking behavior for the Franka.
Basics of decider networks
All behaviors in Cortex are decider networks. This section describes decider networks and covers the basics of the framework tooling for implementing them. We also show how to implement state machines using the framework.
Decision framework tooling
A decider network is similar to a decision tree (although not strictly a tree, as described below),
but with a notion of statefulness. The full decision framework is implemented in
omni.isaac.cortex/omni/isaac/cortex/df.py
. The decider network is represented by the
DfNetwork
class.
Decider networks are formally directed acyclic graphs of DfDecider
nodes. One node is designated
the root, and leaf nodes are nodes with no children. Each decider node’s job is to choose among
its children. This choice is made by the decider node’s decide()
method. Every step of the
behavior (generally at 60hz, one step per physics step) the decider network algorithm traces from
the root down to a leaf following the sequence of decisions made by the decider nodes encountered
along the way.
The decider network is represented by the DfNetwork
class. The root is passed into the
DfNetwork
object on construction along with a custom context object which handles monitoring
logical state and gives each decider node access to the command API. Each decider node can access
the context object as self.context
during execution.
Context objects derive from DfLogicalState
which provides an API for adding logical state
monitors. Logical state monitors are simply functions that take the context object as input (e.g.
member functions of the context object are common); they’re called once every cycle in the order
they’re added. The context object also generally provides access to the robot’s command API. See for
instance DfContext
the behavior tools in the module
omni.isaac.cortex/omni/isaac/cortex/dfb.py
. The DfContext
is a common base class which
additionally provides access to the robot along with its command API.
Here’s a simple example:
from isaacsim import SimulationApp
simulation_app = SimulationApp({"headless": False})
from omni.isaac.cortex.cortex_world import CortexWorld
from omni.isaac.cortex.df import DfNetwork, DfDecider
from omni.isaac.cortex.dfb import DfContext
from omni.isaac.cortex.robot import add_franka_to_stage
class CustomContext(DfContext):
def __init__(self, robot):
super().__init__(robot)
def reset(self):
# Called before the behavior is run. This is where logical state can be initialized.
self.has_work = False
def monitor_work(self):
# Set the self.has_work logical state member if there's currently work to do.
...
class Dispatch(DfDecider):
def __init__(self):
super().__init__()
self.add_child("go_home", GoHome())
self.add_child("do_work", DoWork())
def decide(self):
# The decide method has access to the context object
if self.context.has_work:
return DfDecision("do_work")
else:
return DfDecision("go_home")
world = CortexWorld()
robot = world.add_robot(add_franka_to_stage(name="franka", prim_path="/World/franka"))
world.scene.add_default_ground_plane()
decider_network = DfNetwork(Dispatch(), context=CustomContext())
world.add_decider_network(decider_network)
world.run(simulation_app)
simulation_app.close()
The section at the bottom illustrates how to create a world, add a robot and behavior, and run it.
See omni.isaac.cortex/omni/isaac/cortex/cortex_world.py
for more information on the cortex world
and its API. Its stepped automatically from world.run(simulation_app)
in a standard loop runner.
Stepping the world processes the logical state, decision (behavior), and command API (policy) layers
of the Cortex pipeline:
def step(self, render: bool = True, step_sim: bool = True) -> None:
if self._task_scene_built:
...
if self.is_playing():
# Cortex pipeline: Process logical state monitors, then make decisions based on that
# logical state (sends commands to the robot's commanders), and finally step the
# robot's commanders to handle those commands.
for ls_monitor in self._logical_state_monitors.values():
ls_monitor.pre_step()
for behavior in self._behaviors.values():
behavior.pre_step()
for robot in self._robots.values():
robot.pre_step()
...
The world.add_decider_network(decider_network)
automatically adds a logical state monitor which
calls the context object’s monitors and adds a behavior which steps the decider network. More
generally, logical state monitors can be added using world.add_logical_state_monitor(...)
and
behaviors can be added using world.add_behavior(...)
.
Statefulness of decider nodes and state machines
Every cycle, the decider network algorithm traces from the root to a leaf creating an execution path
(the sequence of decider nodes visited). (See the above figure.) From cycle to cycle it keeps track
of the execution path and uses the previous path to determine whether the decider nodes are making
the same decisions as before, or making different decisions. Each decide node has an enter()
and
exit()
method, and every time a new branch to a leaf is chosen, exit()
is called on the
branch no longer taken in reverse order from the leaf to the branching point, and enter()
is
called along the new branch in the order the new decider nodes are visited. The full decider node
API is
class DfDecider(DfBindable):
...
def enter(self):
pass
def decide(self):
pass
def exit(self):
pass
enter()
is called (along with decide()
) only when the decider node is entered in the sense
defined above. As long as the execution path to this node remains consistent from step to step, only
decide()
is called. Once it’s no longer reached, exit()
is called. (DfBindable
indicates that objects of this type will be able to access the custom context object as
self.context
during execution.)
These concepts are analogous to the entry and exit concepts of state machines. The decision
framework provides a DfState
base class for defining state machines with an analogous API:
class DfState(DfBindable):
...
def enter(self):
pass
def step(self):
pass
def exit(self):
pass
enter()
is called on entry to the state, step()
is called while in the state, and exit()
is called when the state is exited. The step()
method indicates the state machine transition
through its return value. E.g. returning self
will transition back to itself, and returning
None
will transition to the terminal “do nothing” state. More generally, state transitions to
new states are implemented by returning a reference to that state object.
Since the concepts of entry, step/decide, and exit align between state machines and decider nodes
they are compatible within the decision framework. A DfStateMachineDecider
is a decider node
which takes a start state of a state machine on construction and runs the state machine. The decider
node’s enter()
method resets the state machine to the start state and the decide()
method
steps the state machine.
One common use case is the sequential state machine. If State1
, State2
, and State3
are
each DfState
objects which transition to themselves while doing work and terminate (transition
to None
) when finished, we can string them together into a sequential state machine using
DfStateSequence([State1(), State2(), State3()])
. A DfStateSequence
is itself a DfState
object which transitions back to itself, making it a hierarchical state machine. Internally, it runs
the states in sequence, transitioning to the next state whenever a state terminates. We can loop the
sequence using DfStateSequence([State1(), State2(), State3()], loop=True)
We can create a decider network that runs this state machine using:
state = DfStateSequence([State1(), State2(), State3()], loop=True)
decider_network = DfNetwork(DfStateMachineDecider(state)), context=DfContext(robot))
To see a complete example of using a looping sequential state machine run:
isaac_python example_command_api_main.py
The robot will move the end-effector to a fixed target and maintain that target while changing the nullspace arm configuration and opening and closing the gripper.
Simple follow example
Run the follow example:
isaac_python follow_example_main.py
It’ll launch the robot with a sphere at the end-effector. Select the sphere and drag it around with the Move gizmo.
We’ll modify this simple example below. The final modified code is shown in
follow_example_modified_main.py
for reference.
Add an end-effector monitor
Currently, the decider network is created with just the default context object DfContext
. We’ll
modify it to include a logical state monitor that monitors whether the end-effector has converged.
Add the following code
class FollowContext(DfContext):
def __init__(self, robot):
super().__init__(robot)
self.reset()
self.add_monitors(
[FollowContext.monitor_end_effector, FollowContext.monitor_diagnostics]
)
def reset(self):
self.is_target_reached = False
def monitor_end_effector(self):
eff_p = self.robot.arm.get_fk_p()
target_p, _ = self.robot.follow_sphere.get_world_pose()
self.is_target_reached = np.linalg.norm(target_p - eff_p) < 0.01
def monitor_diagnostics(self):
print("is_target_reached: {}".format(self.is_target_reached))
Then modify the creation of the decider network to use this context object.
# It used to have context=DfContext(robot). Now we use the custom FollowContext class.
world.add_decider_network(DfNetwork(DfStateMachineDecider(FollowState()), context=FollowContext(robot)))
Run the example again, and you’ll see is_target_reached: <val>
printed out where <val>
is
False
when the end-effector is away from the target and True
when it reaches the target.
Setup automatic action on the monitored logical state
Adding the end-effector monitor toggles the is_target_reached
logical state, but doesn’t do
anything with it. Now we’ll add a second monitor to the FollowContext
class to automatically
open and close the gripper based on whether the end-effector is at the target.
class FollowContext(DfContext):
def __init__(self, robot):
super().__init__(robot)
self.reset()
# New: add FollowContext.monitor_gripper to the monitor list
self.add_monitors(
[FollowContext.monitor_end_effector, FollowContext.monitor_gripper, FollowContext.monitor_diagnostics]
)
def reset(self):
self.is_target_reached = False
def monitor_end_effector(self):
eff_p = self.robot.arm.get_fk_p()
target_p, _ = self.robot.follow_sphere.get_world_pose()
self.is_target_reached = np.linalg.norm(target_p - eff_p) < 0.01
# New: Implement monitor_gripper()
def monitor_gripper(self):
if self.context.is_target_reached:
self.robot.gripper.close()
else:
self.robot.gripper.open()
def monitor_diagnostics(self):
print("is_target_reached: {}".format(self.is_target_reached))
This will close the gripper once the target’s been reached and open it when it’s not.
Run the example again and play with the sphere target. If you move the target away from the end-effector, you’ll see the gripper open and the end-effector each toward the target. Once the target is reached, the gripper will close.
Simple state machine
Run the following to launch an example of a simple state machine.
isaac_python franka_examples_main.py --behavior=simple_state_machine
You’ll see the robot move its end-effector up and down moving between two pre-specified points.
Simple decider network
Run the following to launch an example of a simple state machine.
isaac_python franka_examples_main.py --behavior=simple_decider_network
You’ll see “<middle>” printed in the console. Select the /World/motion_commander_target
prim in
the stage listing and select the Move gizmo. Move the end-effector to the left and right. When it
enters the left region (from the user’s perspective) it’ll print out “<left>”; when it moves back
into the middle region it’ll print out “<middle>”; and when it moves into the right region it’ll
print out “<right>”.
Note that this example additionally demonstrates passing parameters to a decider node.
class Dispatch(DfDecider):
def __init__(self):
super().__init__()
self.add_child("print_left", PrintAction("<left>"))
self.add_child("print_right", PrintAction("<right>"))
self.add_child("print", PrintAction())
def decide(self):
if self.context.is_middle:
return DfDecision("print", "<middle>") # Send parameters down to generic print.
if self.context.is_left:
return DfDecision("print_left")
else:
return DfDecision("print_right")
Running other behaviors
Any of the behaviors listed in omni.isaac.cortex.sample_behaviors/omni/isaac/cortex/sample_behaviors/franka
can be
loaded with this franka_examples_main.py
example.
The full commandline is
isaac_python franka_examples_main.py --behavior=<behavior_name>
with <behavior_name>
set to any of the following
block_stacking_behavior
peck_state_machine
peck_decider_network
peck_game
simple_state_machine
simple_decider_network
Alternatively, you can load behaviors directly from their python module:
isaac_python franka_examples_main.py --behavior=<path_to_behavior>
This tutorial stepped through the last two “simple” behaviors. The peck_state_machine
,
peck_decider_network
and peck_game
behaviors will be covered in
Behavior Examples: Peck Games, and the block_stacking_behavior
is
walked through in detail in Walkthrough: Franka Block Stacking.