Write a ROS Wrapper (Python)

You can find the original tutorial on The Robotics Back-End.

Please read What is a ROS Wrapper? first before you starts continuing this tutorial.

You have a Python driver code and want to add a ROS interface to it, so you can control it from your ROS application? Well, in this case you need to write a Python ROS wrapper.

In this tutorial I’ll show you exactly how to do that. We’ll create a basic and dummy driver, and then focus on writing the ROS wrapper step by step.

Even if this tutorial is oriented towards writing a wrapper for a hardware driver, you can use it to write a wrapper for anything (library, module, etc.) you want.

The Python hardware driver

Before writing a Python ROS wrapper, let’s create a simple hardware driver interface for a speed controlled motor.

class MotorDriver:
    def __init__(self, max_speed=10):
        """
        Init communication, set default settings, ...
        """
        ...
    def set_speed(self, speed):
        """
        Give a speed that the motor will try to reach.
        """
        ...
    def stop(self):
        """
        Set speed to 0 and thus stop the motor
        """
        ...
    def get_speed(self):
        """
        Return current speed
        """
        ...
    def get_status(self):
        """
        Get hardware information from the motor
        """
        ...

Implementing the code is your task to do, or you could find an already existing driver online.

In order to be able to use this driver in this tutorial – without an actual piece of hardware – let’s fill the MotorDriver class with a “dummy” behavior.

class MotorDriver:
    def __init__(self, max_speed=10):
        """
        Init communication, set default settings, ...
        """
        self.max_speed = max_speed
        self.current_speed = 0
        self.voltage = 12
        self.temperature = 47
    def set_speed(self, speed):
        """
        Give a speed that the motor will try to reach.
        """
        if speed <= self.max_speed:
            self.current_speed = speed
        else:
            self.current_speed = self.max_speed
    def stop(self):
        """
        Set speed to 0 and thus stop the motor
        """
        self.current_speed = 0
    def get_speed(self):
        """
        Return current speed
        """
        return self.current_speed
    def get_status(self):
        """
        Get hardware information from the motor
        """
        return {
            'temperature': self.temperature,
            'voltage': self.voltage
        }

Again, the implementation code is just here for this tutorial’s purpose. The real driver will actually talk to the hardware!

Now, let’s write the Python ROS wrapper for this driver.

Setup your Python ROS wrapper node

Without OOP

import rospy
from motor_driver import MotorDriver
if __name__ == "__main__":
    rospy.init_node("motor_driver")
    motor = MotorDriver(max_speed=8)
    rospy.on_shutdown(motor.stop)
    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

Even if this code doesn’t do much, it’s already a working ROS wrapper.

So, how is structured this minimal Python ROS wrapper?

import rospy
from motor_driver import MotorDriver
if __name__ == "__main__":
    rospy.init_node("motor_driver")

First we include the Python module which contains the driver. The import line depends on where you have placed your driver. Here, it’s on the same folder so it’s quite easy (later on this post series you’ll see what to do if you want to create your driver in the src/ folder of your package, or in a different package).

Then we initialize the node that we call “motor_driver”.

    motor = MotorDriver(max_speed=8)
    rospy.on_shutdown(motor.stop)
    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

Here we create an instance of the driver, and initialize it.

Thanks to the function rospy.on_shutdown() , we can tell the node to execute some final actions before it shuts down (shutdown hook). So, if the node is shut down we want to stop the motor first. You can just pass the name of the function – motor.stop, without the parenthesis.

A quick message to let the user know that the driver is setup, and then we call rospy.spin(). This will keep the program alive (and thus the driver) while the node is still running.

With those few lines we have an already working basic ROS wrapper. Create a node, initialize the driver, and call rospy.spin().

Setup ROS wrapper – with OOP

This code has the exact same behavior, but it’s written with OOP. Check out OOP with ROS in Python to refresh your memory, or if you’re not familiar with it yet.

import rospy
from motor_driver import MotorDriver
class MotorDriverROSWrapper:
    def __init__(self):
        self.motor = MotorDriver(max_speed=8)
    def stop(self):
        self.motor.stop()
        
if __name__ == "__main__":
    rospy.init_node("motor_driver")
    motor_driver_wrapper = MotorDriverROSWrapper()
    rospy.on_shutdown(motor_driver_wrapper.stop)
    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

From now on we’ll continue with object oriented programming.

Add functionalities to your Python ROS wrapper

Get default settings from ROS params

The MotorDriver init function takes an argument to set the max speed setting. For now we have an hard-coded value. Let’s get this setting from a ROS parameter!

...
def __init__(self):
    max_speed = rospy.get_param("~max_speed", 8)
    self.motor = MotorDriver(max_speed=max_speed)
...

Here we are getting the max speed setting from a ROS param, and we pass this value to the MotorDriver init function.

If you’re not so familiar with ROS params and rospy, check out how to get and set parameters from your code.

Receive speed command from a ROS topic (Subscriber)

We’ll use a rospy subscriber to get speed commands. This subscriber will wrap the set_speed() method.

...
from std_msgs.msg import Int32
class MotorDriverROSWrapper:
    def __init__(self):
        ...
        rospy.Subscriber("speed_command", Int32, self.callback_speed_command)
    ...
    def callback_speed_command(self, msg):
        self.motor.set_speed(msg.data)
...

When you send an integer value to the “speed_command” topic, this value will be passed to the motor driver to set the actual motor speed.

Note: we could’ve also used a ROS service here instead of the subscriber. For choosing that it really depends on your requirements, update frequency, whether the client (node which sends the command) needs to get a confirmation, etc. I just chose a subscriber so we can cover more ROS interfaces in this tutorial.

Stop the motor with a ROS Service

Let’s use a rospy Service server to wrap the stop() method.

...
from std_srvs.srv import Trigger
class MotorDriverROSWrapper:
    def __init__(self):
        ...
        rospy.Service("stop_motor", Trigger, self.callback_stop)
...
        
    def callback_stop(self, req):
        self.stop()
        return {"success": True, "message": "Motor has been stopped"}
...

Sometimes you’ll need to create custom message and service definitions if you can’t find an existing one that does the work. For this example, the Trigger service definition is just what we need.

Send status and current speed to a ROS topic (Publisher)

Now that we are able to set default settings, and send commands to the motor, let’s see how we can receive useful information from it.

To get the status and the current speed, we’ll use 2 rospy Publishers. Each publisher will be run with a rospy Timer, in order to publish at a fixed rate. They will wrap the get_speed() and get_status() methods from the driver.

...
from diagnostic_msgs.msg import DiagnosticStatus
from diagnostic_msgs.msg import KeyValue
class MotorDriverROSWrapper:
    def __init__(self):
        ...
        publish_current_speed_frequency = rospy.get_param("~publish_current_speed_frequency", 5.0)
        publish_motor_status_frequency = rospy.get_param("~publish_motor_status_frequency", 1.0)
        
        ...
        self.current_speed_pub = rospy.Publisher("current_speed", Int32, queue_size=10)
        self.motor_status_pub = rospy.Publisher("motor_status", DiagnosticStatus, queue_size=1)
        rospy.Timer(rospy.Duration(1.0/publish_current_speed_frequency), self.publish_current_speed)
        rospy.Timer(rospy.Duration(1.0/publish_motor_status_frequency), self.publish_motor_status)
    def publish_current_speed(self, event=None):
        self.current_speed_pub.publish(self.motor.get_speed())
    def publish_motor_status(self, event=None):
        status = self.motor.get_status()
        data_list = []
        for key in status:
            data_list.append(KeyValue(key, str(status[key])))
        msg = DiagnosticStatus()
        msg.values = data_list
        self.motor_status_pub.publish(msg)
...

We use the DiagnosticStatus message from the diagnostic_msgs package. This message definition is quite appropriate for sending a hardware status of your hardware. Note: here we just fill the message with KeyValue messages but there are more fields you can use. For a complete overview check out the message definitions.

With a few parameters, publishers, and timers, we are able to wrap the methods which get the data from the motor.

Complete Python ROS wrapper code

All the code is now ready! With this wrapper you can control the motor, set default settings, and get useful data.

Here is the complete Python ROS wrapper code for the driver.

import rospy
from motor_driver import MotorDriver
from std_msgs.msg import Int32
from std_srvs.srv import Trigger
from diagnostic_msgs.msg import DiagnosticStatus
from diagnostic_msgs.msg import KeyValue
class MotorDriverROSWrapper:
    def __init__(self):
        max_speed = rospy.get_param("~max_speed", 8)
        publish_current_speed_frequency = rospy.get_param("~publish_current_speed_frequency", 5.0)
        publish_motor_status_frequency = rospy.get_param("~publish_motor_status_frequency", 1.0)
        
        self.motor = MotorDriver(max_speed=max_speed)
        rospy.Subscriber("speed_command", Int32, self.callback_speed_command)
        rospy.Service("stop_motor", Trigger, self.callback_stop)
        self.current_speed_pub = rospy.Publisher("current_speed", Int32, queue_size=10)
        self.motor_status_pub = rospy.Publisher("motor_status", DiagnosticStatus, queue_size=1)
        rospy.Timer(rospy.Duration(1.0/publish_current_speed_frequency), self.publish_current_speed)
        rospy.Timer(rospy.Duration(1.0/publish_motor_status_frequency), self.publish_motor_status)
    def publish_current_speed(self, event=None):
        self.current_speed_pub.publish(self.motor.get_speed())
    def publish_motor_status(self, event=None):
        status = self.motor.get_status()
        data_list = []
        for key in status:
            data_list.append(KeyValue(key, str(status[key])))
        msg = DiagnosticStatus()
        msg.values = data_list
        self.motor_status_pub.publish(msg)
    def stop(self):
        self.motor.stop()
    def callback_speed_command(self, msg):
        self.motor.set_speed(msg.data)
        
    def callback_stop(self, req):
        self.stop()
        return {"success": True, "message": "Motor has been stopped"}
if __name__ == "__main__":
    rospy.init_node("motor_driver")
    motor_driver_wrapper = MotorDriverROSWrapper()
    rospy.on_shutdown(motor_driver_wrapper.stop)
    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

This code example will help you when you need to create your own Python ROS wrapper. As you can see, you can basically wrap everything with a simple combination of ROS core functionalities: parameters, topics, services, (actions), timers, etc.

You can continue with Write a ROS Wrapper (C++) if you want to learn the same with C++ or with Package and Test Your Driver.

Wiki: Drivers/Tutorials/ROSWrapperPython (last edited 2022-11-02 16:07:52 by Michdo93)