(!) Please ask about problems and questions regarding this tutorial on answers.ros.org. Don't forget to include in your question the link to this page, the versions of your OS & ROS, and also add appropriate tags.

Writing a simple SMACH for Move Base Flex

Description: In this tutorial you learn how to set up a SMACH to use Move Base Flex for flexible and more specific navigation tasks. Using a SMACH lets you easily include your navigation tasks into your global robot behavior state machine. We will use RViz to receive a goal pose as the first input for the SMACH.

Keywords: smach, move_base_flex, navigation, planning, state machine

Tutorial Level: ADVANCED

Description

In this tutorial we address the actions GetPath, ExePath and Recovery provided by Move Base Flex. While GetPath runs the global planner searching for a path to the target pose, ExePath runs the local planner executing the planned path. The Recovery action can be used to execute various behaviors for error handling during planning and controlling. We connect these actions by setting up a SMACH state machine using Simple Action States. In addition to the actions described above, the implementation of a state that receives a navigation goal by the user is required. The target pose can be easily set via the visualization tool RViz and published on a specific topic.

SMACH State Machine

mbf_state_machine.png

The implemented state machine visualized by the smach_viewer

SMACH States

In the following four SMACH states are described: WaitForGoal, GetPath, ExePath and Recovery. Furthermore, the connection between these states is shown and explained in the manner of transferring data from one state to another, e.g. the goal pose or the computed path.

WaitForGoal

The smach_ros package provides the Monitor State as a simple wrapper state for subscribing topics. The navigation goal is published as a geometry_msgs/PoseStamped on the topic /move_base_simple/goal. For implementing the Monitor State, add the following lines to your code.

   1 smach.StateMachine.add(
   2     'WAIT_FOR_GOAL',
   3     smach_ros.MonitorState(
   4         '/move_base_simple/goal',
   5         PoseStamped,
   6         goal_cb,
   7         output_keys=['goal']
   8     ),
   9     transitions={
  10         'invalid': 'GET_PATH',
  11         'valid': 'WAIT_FOR_GOAL',
  12         'preempted': 'preempted'
  13     }
  14 )

There is a callback function goal_cb which is invoked when a new goal is published on the subscribed topic. The callback passes the target pose to the userdata goal defined as an output key to transfer it to the next state.

   1 def goal_cb(userdata, msg):
   2     userdata.goal = msg
   3     return False

GetPath

The following lines add the GetPath action as a Simple Action State to the state machine.

   1 smach.StateMachine.add(
   2     'GET_PATH',
   3     smach_ros.SimpleActionState(
   4         '/move_base_flex/get_path',
   5         GetPathAction,
   6         goal_slots=['target_pose'],
   7         result_slots=['path']
   8     ),
   9     transitions={
  10         'succeeded': 'EXE_PATH',
  11         'aborted': 'WAIT_FOR_GOAL',
  12         'preempted': 'preempted'
  13     },
  14     remapping={
  15         'target_pose': 'goal'
  16     }
  17 )

Note that you declare the correct namespace for the action. While the goal_slots define the input values, result_slots indicate the output values. To ensure the correct input and output keys, refer to the GetPath.action in the mbf_msgs package. The slots transfer the goals and results of the action automatically to the correlated userdata. If goals/results and userdata are named differently, you can change them with the remapping argument.

ExePath

The ExePath action is similar to GetPath. The state is added to the state machine as follows.

   1 smach.StateMachine.add(
   2     'EXE_PATH',
   3     smach_ros.SimpleActionState(
   4         '/move_base_flex/exe_path',
   5         ExePathAction,
   6         goal_slots=['path']
   7     ),
   8     transitions={
   9         'succeeded': 'succeeded',
  10         'aborted': 'RECOVERY',
  11         'preempted': 'preempted'
  12     }
  13 )

Here, we only have a goal path but no results. Do not forget to change the namespace.

Recovery

   1 smach.StateMachine.add(
   2     'RECOVERY',
   3     smach_ros.SimpleActionState(
   4         'move_base_flex/recovery',
   5         RecoveryAction,
   6         goal_cb=recovery_path_goal_cb,
   7         input_keys=["error", "clear_costmap_flag"],
   8         output_keys = ["error_status", 'clear_costmap_flag']
   9     ),
  10     transitions={
  11         'succeeded': 'GET_PATH',
  12         'aborted': 'aborted',
  13         'preempted': 'preempted'
  14     }
  15 )

For the input, the recovery could react to different errors, which we not implemented yet and it takes the clear costmap_flag into account, which I'll explain in reference to the callback functions. If the recovery is successfull, it transitions to a new planning, if even the recovery failed the user must specify a new goal.

   1 def recovery_path_goal_cb(userdata, goal):
   2     if userdata.clear_costmap_flag == False:
   3         goal.behavior = 'clear_costmap'
   4         userdata.clear_costmap_flag = True
   5     else:
   6         goal.behavior = 'straf_recovery'
   7         userdata.clear_costmap_flag = False

The Recovery has only a goal_cb where at the moment per default by turns clear_costmap_recovery and straf_recovery are written into the goal and therefore executed. This is determined by the clear_costmap flag which changes alternately from True to false to ensure the turnwise calls.

Complete State Machine

   1 import rospy
   2 import smach
   3 import smach_ros
   4 
   5 from geometry_msgs.msg import PoseStamped
   6 from nav_msgs.msg import Path
   7 
   8 from mbf_msgs.msg import ExePathAction
   9 from mbf_msgs.msg import GetPathAction
  10 from mbf_msgs.msg import RecoveryAction
  11 
  12 
  13 def main():
  14     rospy.init_node('mbf_state_machine')
  15 
  16     # Create SMACH state machine
  17     sm = smach.StateMachine(outcomes=['succeeded', 'aborted', 'preempted'])
  18 
  19     # Define userdata
  20     sm.userdata.goal = None
  21     sm.userdata.path = None
  22     sm.userdata.error = None
  23     sm.userdata.clear_costmap_flag = False
  24     sm.userdata.error_status = None
  25 
  26     with sm:
  27         # Goal callback for state WAIT_FOR_GOAL
  28         def goal_cb(userdata, msg):
  29             userdata.goal = msg
  30             return False
  31 
  32         # Monitor topic to get MeshGoal from RViz plugin
  33         smach.StateMachine.add(
  34             'WAIT_FOR_GOAL',
  35             smach_ros.MonitorState(
  36                 '/move_base_simple/goal',
  37                 PoseStamped,
  38                 goal_cb,
  39                 output_keys=['goal']
  40             ),
  41             transitions={
  42                 'invalid': 'GET_PATH',
  43                 'valid': 'WAIT_FOR_GOAL',
  44                 'preempted': 'preempted'
  45             }
  46         )
  47 
  48         # Get path
  49         smach.StateMachine.add(
  50             'GET_PATH',
  51             smach_ros.SimpleActionState(
  52                 '/move_base_flex/get_path',
  53                 GetPathAction,
  54                 goal_slots=['target_pose'],
  55                 result_slots=['path']
  56             ),
  57             transitions={
  58                 'succeeded': 'EXE_PATH',
  59                 'aborted': 'WAIT_FOR_GOAL',
  60                 'preempted': 'preempted'
  61             },
  62             remapping={
  63                 'target_pose': 'goal'
  64             }
  65         )
  66 
  67         # Execute path
  68         smach.StateMachine.add(
  69             'EXE_PATH',
  70             smach_ros.SimpleActionState(
  71                 '/move_base_flex/exe_path',
  72                 ExePathAction,
  73                 goal_slots=['path']
  74             ),
  75             transitions={
  76                 'succeeded': 'succeeded',
  77                 'aborted': 'RECOVERY',
  78                 'preempted': 'preempted'
  79             }
  80         )
  81 
  82         # Goal callback for state RECOVERY
  83         def recovery_path_goal_cb(userdata, goal):
  84             if userdata.clear_costmap_flag == False:
  85                 goal.behavior = 'clear_costmap'
  86                 userdata.clear_costmap_flag = True
  87             else:
  88                 goal.behavior = 'straf_recovery'
  89                 userdata.clear_costmap_flag = False
  90 
  91         # Recovery
  92         smach.StateMachine.add(
  93             'RECOVERY',
  94              smach_ros.SimpleActionState(
  95                 'move_base_flex/recovery',
  96                 RecoveryAction,
  97                 goal_cb=recovery_path_goal_cb,
  98                 input_keys=["error", "clear_costmap_flag"],
  99                 output_keys = ["error_status", 'clear_costmap_flag']
 100             ),
 101             transitions={
 102                 'succeeded': 'GET_PATH',
 103                 'aborted': 'aborted',
 104                 'preempted': 'preempted'
 105             }
 106         )
 107 
 108     # Create and start introspection server
 109     sis = smach_ros.IntrospectionServer('smach_server', sm, '/SM_ROOT')
 110     sis.start()
 111 
 112     # Execute SMACH plan
 113     sm.execute()
 114 
 115     # Wait for interrupt and stop introspection server
 116     rospy.spin()
 117     sis.stop()
 118 
 119 if __name__=="__main__":
 120     main()

To display the state machine in the smach_viewer, run rqt_smach in your terminal.

rosrun rqt_smach rqt_smach

Following instructions will help you to install rqt_smach in your workspace.

Wiki: move_base_flex/Tutorials/SimpleSmachForMoveBaseFlex (last edited 2020-10-07 08:30:39 by Rayman)