1、心跳

客户端在连接到MQTT服务器时(connect方法),可在options中指定keepalive字段进行配置,默认为60s。称为最大空闲时间T,当客户端检测到当前连接空闲时间超过最大空闲时间时,则会想Broker发送一个结构为

{cmd: “pingreq”}

的心跳包,Broker会在收到消息后,返回一个心跳响应。结构为:

  1. {
  2. cmd: "pingresp"
  3. dup: false
  4. length: 0
  5. payload: null
  6. qos: 0
  7. retain: false
  8. topic: null
  9. }

如果Broker在此客户端连接时设定的1.5T的时间内仍未收到心跳包,则会断开此客户端的连接。并投递 遗嘱消息 到订阅方。如果客户端在1.5T内未收到pingresp包,也会主动断开与服务器的连接。这样做的好处是弱化对带宽的依赖。

2、保留消息

在服务端收到一个retain标志位true的消息时,服务端在正常转发的情况下,会保留一份到本地。每个主题的保留消息只能有一条,就得会被新的所覆盖。当一个连接订阅这个主题时,保留消息会第一时间推送给订阅者。

EMQ保留消息功能由 emqx_retainer插件提供,这个插件是默认开启的

配置方式:

1.要禁用保留消息需要在etc/emqx.conf中修改mqtt.retain_available 为 false

如果在Broker端禁用了保留消息的情况下,任然收到了来自客户端的保留消息,会返回原因码为 0x9A(不支持保留消息)的 DISCONNECT 报文。

2.通过修改emqx_retainer 插件的配置,可以修改储存保留消息的位置,限制接收保留消息数量和 Payload 最大长度,以及调整保留消息的过期时间。

保留消息可以存储到外部数据库(企业版)

3、共享订阅

共享订阅在多个订阅者之间实现负载均衡的订阅方式。

EMQX提供两种方式的共享订阅:

实例 前缀 真实主题
$queue/t/1 $queue/ t/1
$share/abc/t/1 $share/abc t/1

上表中$ queue和 $share/abc称为共享订阅前缀。

3.1 $share//

这种形式的共享订阅成为带群组的共享订阅,属于同一个群组的订阅者会以负载均衡的方式接收消息,但EMQX会在不同的群组之间广播消息。看一下EMQ文档中的一个例子:

EMQ X 用户指南 - 图1

实验:

1.材料:部署到linux的EMQ服务器,EMQ后台DashBoard,MQTTX客户端,VUE前端

2.步骤:

1)三个客户端都连接到EMQ,DashBoard和VUE前端订阅一个命名为$ share/group1/abc的主题,MQTTX订阅一个名为 $share/group2/abc的主题

2)在MQTTX客户端连续发布一个topic为abc的消息

3.预期:MQTTX一定会受到消息,因为group2群组只有它自己。DashBoard和VUE中每条消息有且只有一个会收到。因为他们在同一分组,遵循负载均衡的方式。

4.结果:

MQTTX端:

EMQ X 用户指南 - 图2

VUE端:

EMQ X 用户指南 - 图3

DashBoard端:

EMQ X 用户指南 - 图4

预期成立。

3.2$queue/

不带群组的共享订阅,与所有人都在同一个群组相同。

3.3 EMQ的均衡策略与派发ACK

EMQ的均衡策略和派发ACK可以在 etc/emqx.conf进行配置:

  1. # etc/emqx.conf
  2. # 均衡策略
  3. broker.shared_subscription_strategy = random
  4. # 适用于 QoS1 QoS2 消息,启用时在其中一个组离线时,将派发给另一个组
  5. broker.shared_dispatch_ack_enabled = false

均衡策略配置可选项:

均衡策略 描述
random 在所有订阅者中随机选择
round_robin 按照订阅顺序
sticky 一直发往上次选取的订阅者
hash 按照发布者 ClientID 的哈希值

4、延迟发布

延迟发布功能由 emqx_mod_delayed模块实现,默认启用

延迟发布与共享订阅类似,使用特殊主体前缀实现。主题结构为:

$delayed/{DelayInterval}/{TopicName}

  • $delayed: 延迟发布主题前缀,使用此前缀的主题的消息都被视为延迟发布的消息
  • {DelayInterval}:时间间隔,单位为秒,最大为4294967,若 {DelayInterval}无法被解析成一个整形数字,则这个消息会被丢弃。
  • {TopicName}:真实主题名

案例:

  1. ![](https://cdn.nlark.com/yuque/0/2021/png/369045/1612516688348-60fee47f-7626-45d8-8162-b98854040832.png)

5、代理订阅

代理订阅是在在客户端登录时,不需要发送额外的订阅消息即可自动建立预设好的订阅关系。

代理订阅功能由 emqx_mod_subscription 内置模块提供。

配置方式(linux):编辑‘/etc/emqx/emqx.conf’ 文件,在最下方添加代理订阅规则即可。

代理订阅规则:

  1. ## 代理订阅的主题
  2. module.subscription.<number>.topic = <topic>
  3. ## 代理订阅的订阅选项:QoS
  4. ## 可选值: 0、1、2
  5. ## 默认值:1
  6. module.subscription.<number>.qos = <qos>
  7. ## 代理订阅的订阅选项:No Local
  8. ## 可选值: 0、1
  9. ## 默认值:0
  10. module.subscription.<number>.nl = <nl>
  11. ## 代理订阅的订阅选项:Retain As Published
  12. ## 可选值: 0、1
  13. ## 默认值:0
  14. module.subscription.<number>.rap = <rap>
  15. ## 代理订阅的订阅选项:Retain Handling
  16. ## 可选值: 0、1、2
  17. ## 默认值:0
  18. module.subscription.<number>.rh = <rh>

举例:

  1. module.subscription.1.topic = client/%c
  2. module.subscription.2.topic = user/%u
  3. module.subscription.2.qos = 2
  4. module.subscription.2.nl = 1
  5. module.subscription.2.rap = 1
  6. module.subscription.2.rh = 1
  7. // %c 和%u是通配符,%c代表clientid ,%u代表username
  8. //订阅选项中的 No Local、Retain As Published、Retain Handling 仅支持 MQTT V5 协议
  9. //在当前使用的协议版本小于5的时候,只有topic和qos生效。
  10. //除topic外,别的配置选项都有默认值

6、集群部署

配置方式:

  1. 确保两台服务器可以相互通信
  1. 使用static方式创建集群,修改etc/emqx/emqx.conf文件中如下两处:

EMQ X 用户指南 - 图5

3.重启服务

7、消息桥接

EMQ的消息桥接分为两种:

  • RPC模式:使用 Erlang RPC 协议的桥接方式,适合与另外一台EMQ服务器桥接,使用比较简单
  • MQTT模式,使用 MQTT 协议、适合与另外一台MQTT服务器或者EMQ服务器桥接,如RocketMQ等。

下文中主要说明RPC模式.

EMQ X 用户指南 - 图6

发布者可以通过桥接发送消息到远程的EMQ服务器

EMQ X 用户指南 - 图7