DDS編程示例
我們嘗試在代碼中配置DDS,以之前Hello World話(huà)題通信為例。
運(yùn)行效果
啟動(dòng)兩個(gè)終端,分別運(yùn)行發(fā)布者和訂閱者節(jié)點(diǎn):
$ ros2 run learning_qos qos_helloworld_pub
$ ros2 run learning_qos qos_helloworld_sub
可以看到兩個(gè)終端中的通信效果如下,和之前貌似并沒(méi)有太大區(qū)別。
看效果確實(shí)差不多,不過(guò)底層通信機(jī)理上可是有所不同的。
發(fā)布者代碼解析
我們看下在代碼中,如果加入QoS的配置。
learning_qos/qos_helloworld_pub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居(www.guyuehome.com)@說(shuō)明: ROS2 QoS示例-發(fā)布“Hello World”話(huà)題"""import rclpy # ROS2 Python接口庫(kù)from rclpy.node import Node # ROS2 節(jié)點(diǎn)類(lèi)from std_msgs.msg import String # 字符串消息類(lèi)型from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS類(lèi)"""創(chuàng)建一個(gè)發(fā)布者節(jié)點(diǎn)"""class PublisherNode(Node): def __init__(self, name): super().__init__(name) # ROS2節(jié)點(diǎn)父類(lèi)初始化 qos_profile = QoSProfile( # 創(chuàng)建一個(gè)QoS原則 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.pub = self.create_publisher(String, "chatter", qos_profile) # 創(chuàng)建發(fā)布者對(duì)象(消息類(lèi)型、話(huà)題名、QoS原則) self.timer = self.create_timer(0.5, self.timer_callback) # 創(chuàng)建一個(gè)定時(shí)器(單位為秒的周期,定時(shí)執(zhí)行的回調(diào)函數(shù)) def timer_callback(self): # 創(chuàng)建定時(shí)器周期執(zhí)行的回調(diào)函數(shù) msg = String() # 創(chuàng)建一個(gè)String類(lèi)型的消息對(duì)象 msg.data = 'Hello World' # 填充消息對(duì)象中的消息數(shù)據(jù) self.pub.publish(msg) # 發(fā)布話(huà)題消息 self.get_logger().info('Publishing: "%s"' % msg.data)# 輸出日志信息,提示已經(jīng)完成話(huà)題發(fā)布def main(args=None): # ROS2節(jié)點(diǎn)主入口main函數(shù) rclpy.init(args=args) # ROS2 Python接口初始化 node = PublisherNode("qos_helloworld_pub") # 創(chuàng)建ROS2節(jié)點(diǎn)對(duì)象并進(jìn)行初始化 rclpy.spin(node) # 循環(huán)等待ROS2退出 node.destroy_node() # 銷(xiāo)毀節(jié)點(diǎn)對(duì)象 rclpy.shutdown() # 關(guān)閉ROS2 Python接口
完成代碼的編寫(xiě)后需要設(shè)置功能包的編譯選項(xiàng),讓系統(tǒng)知道Python程序的入口,打開(kāi)功能包的setup.py文件,加入如下入口點(diǎn)的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos.qos_helloworld_pub:main',},
訂閱者代碼解析
訂閱者中的QoS配置和發(fā)布者類(lèi)似。
learning_qos/qos_helloworld_sub.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""@作者: 古月居@說(shuō)明: ROS2 QoS示例-訂閱“Hello World”話(huà)題消息"""import rclpy # ROS2 Python接口庫(kù)
from rclpy.node import Node # ROS2 節(jié)點(diǎn)類(lèi)
from std_msgs.msg import String # ROS2標(biāo)準(zhǔn)定義的String消息from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS類(lèi)"""創(chuàng)建一個(gè)訂閱者節(jié)點(diǎn)"""class SubscriberNode(Node): def __init__(self, name): super().__init__(name) # ROS2節(jié)點(diǎn)父類(lèi)初始化 qos_profile = QoSProfile( # 創(chuàng)建一個(gè)QoS原則 # reliability=QoSReliabilityPolicy.BEST_EFFORT, reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST, depth=1 ) self.sub = self.create_subscription( String, "chatter", self.listener_callback, qos_profile) # 創(chuàng)建訂閱者對(duì)象(消息類(lèi)型、話(huà)題名、訂閱者回調(diào)函數(shù)、QoS原則)
def listener_callback(self, msg): # 創(chuàng)建回調(diào)函數(shù),執(zhí)行收到話(huà)題消息后對(duì)數(shù)據(jù)的處理 self.get_logger().info('I heard: "%s"' % msg.data) # 輸出日志信息,提示訂閱收到的話(huà)題消息def main(args=None): # ROS2節(jié)點(diǎn)主入口main函數(shù) rclpy.init(args=args) # ROS2 Python接口初始化 node = SubscriberNode("qos_helloworld_sub") # 創(chuàng)建ROS2節(jié)點(diǎn)對(duì)象并進(jìn)行初始化 rclpy.spin(node) # 循環(huán)等待ROS2退出
node.destroy_node() # 銷(xiāo)毀節(jié)點(diǎn)對(duì)象 rclpy.shutdown() # 關(guān)閉ROS2 Python接口
完成代碼的編寫(xiě)后需要設(shè)置功能包的編譯選項(xiàng),讓系統(tǒng)知道Python程序的入口,打開(kāi)功能包的setup.py文件,加入如下入口點(diǎn)的配置:
entry_points={ 'console_scripts': [ 'qos_helloworld_pub = learning_qos
-
機(jī)器人
+關(guān)注
關(guān)注
211文章
28618瀏覽量
207923 -
通信
+關(guān)注
關(guān)注
18文章
6064瀏覽量
136275 -
DDS
+關(guān)注
關(guān)注
21文章
636瀏覽量
152834 -
代碼
+關(guān)注
關(guān)注
30文章
4819瀏覽量
68879
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論