本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。
- 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
- 之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
- 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。
如上图示:
message有以下4种状态
status 0:初始状态
status 1:publisher收到broker 确认消息后的状态
status 2:消息被consumer成功消费后的状态
status 3:异常状态,需人工跟进处理
步骤如下:
step 1:message 入库(存进db),初始 status 为 0
step 2:publisher发送message到 broker
step 3:broker收到消息后返回 确认消息 给publisher
step 4:publisher收到确认消息后,更新 message 的 status 为 1
step 5:consumer从broker获取消息并执行
step 6:consumer执行成功后返回确认消息给broker
step 7:consumer执行成功后将message 的 status 更新为 2
step 8:定时任务(cronjob)扫描db中status为0的message
step 9:cronjob将status为0的message重新发送到publisher,重复前面的 step2 ~ step7
step 10:为避免cronjob无限重复发送message而拖垮服务,
message发送次数大于3时,转为异常message,即status置为3
实际应用场景中, 可能由于网络原因, 或者消息未被持久化MQ就宕机了, 使得投递确认的回调方法ConfirmCallback没有被执行, 从而导致数据库该消息状态一直是投递中的状态, 此时就需要进行消息重投, 即使也许消息已经被消费了。
定时任务只是保证消息100%投递成功, 而多次投递的消费幂等性需要消费端自己保证。
消息100%投递
消息持久化 + 定时任务
总结:
-
step2 这一步骤,有可能因为网络故障或者broker宕机而导致message丢失,
因此需要消息补偿机制(step8 ~ step10),由定时任务(cronjob)实现 -
上面方案如果在step3 or step4发生错误,可能会发生broker已收到消息,但status没能更新为1情况
此时就会进入消息补偿环节,导致broker中存在2条相同消息,造成重复投递问题
而在业务上,比如支付业务上,是不允许消息重复消费消息的。
此时就需要在 consumer 端来处理这问题,即问题1《如何避免消息重复消费》 -
一条message,从创建到消费完毕,共经过 0,1,2 这三种消息状态,
需要进行3次db操作。而在实际业务中,造成服务瓶颈的往往是db操作,
3次耗时有点多,有没有优化方法?有的,看下面的 消息可靠投递方案2
如上图示:
'upstreamService':上游服务,即消息生产者(下面简称'uss')
'downstreamService':下游服务,即消息消费者(下面简称'dss')
'callbackService':回调服务,确认消息已经被成功消费并更新入库消息(下面简称'cbs')
步骤如下:
step 1:uss入库消息后,第一次发送消息给 broker
step 2:uss第二次发送消息,第二条消息属于延迟检查消息。
检查第一条消息是否已经被成功消费,两次投递之间需要间隔时间,比如5分钟
step 3:dss从broker获取消息
step 4:dss消费消息后发送confirm消息,这里的confirm不是正常的ack响应,
而是重新生成一条消息,发送到broker中
step5 5:cbs监听dss发送的confirm消息,如果收到confirm消息,
说明消息已经被成功消费,更新消息的状态为成功消费
step 6:5分钟之后,cbs收到了uss发送的延迟检查消息,此时会去db中检查消息是否已经被成功消费。
如果成功,不需要做任何处理。
如果失败,cbs会发起rpc服务通知uss该条消息投递失败,需要重新发送。
uss收到rpc通知后会重新发送该消息,重走step1 ~ step5的流程
总结:
-
方案2的step2和step6这两步骤,实现了消息补偿机制
-
方案2消息的status经过 0(初始),1(成功消费) 两种状态。只进行了2次db操作
相对前面的方案1 (3次db操作),性能上提升了 1/3,能支持更高的并发量。 -
cbs调起rpc通知这一步,如果出现故障,会导致消息的丢失,可靠性并没有方案1高。
但在实际业务中,有时并不需要100%的消息投递,可能是要支持更高的并发量,此时可以采用方案2。
另外如果真需要100%的消息投递,可以在方案2的基础上机上方案1的cronjob补偿消息,提高消息投递的可靠性
思考:这种方案和上一种方案的区别是什么呢?
如上图示:
'upstreamService':上游服务,即消息生产者(下面简称'uss')
'downstreamService':下游服务,即消息消费者(下面简称'dss')
'callbackService':回调服务,确认消息已经被成功消费并更新入库消息(下面简称'cbs')
步骤如下:
step 1:uss入库消息后,第一次发送消息给 broker
step 2:uss第二次发送消息,第二条消息属于延迟检查消息。
检查第一条消息是否已经被成功消费,两次投递之间需要间隔时间,比如5分钟
step 3:dss从broker获取消息
step 4:dss消费消息后发送confirm消息,这里的confirm不是正常的ack响应,
而是重新生成一条消息,发送到broker中
step5 5:cbs监听dss发送的confirm消息,如果收到confirm消息,
说明消息已经被成功消费,更新消息的状态为成功消费
step 6:5分钟之后,cbs收到了uss发送的延迟检查消息,此时会去db中检查消息是否已经被成功消费。
如果成功,不需要做任何处理。
如果失败,cbs会发起rpc服务通知uss该条消息投递失败,需要重新发送。
uss收到rpc通知后会重新发送该消息,重走step1 ~ step5的流程
总结:
-
方案2的step2和step6这两步骤,实现了消息补偿机制
-
方案2消息的status经过 0(初始),1(成功消费) 两种状态。只进行了2次db操作
相对前面的方案1 (3次db操作),性能上提升了 1/3,能支持更高的并发量。 -
cbs调起rpc通知这一步,如果出现故障,会导致消息的丢失,可靠性并没有方案1高。
但在实际业务中,有时并不需要100%的消息投递,可能是要支持更高的并发量,此时可以采用方案2。
另外如果真需要100%的消息投递,可以在方案2的基础上机上方案1的cronjob补偿消息,提高消息投递的可靠性
思考:这种方案和上一种方案的区别是什么呢?
对于主业务流程来说,他只有一条线,就是业务数据落库,然后发送消息到服务器,少了消息库的插入。这个方案把消息库的操作都给了回调服务,而回调服务是一种补偿服务,完全可以异步去处理,不影响主业务进程,性能就提高了。
网友评论