前言
- 对于常见的微服务系统,大部分接口调用是同步的,也就是一个服务直接调用另外一个服务的接口。这个时候,用TCC分布式事务方案来保证各个接口的调用,要么一起成功,要么一起回滚,是比较合适的。
- 如果服务间的调用是异步的,也就是说,一个服务发送一个消息给MQ,即消息中间件,比如RocketMQ、RabbitMQ、Kafka等等。然后,另外一个服务从MQ消费到一条消息后进行处理。这就成了基于MQ的异步调用了。
- 那么针对这种基于MQ的异步调用,如何保证各个服务间的分布式事务呢?
- 也就是基于MQ实现异步调用的多个服务的业务逻辑,要么一起成功,要么一起失败。
-
这个时候,就要用上可靠消息最终一致性方案,来实现分布式事务。
可靠消息最终一致性分布式事务.png - 如果不考虑各种高并发、高可用等技术挑战的话,单从“可靠消息”以及“最终一致性”两个角度来考虑,这种分布式事务方案还是比较简单的。
可靠消息最终一致性方案的核心流程
上游服务投递消息
- 如果要实现可靠消息最终一致性方案,一般自己写一个可靠消息服务,实现一些业务逻辑。
- 首先,上游服务需要发送一条消息给可靠消息服务。
- 你可以认为是对下游服务一个接口的调用,里面包含了对应的一些请求参数。
- 然后,可靠消息服务就得把这条消息存储到自己的数据库里去,状态为“待确认”。
- 接着,上游服务就可以执行自己本地的数据库操作,根据自己的执行结果,再次调用可靠消息服务的接口。
- 如果本地数据库操作执行成功了,那么就找可靠消息服务确认那条消息。如果本地数据库操作失败了,那么就找可靠消息服务删除那条消息。
- 此时如果是确认消息,那么可靠消息服务就把数据库里的消息状态更新为“已发送”,同时将消息发送给MQ。
- 这里有个很关键的点,就是更新数据库里的消息状态和投递消息到MQ。这俩操作,得放在一个方法里,而且得开启本地事务。
- 如果数据库里更新消息的状态失败了,那么就抛异常退出了,就别投递到MQ;
- 如果投递MQ失败报错了,那么就要抛异常让本地数据库事务回滚。
- 这俩操作必须得一起成功,或者一起失败。
- 如果上游服务是通知删除消息,那么可靠消息服务就得删除这条消息。
下游服务接收消息
- 下游服务就一直等着从MQ消费消息,如果消费到了消息,那么就操作自己本地数据库。
- 如果操作成功了,就反过来通知可靠消息服务,说自己处理成功了,然后可靠消息服务就会把消息的状态设置为“已完成”。
如何上游服务对消息的100%可靠投递?
- 如果投递消息的过程中各个环节出现了问题该怎么办?
- 如果上游服务给可靠消息服务发送待确认消息的过程出错了,那没关系,上游服务可以感知到调用异常的,就不用执行下面的流程了,这是没问题的。
- 如果上游服务操作完本地数据库之后,通知可靠消息服务确认消息或者删除消息的时候,出现了问题。
- 比如:没通知成功,或者没执行成功,或者是可靠消息服务没成功的投递消息到MQ。这一系列步骤出了问题怎么办?
- 那条消息在可靠消息服务的数据库里的状态会一直是“待确认”。
- 我们可以在在可靠消息服务里开发一个后台定时运行的线程,不停的检查各个消息的状态。
- 如果一直是“待确认”状态,就认为这个消息出了点什么问题。
- 此时的话,就可以回调上游服务提供的一个接口,这个消息对应的数据库操作,执行成功了?
- 如果上游服务答复执行成功,那么可靠消息服务将消息状态修改为“已发送”,同时投递消息到MQ。
- 如果上游服务答复没执行成功,那么可靠消息服务将数据库中的消息删除即可。
- 通过这套机制,就可以保证,可靠消息服务一定会尝试完成消息到MQ的投递。
如何保证下游服务对消息的100%可靠接收?
- 如果下游服务消费消息出了问题,没消费到?或者是下游服务对消息的处理失败了,怎么办?
- 在可靠消息服务里开发一个后台线程,不断的检查消息状态。
- 如果消息状态一直是“已发送”,始终没有变成“已完成”,那么就说明下游服务始终没有处理成功。
- 此时可靠消息服务就可以再次尝试重新投递消息到MQ,让下游服务来再次处理。
- 只要下游服务的接口逻辑实现幂等性,保证多次处理一个消息,不会插入重复数据即可。
网友评论