介绍ROS 2 的服务质量 (QoS) 策略

介绍ROS 2 的服务质量 (QoS) 策略

ROS 2(Robot Operating System 2)引入了服务质量(Quality of Service,QoS)策略,以满足不同的通信需求和应用场景。QoS策略允许开发者根据消息的可靠性、延迟、带宽等方面的要求来配置通信行为。以下是ROS 2中可用的一些重要的QoS策略:

  1. 可靠性(Reliability):可靠性指定了消息传递的保证。在ROS 2中,可靠性可以设置为Reliable或Best-Effort两种级别。Reliable级别确保消息的可靠传输,确保消息不会丢失。Best-Effort级别则不提供可靠性保证,消息可能会丢失。可靠性级别的选择取决于应用的需求。
  2. 保证序列化(Durability):保证序列化决定了消息在发送者和接收者之间的持久性。ROS 2提供了Transient Local、Volatile和Persistent三种保证序列化级别。Transient Local级别保证消息在发布者处保持持久性,但在发布者终止后不会保存。Volatile级别不保证消息的持久性。Persistent级别确保消息的持久性,即使发布者终止后,消息也会保存在系统中。
  3. 延迟(Deadline):延迟定义了接收者处理消息的时间限制。可以通过设置延迟参数来确保在指定的时间范围内处理消息,否则消息将被视为无效。延迟参数可以帮助开发者控制实时性需求。
  4. 带宽(Bandwidth):带宽指定了消息传输的速率。ROS 2允许设置消息的最大发送速率,以限制带宽的使用。这对于网络资源有限或需要限制通信开销的应用非常有用。
  5. 生命周期(Liveliness):生命周期定义了节点或通信实体的活动状态。ROS 2提供了两种生命周期策略:Automatic和Manual。Automatic策略根据节点的活动情况自动管理生命周期。Manual策略需要开发者手动管理生命周期,以便在节点不活动时通知其他节点。
  6. 历史(History):历史策略决定了接收者在订阅主题时接收到的消息数量。ROS 2提供了Keep Last和Keep All两种历史策略。Keep Last策略只保留最新的一些消息,而Keep All策略保留所有消息。
  7. 遗愿(Liveliness Will):遗愿是一种可选的QoS策略,用于在节点失去活动性时通知其他节点。当节点无法发布预定周期内的活动消息时,遗愿将被激活,并通知其他节点该节点已经失去活动性。
  8. 持续性(Persistence):持续性是针对订阅者在启动后是否接收历史消息的策略。ROS 2提供了两种持续性策略:Transient和Keep Last。Transient策略只发送最新的消息,而Keep Last策略发送自节点启动后的所有消息。
  9. 时间同步(Time Synchronization):ROS 2允许节点之间进行时间同步以提供精确的时间戳。通过使用时间同步策略,可以确保节点之间的时间同步,从而实现更精确的数据同步和协调。
  10. 消息尺寸(Message Size):ROS 2还提供了用于限制消息尺寸的QoS策略。通过设置最大消息尺寸,可以确保消息在通信过程中不会超过指定的大小限制,从而避免通信开销和性能问题。
  11. 分区(Partition):分区策略允许将通信实体划分为不同的逻辑分区,以实现数据的隔离和过滤。每个分区可以定义自己的发布者和订阅者,以限制消息的传递范围。这对于构建复杂的分布式系统或多租户环境非常有用。
  12. 优先级(Priority):优先级策略用于确定消息在发送和接收时的优先级顺序。高优先级的消息将在低优先级的消息之前被处理。这对于处理紧急事件或实时反馈非常重要。
  13. 静态和动态配置:ROS 2允许QoS策略的静态和动态配置。静态配置指的是在编译或运行时将QoS策略设置为固定值,而动态配置允许在运行时根据系统状态和需求动态调整QoS参数。
  14. 限流(Flow Control):限流策略用于控制消息发布的速率,以确保接收者能够跟上消息的处理。通过设置限流参数,可以避免消息过载和资源消耗过多的情况。
  15. 增量式(Incremental):增量式策略允许消息的增量传输,只传输自上次发送以来发生变化的部分。这种策略可以减少带宽占用和通信延迟,特别适用于传输大量数据的情况。
  16. 容错性(Fault Tolerance):容错性策略是ROS 2的一个重要特性,用于处理通信中的错误和故障情况。ROS 2提供了可配置的容错机制,例如重试机制和错误处理策略,以确保系统在不稳定的网络环境下能够保持可靠的通信。
  17. 同步和异步通信:ROS 2支持同步和异步两种通信模式。同步通信模式要求发送者等待接收者的响应,而异步通信模式允许发送者继续执行其他任务而不必等待响应。这使得开发者能够根据应用需求选择适当的通信模式。
  18. QoS组合和优先级:ROS 2允许将多个QoS策略组合在一起,并为每个通信实体(例如节点、主题或服务)分配优先级。这种灵活性使开发者能够在不同的层级上定制化QoS策略,并根据重要性和需求对其进行优先级排序。
  19. 广播(Broadcast):广播策略允许消息在系统中的多个订阅者之间进行广播,以实现发布-订阅模式的多对多通信。广播策略可以在ROS 2的通信中实现数据的分发和共享,从而满足多个节点之间的信息交流需求。
  20. 透明性(Transparency):透明性策略允许开发者在ROS 2中配置通信的透明性需求。透明性指的是消息传输时是否需要进行加密、解密或数据压缩等操作,以确保数据的安全性和通信效率。
  21. 过滤(Filtering):过滤策略允许开发者根据特定的条件对消息进行过滤和选择性接收。通过设置过滤器,可以根据消息的内容、属性或源地址等信息来选择性地接收感兴趣的消息,从而减少不必要的通信和数据处理。
  22. 跨域(Cross-Domain):ROS 2支持跨域通信,即在不同的网络域或安全域之间进行通信。通过配置跨域策略,可以实现不同安全级别或隔离环境中的节点间的可靠通信。
  23. 故障恢复(Fault Recovery):ROS 2的QoS策略还包括故障恢复机制,用于处理通信中的错误和故障情况。故障恢复策略可以自动检测和修复通信故障,从而提高系统的可靠性和稳定性。

示例

当涉及到ROS 2的服务质量(QoS)策略的C++示例,以下是几个常见的用法示例:

  1. 设置消息发布者的可靠性和持久性:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("publisher_node");

rclcpp::QoS qos(rclcpp::KeepLast(10)); // 设置消息历史策略,保留最新的10个消息
qos.reliable(); // 设置消息发布者为可靠性级别

auto publisher = node->create_publisher<std_msgs::msg::String>("topic_name", qos);

// 发布消息
auto message = std::make_shared<std_msgs::msg::String>();
message->data = "Hello, ROS 2!";
publisher->publish(message);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置消息订阅者的可靠性和延迟:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

void messageCallback(const std_msgs::msg::String::SharedPtr msg)
{
RCLCPP_INFO(rclcpp::get_logger("subscriber_node"), "Received message: %s", msg->data.c_str());
}

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("subscriber_node");

rclcpp::QoS qos(rclcpp::KeepLast(10)); // 设置消息历史策略,保留最新的10个消息
qos.best_effort(); // 设置消息订阅者为最佳尽力级别

auto subscriber = node->create_subscription<std_msgs::msg::String>(
"topic_name", qos, messageCallback);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置消息发布者的延迟和历史记录:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("publisher_node");

rclcpp::QoS qos(rclcpp::KeepLast(5)); // 设置消息历史策略,保留最新的5个消息
qos.deadline(rclcpp::Duration(1000)); // 设置消息的处理时间限制为1秒

auto publisher = node->create_publisher<std_msgs::msg::String>("topic_name", qos);

// 发布消息
auto message = std::make_shared<std_msgs::msg::String>();
message->data = "Hello, ROS 2!";
publisher->publish(message);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置消息订阅者的带宽和容忍度:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

void messageCallback(const std_msgs::msg::String::SharedPtr msg)
{
RCLCPP_INFO(rclcpp::get_logger("subscriber_node"), "Received message: %s", msg->data.c_str());
}

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("subscriber_node");

rclcpp::QoS qos(10); // 设置消息历史策略,保留最近的10个消息
qos.bandwidth(1000); // 设置消息的最大发送速率为1000字节/秒
qos.durability(rclcpp::DurabilityPolicy::TransientLocal); // 设置消息的持久性为Transient Local

auto subscriber = node->create_subscription<std_msgs::msg::String>(
"topic_name", qos, messageCallback);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置消息发布者和订阅者的生命周期:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

void messageCallback(const std_msgs::msg::String::SharedPtr msg)
{
RCLCPP_INFO(rclcpp::get_logger("subscriber_node"), "Received message: %s", msg->data.c_str());
}

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("node_name");

rclcpp::QoS publisher_qos(10);
publisher_qos.keep_all(); // 设置消息发布者的历史策略为保留所有消息
publisher_qos.liveliness(rclcpp::LivelinessPolicyKind::AUTOMATIC); // 设置消息发布者的生命周期策略为自动管理

auto publisher = node->create_publisher<std_msgs::msg::String>("topic_name", publisher_qos);

rclcpp::QoS subscriber_qos(10);
subscriber_qos.keep_all(); // 设置消息订阅者的历史策略为保留所有消息
subscriber_qos.liveliness(rclcpp::LivelinessPolicyKind::MANUAL_BY_TOPIC); // 设置消息订阅者的生命周期策略为手动管理

auto subscriber = node->create_subscription<std_msgs::msg::String>(
"topic_name", subscriber_qos, messageCallback);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置消息订阅者的优先级和时间同步:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

void messageCallback(const std_msgs::msg::String::SharedPtr msg)
{
RCLCPP_INFO(rclcpp::get_logger("subscriber_node"), "Received message: %s", msg->data.c_str());
}

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("node_name");

rclcpp::QoS qos(10);
qos.best_effort(); // 设置消息订阅者的可靠性级别为最佳尽力级别
qos.priority(1); // 设置消息订阅者的优先级为1
qos.avoid_ros_namespace_conventions(true); // 设置消息订阅者避免ROS命名空间约定

auto subscriber = node->create_subscription<std_msgs::msg::String>(
"topic_name", qos, messageCallback);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置服务端和客户端的QoS策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "rclcpp/rclcpp.hpp"
#include "example_interfaces/srv/add_two_ints.hpp"

void serviceCallback(
const std::shared_ptr<rmw_request_id_t> request_header,
const std::shared_ptr<example_interfaces::srv::AddTwoInts::Request> request,
std::shared_ptr<example_interfaces::srv::AddTwoInts::Response> response)
{
response->sum = request->a + request->b;
RCLCPP_INFO(rclcpp::get_logger("service_node"), "Received request: %d + %d", request->a, request->b);
}

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("service_node");

rclcpp::QoS qos(10);
qos.best_effort(); // 设置服务端和客户端的可靠性级别为最佳尽力级别

auto service = node->create_service<example_interfaces::srv::AddTwoInts>(
"add_two_ints", serviceCallback, qos);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置参数读取器的QoS策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/parameter.hpp"

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("parameter_node");

rclcpp::QoS qos(10);
qos.best_effort(); // 设置参数读取器的可靠性级别为最佳尽力级别

auto parameter_client = std::make_shared<rclcpp::AsyncParametersClient>(node, "parameter_node", qos);

auto parameters = parameter_client->get_parameters({"param1", "param2", "param3"});
for (auto param : parameters)
{
RCLCPP_INFO(rclcpp::get_logger("parameter_node"), "Parameter name: %s, value: %s", param.get_name().c_str(), param.value_to_string().c_str());
}

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置时间同步器的QoS策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/clock.hpp"

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("time_sync_node");

rclcpp::QoS qos(10);
qos.best_effort(); // 设置时间同步器的可靠性级别为最佳尽力级别

auto time_source = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME); // 使用ROS时间源
auto time_sync_sub = std::make_shared<rclcpp::TimeSyncSubscription>(node, time_source, qos);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置日志记录器的QoS策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/logging.hpp"

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("logger_node");

rclcpp::QoS qos(10);
qos.best_effort(); // 设置日志记录器的可靠性级别为最佳尽力级别

rclcpp::logging::initialize();
rclcpp::logging::set_logger_level(node->get_logger().get_name(), rclcpp::logging::LogLevel::INFO);
rclcpp::logging::set_logger_qos(qos);

RCLCPP_INFO(node->get_logger(), "This is a log message.");

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置事件发布器的QoS策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/event.hpp"

void eventCallback(const rclcpp::event::Event::SharedPtr event)
{
if (event->get_event_type() == rclcpp::event::EventType::TIMER_SCHEDULED)
{
RCLCPP_INFO(rclcpp::get_logger("event_node"), "Timer scheduled event triggered.");
}
}

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("event_node");

rclcpp::QoS qos(10);
qos.best_effort(); // 设置事件发布器的可靠性级别为最佳尽力级别

auto event_publisher = node->create_publisher<rclcpp::event::Event>(
"/events", qos);

auto event_timer = node->create_wall_timer(std::chrono::seconds(1), [&]() {
auto event = std::make_shared<rclcpp::event::Event>();
event->set_event_type(rclcpp::event::EventType::TIMER_SCHEDULED);
event_publisher->publish(event);
});

auto event_subscription = node->create_subscription<rclcpp::event::Event>(
"/events", qos, eventCallback);

rclcpp::spin(node);
rclcpp::shutdown();

return 0;
}

  1. 设置TF广播器的QoS策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include "rclcpp/rclcpp.hpp"
#include "tf2_ros/transform_broadcaster.h"
#include "geometry_msgs/msg/transform_stamped.hpp"

int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("tf_node");

rclcpp::QoS qos(10);
qos.best_effort(); // 设置TF广播器的可靠性级别为最佳尽力级别

tf2_ros::TransformBroadcaster tf_broadcaster(node);

geometry_msgs::msg::TransformStamped transform;
transform.header.frame_id = "parent_frame";
transform.child_frame_id = "child_frame";
transform.transform.translation.x = 1.0;
transform.transform.rotation.w = 1.0;

rclcpp::Rate rate(1); // 广播频率为1Hz
while (rclcpp::ok())
{
transform.header.stamp = node->now();
tf_broadcaster.sendTransform(transform);
rate.sleep();
}

rclcpp::shutdown();

return 0;
}


介绍ROS 2 的服务质量 (QoS) 策略
https://qiangsun89.github.io/2023/05/15/介绍ROS-2-的服务质量-QoS-策略/
作者
Qiang Sun
发布于
2023年5月15日
许可协议