Skip to main content

rclpy Integration: Bringing ROS 2 to Python

rclpy is the official Python client library for ROS 2. It provides a straightforward and intuitive way to write ROS 2 nodes, allowing Python developers to leverage the full power of the ROS 2 ecosystem for robotics applications. Its ease of use makes it a popular choice for prototyping and high-level control logic.

Creating Your First ROS 2 Node in Python

Every ROS 2 application starts with a node. In rclpy, creating a node involves inheriting from rclpy.node.Node and initializing the superclass.

import rclpy
from rclpy.node import Node

class MyCustomNode(Node):
def __init__(self):
super().__init__('my_custom_node')
self.get_logger().info('My Custom Node has been started!')

def main(args=None):
rclpy.init(args=args) # Initialize rclpy
my_node = MyCustomNode() # Create the node
rclpy.spin(my_node) # Keep node alive
my_node.destroy_node() # Destroy node when done
rclpy.shutdown() # Shut down rclpy

if __name__ == '__main__':
main()

Implementing Publishers and Subscribers

rclpy makes it easy to create publishers and subscribers to communicate via topics.

Publisher Example

import rclpy
from rclpy.node import Node
from std_msgs.msg import String # Example message type

class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0

def timer_callback(self):
msg = String()
msg.data = 'Hello ROS 2: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1

def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
minimal_publisher.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Subscriber Example

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # prevent unused variable warning

def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)

def main(args=None):
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
minimal_subscriber.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Implementing Service Clients and Servers

Services facilitate a request-response pattern.

Service Server Example

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts # Example service type

class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response

def main(args=None):
rclpy.init(args=args)
minimal_service = MinimalService()
rclpy.spin(minimal_service)
minimal_service.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Service Client Example

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import sys

class MinimalClientAsync(Node):
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()

def send_request(self):
self.req.a = int(sys.argv[1])
self.req.b = int(sys.argv[2])
self.future = self.cli.call_async(self.req)
self.get_logger().info('Request sent')

def main(args=None):
rclpy.init(args=args)
minimal_client = MinimalClientAsync()
minimal_client.send_request()

while rclpy.ok():
rclpy.spin_once(minimal_client)
if minimal_client.future.done():
try:
response = minimal_client.future.result()
except Exception as e:
minimal_client.get_logger().error('Service call failed %r' % (e,))
else:
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(minimal_client.req.a, minimal_client.req.b, response.sum))
break

minimal_client.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Co-Learning Elements

💡 Theory: The Event Loop

rclpy.spin() is central to how ROS 2 Python nodes process events. It essentially puts the node into an infinite loop, allowing it to listen for incoming messages, service requests, action goals, and timer callbacks. This "event loop" is crucial for managing asynchronous operations and ensuring the node remains responsive to the ROS 2 graph.

🎓 Key Insight: Pythonic ROS 2 Development

rclpy is designed to be idiomatic Python, making it accessible to developers familiar with the language. It leverages Python's strengths for rapid prototyping and scripting, allowing for quicker development cycles, especially for higher-level control logic and user interfaces, complementing rclcpp (C++ client library) for performance-critical components.

💬 Practice Exercise: Ask your AI

Prompt: "Generate a Python rclpy node that publishes a custom JointState message (from sensor_msgs.msg) for a simple one-joint robot. The node should publish the joint's position, velocity, and effort every 0.1 seconds, simulating real-time sensor data."

Instructions: Use your preferred AI assistant to write an rclpy node. Define a timer callback that populates a JointState message with arbitrary values for name, position, velocity, and effort, and publishes it to a /joint_states topic. Ensure proper rclpy initialization and shutdown.

}