美文网首页RocketMQmqMQ
RocketMQ实战(四)

RocketMQ实战(四)

作者: 张丰哲 | 来源:发表于2017-04-30 15:26 被阅读5003次

    前言

    这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表如下:

    《RocketMQ实战(一)》

    《RocketMQ实战(二)》

    《RocketMQ实战(三):分布式事务》

    RocketMQ 3.2.6的事务机制

    在上一篇博客中,已经知道RocketMQ 3.0.8是支持事务回查机制,但是在RocketMQ 3.2.6中取消了这个功能,下面我们继续以转账功能分析我们自己如何解决这个问题。

    转账流程

    在正常情况下,当然没有问题,如果第五步(向MQ发送确认消息)出现失败,加上RocketMQ 3.2.6版本没有事务回查机制,就会导致这条转账消息,在A银行完成了操作,但是迟迟对B银行系统不可见!

    解决RocketMQ 3.2.6不支持事务回查的思路

    用户U1从A银行系统转账给B银行系统的用户U2的处理过程如下:

    第一步:A银行系统生成一条转账消息,以事务消息的方式写入RocketMQ,此时B银行系统不可见这条消息

    第二步:写入MQ成功后,回调A银行系统,对T1,T2表进行操作(很显然需要是一个事务)

    我们重点关注下T2表,这个表是用来干嘛的呢?每条转账消息都会在T2表中,该表有2个特殊的字段:status,updatetime。(用途会在后文详述)

    第三步:完成第二步,接下来发送确认消息给MQ,如果这个确认消息发送成功,那么这条转账消息,将对B银行系统可见。然后B银行系统,会在一个事务中完成对t3,t5的操作。

    如果发送确认消息给MQ失败的处理思路:

    首先,B银行系统,有一个定时任务(比如说每隔1MIN执行一次),扫描表t5,取得一段时间内的数据,发送给A银行系统。要知道t5中的数据,必然是A银行系统成功处理并发送确认消息成功的转账数据。为什么要发送给A银行系统呢,其实就是为了找到那些发送确认消息失败的转账数据。那么怎么发给A银行系统呢,这个方式比较多,可以考虑在来一个Topic,也可以考虑Netty等。发送给A银行系统,其实就是为了更新t2表的status,updatetime。

    这里有一个关键,如何“扫描表t5,取得一段时间内的数据”?这就是t4的作用,在t4中记录一个time字段,每次定时任务启动,先更新time(比如设定为当前系统时间,设置前的的时间为old),然后扫描出t5中大于这个old时间的转账数据,如此循环往复。

    其次,A银行系统,也有一个定时任务(可以根据业务消费能力定,可以大一些),扫描t2表(指定status及updatetime条件),将那些确认消息发送失败的转账消息找出来,更新updatetime并发送给MQ。

    这样,我们并没有改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!

    其实到这里,你可以发现RocketMQ的一个特点,就是将生产者和MQ绑定,而不需要特别处理消费者,这是为什么呢?因为消息只要发往RocketMQ成功,那么就意味着成功,为什么这么说?

    前面,我们说过,消费者端消费消息只会产生2种错误,第一:timeout,第二:exception。要知道RocketMQ对于超时,会不断重试;对于消费异常,会根据消费端的返回码,会有重试机制保证。也就是,RocketMQ一定会让消息得到消费,如果消费有问题,只能是消费者的问题,而不会是RocketMQ的问题!

    Pull Or Push

    在前面的博客已经提到,在RocketMQ中Consumer分为2类:Push Consumer、Pull Consumer。以前的例子都是Push Consumer,接下来,为大家介绍下Pull Consumer。

    通过MQPullConsumerScheduleService进行操作 注册回调并启动

    从表面意思上来看,好像Push是MQ推送给消费者,而Pull是消费者从MQ中拉取;其实本质上都是拉取模式PULL,即消费者从MQ中轮询取得消息。

    在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费,所以给我们造成了感觉上好像是“推消息”。

    在Pull模式下,需要特别注意的是,本质上是从一个Topic下的所有Queue进行拉取,而且每个Queue都必须记录拉取位置,否则会导致重复消费。还有拉取的时间间隔,拉取的大小等等。不过所有的这一切,MQPullConsumerScheduleService都替我们考虑清楚了,提供updateConsumeOffset去更新消费的队列的位置(默认5S同步一次),提供setPullNextDelayTimeMillis设置下次拉取的时间间隔(应该设置的大一些,至少大于5S)。

    仔细回想下,对于Push方式的回调   和  Pull方式的回调,还有什么关键区别么?

    对于Push而言,不论是基于MessageListenerConcurrently的,还是基于MessageListenerOrderly的,都有返回值的;而Pull的doPullTask的返回值却是void?

    这意味,我们需要在pull方式中,注意自己处理每条消息消费的异常情况!

    运行结果

    通过运行结果,可以印证上面的观点:为什么每次消费都是4条开始,4条结束呢?因为一个Topic下有4个Queue,而且上面的代码实际上会针对每个Queue开启一个线程去消费!

    RocketMQ Filter组件介绍

    对于ActiveMQ而言,我们可以通过JMS Selectors机制(就是类似于SQL的语法)来实现过滤,很easy。那么和RocketMQ Filter组件有什么区别呢?

    虽然,2者都能实现过滤,但是RocketMQ Filter的性能要更高效些,因为RocketMQ是在broker上将过滤后的数据发往filter,然后消费者直接从filter上取得数据;而ActiveMQ是消费者直接在broker上进行过滤消费!(当然,对于RocketMQ而言,Tag机制已经足够应付日常绝大数的过滤功能,除非你的业务对性能有特别高的要求)

    RocketMQ Filter机制

    具体怎么做呢?这里我就不演示了,网上有很多例子,这里只说下大致的过程:

    第一:broker-xxx.properties中指定filter个数 

    第二:上传一段JAVA代码,其实就是一个类

    到这里,整个RocketMQ实战系列就结束呢,你学到了么,体会到RocketMQ的强大了么?

    See u next blog!

    相关文章

      网友评论

      • 大悦月:分布式事务的回查机制的代码没有贴出来呀~~
      • d3763feeccb1:楼主写的系列很好,如果能把各个demo传到git分享下就更好啦
        张丰哲:嗯嗯,以后改进下哈😄
      • xudongwang_js:楼主你好,感谢你的分享,对于你提供的分布式事务的解决方案,我有一个疑问:
        扫描T5表后得到数据发送给A银行系统,如果这里的发送失败,如何处理?(下次再取T5时,T4的lasttime时间也变了)
      • 龚智博:看您的文章,对以前模糊地方清楚不少,写的很清楚详细,感谢。
        张丰哲:嗯嗯,有空常来看看,😄
      • ed5f8842d7ca:只要保证了消费端消费的幂等性,在producer端,只要保证消息一定投递成功即可。这个时候其实使用消息回查的做法是太重了,放在业务中,加一个消息推送记录表,推送成功了修改状态,不成功就定时筛选出来重新推送,这样其实就是不考虑rollback的情况,保证最终一致性(成功)。
        张丰哲:嗯嗯,谢谢你的分享,实际场景中确实由这样处理的,😄
      • yanyt:关于发送确认消息给MQ失败的处理思路问题:B系统要扫描通知A系统,但是如果有C系统、D系统.....等多个系统给B转账,那B岂不是要通知这所有的系统,这个方案是相对最优的吗?
        张丰哲:由于发送确认消息失败,A(C,D..)系统和B系统必然涉及到对比操作,这是无法避免的,上面是我这边的一个思路,:smile:
      • jackhello:你好,看了你这篇博客有一点疑点,t5和t2表该如何设计才能进行关联比较去更新t2的状态了?因为在a银行系统里面进行事务处理不能获取msgId,而在b银行消费者这里有msgId,t5和t2表不能通过msgId进行关联,也就是说通过t5表发送过来的数据,我怎么找到对应的t2表里面的这条记录了
        jackhello:@张丰哲 嗯,谢谢提点
        张丰哲:一个转账行为,可以有一个订单号,就可以对应了吧,:blush:
      • Carvendy许:4.2.0版本的TransactionCheckListenerImpl没有被调用了。:fearful:
      • IDST:大佬,那个autoCreateSubscriptionGroup这个参数怎么手动创建啊
      • hahaee:这耦合度是怎么考虑的?
        张丰哲:@dbwu 这个我就只能猜测了。(也许是基于商业化的考虑.....):smiley:
        fd5635c3c201:咨询下大佬,出于什么原因,在3.2.6版本中取消了事务回查机制?
        张丰哲:用MQ是可以降低2个系统的耦合的。
      • 86c917423624:不错不错,收藏了。

        推荐下,RocketMQ 源码解析 14 篇:http://www.yunai.me/categories/RocketMQ/?jianshu&401
      • 5acdc5f64fda:有个问题,通过定时任务再次发送给mq,是发送什么类型的消息?另外,原来已经在mq那个prepare状态的消息怎么办?
        张丰哲:@ArvinLI 如果确认消息没有发送成功,那么实际上就需要我们自己来做对比处理了,在本文是有这方面的叙述的。(如果在发一次业务消息的话,需要注意幂等性,另外如果再次出现确认消息没有发送呢?):blush:
        5acdc5f64fda:比如已经确认某一条确认消息没有发送成功,是重新把业务数据重新发送一次到mq?还是重新发送确认消息?
      • 1b41feaa555c:楼主可以转载吗?
        张丰哲:可以啊,:smile:
      • 80e1ac301699:看了你写的文章,写得都十分不错
        张丰哲:谢谢啦~:smiley:
      • 80e1ac301699:博主现在任职于什么公司
        张丰哲:一家互联网公司,~:smile:
      • d53b94c2c297:目前可用于生产的版本是哪个版本呢
        张丰哲:我司使用的是3.2.6
      • 阿布554_:之前有遇到一个问题,consumerGroupName不同但是订阅的Topic是相同的。。。采用集群消费模式,两个consumer都会对Topic进行消费。。。。这个怎么搞
        张丰哲:之前博客中提及过,不论是对于producer,还是consumer,它们的groupname都是全局唯一的,特别是consumer的groupname将是集群消费模式下消息进行负载均衡的关键。你上面的情况,就是因为2个消费组都订阅了啊。

      本文标题:RocketMQ实战(四)

      本文链接:https://www.haomeiwen.com/subject/xgpzzttx.html