Retained消息
参考:https://www.emqx.com/zh/blog/mqtt5-features-retain-message
当你为一个topic设置了Retained消息,那么这个消息会保留在Broker里面,当有新的订阅者订阅到此topic之后,就会接收到这个Retained消息(也就是payload)
保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话终结,保留消息也不会被删除。
一个Topic只能有一个Retained消息,后设置的会覆盖前面设置的。
在python中对应的方法是:
matt.publish("test", payload="online", qos=0, retain=True)
删除Retained的方法是
client.publish("test", payload=None, qos=0, retain=True)
MQTT 遗嘱消息也称为Last Will and Testament
参考:https://www.emqx.com/zh/blog/use-of-mqtt-will-message
遗嘱消息会在客户端断开的时候,由Broker发送遗嘱消息给各个订阅者。
在python中设置遗嘱消息的方法是:
mqtt_client.will_set('test', "offline", qos=0, retain=True) # 即是遗嘱也是Retained
mqtt_client.will_set('test', "offline", qos=0, retain=False) # 只是遗嘱
如果设置遗嘱,retain为False的时候,新的client订阅是不会收到遗嘱消息的。
如果retain设置为True,他既是遗嘱也是Retained消息,意味着心的client连接也能收到消息。
如果设置了retain为False的遗嘱消息,清除可以使用下面的方式清除
mqtt_client.will_clear()
retain为True的时候
matt.publish("test", payload=None, qos=0, retain=True)
通过Retained报文和遗嘱消息,我们可以解决实际开发中其中一个问题,客户端上线下线状态维护显示。
第一步:设置遗嘱消息为设备下线的对应报文,并且设置retain=True来保证新的客户端也能每次订阅接收到此报文。
当然,单纯设置retain=True的遗嘱消息还不行,会导致在线都会发送遗嘱消息报文。
第二步:在客户端连接Broker成功之后,更改遗Retained消息为上线的报文
注意:遗嘱报文是连接的时候就会帮你自动发送给Broker,当下线时,由Broker自动往该主题发送。
下面是实现上线下线遗嘱+Retained实现。
import paho.mqtt.client as mqtt
# Retained消息的能够让其他订阅者在一订阅的时候收到这条消息
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("test", payload='{"status": "online"}', qos=0, retain=True) 清除retained报文
# client.publish("test", payload=None, qos=0, retain=True)
pass
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
# 遗嘱消息能在本客户端断开的时候,给对应的订阅了次主题的订阅者发送遗嘱消息
mqtt_client.will_set('test', payload='{"status": "offline"}', qos=0, retain=False)
mqtt_client.on_connect = on_connect
# mqtt_client.will_clear()
mqtt_client.connect("192.168.1.55", 1883)
mqtt_client.loop_forever()
下面代码retain为False,当客户端断开连接会发送遗嘱,已有的订阅者可以接收到,但是新的订阅不会收到遗嘱
import paho.mqtt.client as mqtt
# Retained消息的能够让其他订阅者在一订阅的时候收到这条消息
def on_connect(client, userdata, flags, rc):
if rc == 0:
# client.publish("test", payload='{"status": "online"}', qos=0, retain=True) 清除retained报文
# client.publish("test", payload=None, qos=0, retain=True)
pass
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
# 遗嘱消息能在本客户端断开的时候,给对应的订阅了次主题的订阅者发送遗嘱消息
mqtt_client.will_set('test', payload='{"status": "offline"}', qos=0, retain=False)
mqtt_client.on_connect = on_connect
# mqtt_client.will_clear()
mqtt_client.connect("192.168.1.55", 1883)
mqtt_client.loop_forever()
QOS
参考: https://www.emqx.com/zh/blog/introduction-to-mqtt-qos
参考: https://zhuanlan.zhihu.com/p/80203905
QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。
- QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。
- QoS 1:消息传递至少 1 次。
- QoS 2:消息仅传送一次。
QOS实际是客户端和Broker之间的一个服务可靠等级。这个可靠等级最终计算也是取决于订阅和发布双方。
实际的client和broker的QOS是MIN(Publish QoS, Subscribe QoS)。假设现在发布方的主题是2,但是订阅方的主题是0,那么你订阅方最终和Broker之间的QOS也是0。假设发布方的主题是0,订阅方订阅了设置为了1,那最终订阅方和Broker之间也是0.
当A向主题test发送了一个QOS为1的报文,那么A客户端(sdk写好此实现)就需要保证Broker收到了这条消息之后至少一次再丢弃发送报文,如果一直没有收到Broker的成功回复,就一直发送。
下面我们通过代码说明此QOS
import paho.mqtt.client as mqtt
# Retained消息的能够让其他订阅者在一订阅的时候收到这条消息
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.publish("test", payload='online', qos=0, retain=True)
pass
else:
print("connection failed ", rc)
mqtt_client = mqtt.Client(client_id="demo_mqtt_pub")
# 遗嘱消息能在本客户端断开的时候,给对应的订阅了次主题的订阅者发送遗嘱消息
# mqtt_client.will_set('test', None, qos=0, retain=True)
mqtt_client.on_connect = on_connect
mqtt_client.will_clear()
mqtt_client.connect("192.168.1.55", 1883)
mqtt_client.loop_forever()
client.publish("test", payload='online', qos=0, retain=True)
发布了一个QOS等级为0的。
然后我们以QOS2的方式订阅,看收到的消息是QOS几:
![](https://img.haomeiwen.com/i1597444/05cc94cfbf768fc0.png)
如果我们改为2 client.publish("test", payload='online', qos=2, retain=True)
收到的消息就是QOS2:
![](https://img.haomeiwen.com/i1597444/a8a322cb1efe2264.png)
publish为2, sub为1, 订阅方实际的qos得到的是1
![](https://img.haomeiwen.com/i1597444/59454a9b7bc48cf8.png)
网友评论