前言
这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表如下:
RocketMQ 3.2.6的事务机制
转账流程在上一篇博客中,已经知道RocketMQ 3.0.8是支持事务回查机制,但是在RocketMQ 3.2.6中取消了这个功能,下面我们继续以转账功能分析我们自己如何解决这个问题。
解决RocketMQ 3.2.6不支持事务回查的思路在正常情况下,当然没有问题,如果第五步(向MQ发送确认消息)出现失败,加上RocketMQ 3.2.6版本没有事务回查机制,就会导致这条转账消息,在A银行完成了操作,但是迟迟对B银行系统不可见!
用户U1从A银行系统转账给B银行系统的用户U2的处理过程如下:
第一步:A银行系统生成一条转账消息,以事务消息的方式写入RocketMQ,此时B银行系统不可见这条消息
第二步:写入MQ成功后,回调A银行系统,对T1,T2表进行操作(很显然需要是一个事务)
我们重点关注下T2表,这个表是用来干嘛的呢?每条转账消息都会在T2表中,该表有2个特殊的字段:status,updatetime。(用途会在后文详述)
第三步:完成第二步,接下来发送确认消息给MQ,如果这个确认消息发送成功,那么这条转账消息,将对B银行系统可见。然后B银行系统,会在一个事务中完成对t3,t5的操作。
如果发送确认消息给MQ失败的处理思路:
首先,B银行系统,有一个定时任务(比如说每隔1MIN执行一次),扫描表t5,取得一段时间内的数据,发送给A银行系统。要知道t5中的数据,必然是A银行系统成功处理并发送确认消息成功的转账数据。为什么要发送给A银行系统呢,其实就是为了找到那些发送确认消息失败的转账数据。那么怎么发给A银行系统呢,这个方式比较多,可以考虑在来一个Topic,也可以考虑Netty等。发送给A银行系统,其实就是为了更新t2表的status,updatetime。
这里有一个关键,如何“扫描表t5,取得一段时间内的数据”?这就是t4的作用,在t4中记录一个time字段,每次定时任务启动,先更新time(比如设定为当前系统时间,设置前的的时间为old),然后扫描出t5中大于这个old时间的转账数据,如此循环往复。
其次,A银行系统,也有一个定时任务(可以根据业务消费能力定,可以大一些),扫描t2表(指定status及updatetime条件),将那些确认消息发送失败的转账消息找出来,更新updatetime并发送给MQ。
这样,我们并没有改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!
其实到这里,你可以发现RocketMQ的一个特点,就是将生产者和MQ绑定,而不需要特别处理消费者,这是为什么呢?因为消息只要发往RocketMQ成功,那么就意味着成功,为什么这么说?
前面,我们说过,消费者端消费消息只会产生2种错误,第一:timeout,第二:exception。要知道RocketMQ对于超时,会不断重试;对于消费异常,会根据消费端的返回码,会有重试机制保证。也就是,RocketMQ一定会让消息得到消费,如果消费有问题,只能是消费者的问题,而不会是RocketMQ的问题!
Pull Or Push
通过MQPullConsumerScheduleService进行操作 注册回调并启动在前面的博客已经提到,在RocketMQ中Consumer分为2类:Push Consumer、Pull Consumer。以前的例子都是Push Consumer,接下来,为大家介绍下Pull Consumer。
运行结果从表面意思上来看,好像Push是MQ推送给消费者,而Pull是消费者从MQ中拉取;其实本质上都是拉取模式PULL,即消费者从MQ中轮询取得消息。
在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费,所以给我们造成了感觉上好像是“推消息”。
在Pull模式下,需要特别注意的是,本质上是从一个Topic下的所有Queue进行拉取,而且每个Queue都必须记录拉取位置,否则会导致重复消费。还有拉取的时间间隔,拉取的大小等等。不过所有的这一切,MQPullConsumerScheduleService都替我们考虑清楚了,提供updateConsumeOffset去更新消费的队列的位置(默认5S同步一次),提供setPullNextDelayTimeMillis设置下次拉取的时间间隔(应该设置的大一些,至少大于5S)。
仔细回想下,对于Push方式的回调 和 Pull方式的回调,还有什么关键区别么?
对于Push而言,不论是基于MessageListenerConcurrently的,还是基于MessageListenerOrderly的,都有返回值的;而Pull的doPullTask的返回值却是void?
这意味,我们需要在pull方式中,注意自己处理每条消息消费的异常情况!
通过运行结果,可以印证上面的观点:为什么每次消费都是4条开始,4条结束呢?因为一个Topic下有4个Queue,而且上面的代码实际上会针对每个Queue开启一个线程去消费!
RocketMQ Filter组件介绍
RocketMQ Filter机制对于ActiveMQ而言,我们可以通过JMS Selectors机制(就是类似于SQL的语法)来实现过滤,很easy。那么和RocketMQ Filter组件有什么区别呢?
虽然,2者都能实现过滤,但是RocketMQ Filter的性能要更高效些,因为RocketMQ是在broker上将过滤后的数据发往filter,然后消费者直接从filter上取得数据;而ActiveMQ是消费者直接在broker上进行过滤消费!(当然,对于RocketMQ而言,Tag机制已经足够应付日常绝大数的过滤功能,除非你的业务对性能有特别高的要求)
具体怎么做呢?这里我就不演示了,网上有很多例子,这里只说下大致的过程:
第一:broker-xxx.properties中指定filter个数
第二:上传一段JAVA代码,其实就是一个类
网友评论
扫描T5表后得到数据发送给A银行系统,如果这里的发送失败,如何处理?(下次再取T5时,T4的lasttime时间也变了)
推荐下,RocketMQ 源码解析 14 篇:http://www.yunai.me/categories/RocketMQ/?jianshu&401