在介绍mqtt离线消息之前,先了解下mqtt协议的几个概念:
QoS(Quality of Service)
指代消息传输的服务质量。它包括以下级别:
服务质量 | 具体含义 |
---|---|
QoS0 | 代表最多分发一次 |
QoS1 | 代表至少达到一次 |
QoS2 | 代表仅分发一次 |
cleanSession
cleanSession 标志是 MQTT 协议中对一个客户端建立 TCP 连接后是否关心之前状态的定义。具体语义如下:
cleanSession | 具体含义 |
---|---|
true | 非持久化连接,客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息 |
false | 持久化连接,客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效 |
QoS 和 cleanSession 的不同组合产生的结果如下表所示:
QoS 级别 | cleanSession=true | cleanSession=false |
---|---|---|
QoS0 | 无离线消息,在线消息只尝试推一次 | 无离线消息,在线消息只尝试推一次 |
QoS1 | 无离线消息,在线消息保证可达 | 有离线消息,所有消息保证可达 |
QoS2 | 无离线消息,在线消息保证只推一次 | 有离线消息,所有消息保证只推一次 |
对于 QoS > 0的消息,如果是持久化连接,当客户端不在线时,发送消息会保存离线消息到broker,当客户端上线时,mqtt会从broker拉取消息推送给客户端。
mqtt 离线消息实现相关的主要存储结构以及作用为:
inflightWindow :
为了提高消息吞吐效率和减少网络波动带来的影响,已发送但未确认的报文将被存放在 inflightWindow 中直至完成确认从inflightWindow 中移除,key为packetId(报文标识符)
ConcurrentHashMap<Integer, InflightMessage> inflightWindow
consumeOffsetTable:内存中保存了消息的消费进度,当客户端断开连接,从内存中删除,再次上线会重新创建
ConcurrentHashMap<String /*broker^rootTopic*/, Map<String /*queueId^clientId*/, Long/*consumeOffset*/>> consumeOffsetTable
最初的离线消息的方案:
mqtt离线消息实现.png
这个方案在正常收发以及在producer先发送完所有的消息然后consumer上线拉取离线消息时是不会有问题的,之前测试也没有发现问题,但是在以下场景进行离线消息推送会存在问题,最近一次变更测试中,测试了以下场景:
producer发送2000条消息
1、qos =1、 cleansession=false的consumer订阅了一个topicA
2、producer发送500条消息时,consumer断开连接,到1000条消息的时候consumer再次连接,这个过程中producer一直发送消息,此时离线消息和在线消息会一起推送,前500条消息,后1000条消息是在线消息,中间还有500条是离线消息。
这个场景测试会有大量消息丢失和很多重复消息,经过打印大量日志分析造成这个现象的有两个地方代码逻辑需要优化:
1、客户端在线时,推送消息时,消息放入inflightWindow ,收到客户端的ACK,从inflightWindow 中移除该消息,更新consumeOffsetTable,当客户端掉线会更新消费进度到redis中
问题:先收到的ACK删除的消息的消费进度不一定时最大的,比如如下图所示
inflightWindow 中消息
当客户端收到消息进度为5的消息ACK时,consumeOffsetTable消费进度更新到5,此时客户端掉线,consumeOffsetTable消费进度5会更新到redis中,而消费进度6、7、8的消息会作为 NOT ACK的消息持久化到redis中,当客户端在次上线时,会先取出NOT ACK的消息发送,然后从broker拉取消息会从消费进度5开始,这样6、7、8会重复消费,一次是inflightWindow 作为未收到ACK重发,还有一次是作为离线消息从broker拉取消费
2、在上面的例子中,producer发送500条消息时,consumer断开连接,假设此时consumeOffsetTable中offset是500,断开连接会更新到redis中,当producer发送到1000条消息的时候consumer再次连接,会从redis中取出offset,代码逻辑如下:
private long calcNextOffset(ConcurrentHashMap<String, Map<String, Long>> offsetTable, String key,
String innerKey,
PersistService persistService) {
if (!offsetTable.containsKey(key)) {
long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
Map<String, Long> offsetMap = new HashMap<>();
Map<String, Long> previous = offsetTable.putIfAbsent(key, offsetMap);
if (previous != null) {
offsetMap = previous;
}
offsetMap.putIfAbsent(innerKey, persistOffset);
} else if (!offsetTable.get(key).containsKey(innerKey)) {
long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
Map<String, Long> offsetMap = offsetTable.get(key);
offsetMap.putIfAbsent(innerKey, persistOffset);
}
return offsetTable.get(key).get(innerKey);
}
但是由于producer在线消息也在发送,当consumer上线时,假设此时在线消息的offset到了1021,导致这个离线消息从broker拉取消息的offset并不是500而是1021,从而使得500到1000之间的离线消息都会丢失,这个问题实际上就是离线消息和在线消息consumeOffset混合使用导致
最后为了解决上述两个问题,优化后的方案为:
方案主要作了两个地方的逻辑修改:
1、改进之前consumeOffsetTable的offset更新是在MQTT收到客户端的ACK更新消费进度,改为在消息放入inflightWindow时就把consumeOffsetTable的消费进度更新,即使inflightWindow中消息没有收到ACK,也会作为NOT ACK的消息处理,不会丢失消息
2、在客户端上线推送离线消息时,计算拉取离线消息的消费进度改为直接从redis中取出,不必判断consumeOffsetTable里面是否有值,避免了在线跟离线消息混合使用consumeOffsetTable,把在线跟离线的消费进度分开处理
private long calcNextOffset(String key, String innerKey, PersistService persistService) {
long offlineMsgOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
return offlineMsgOffset + 1;
}
改进后的方案如下:
改进后的MQTT离线方案
除了以上问题,还有一个目前待MQTT离线消息解决的问题,先介绍一下这个问题,拉取离线消息主要就是获取拉取消息消费进度的开始结束值,前面的问题都是解决了消费进度开始值的问题,还有一个遗留问题是拉取离线消息消费进度结束值有问题。
出现这个问题场景还是上述测试场景
private long getMaxOffset(String enodeName,
String topic,
int queueId) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
return this.mqttBridgeController.getEnodeService().getMaxOffsetInQueue(enodeName, topic, request);
}
网友评论