ActiveMQ从入门到精通(二)

作者: 张丰哲 | 来源:发表于2017-03-19 14:19 被阅读7857次

    接上一篇《ActiveMQ从入门到精通(一)》,本篇主要讨论的话题是:消息的顺序消费、JMS Selectors、消息的同步/异步接受方式、Message、P2P/PubSub、持久化订阅、持久化消息到MySQL以及与Spring整合等知识。

    消息的顺序消费

    在上一篇文章中,我们已经明确知道了ActiveMQ并不能保证消费的顺序性,即便我们使用了消息优先级。而在实际开发中,有些场景又是需要对消息进行顺序消费的,比如:用户从下单、到支付、再到发货等。如果使用ActiveMQ该如何保证消费的顺序性呢?

    消息消费顺序性

    首先来说,在实际中,我们并不需要的是对全部消息的全局有序消费,我们仅仅需要的是局部业务有序性消费。比如说,我们仅仅需要的是一个用户的下订单、支付、发货这个过程的3条消息有序消费。

    比如,我们可以根据用户ID简单做一个HASH,将消息定位到不同的队列上,也就意味着同一个用户的消息将发往同一个队列。这样做的好处在于,多个队列之间可以并行处理。

    然后,在队列上可以对一段时间上的消息按照用户分组进行排序,这只是一个少量消息的局部排序而已,比如Queue-A上有一个用户的3条消息(订单消息msg1、支付消息msg2、发货消息msg3),那么,msg1将交给订单业务系统,处理完成后,msg2交给支付系统,处理完成后,msg3交给发货系统。虽然这个处理过程是同步的(一条消息处理完,在接着处理),但是它的并发性,系统的处理能力并没有下降!为什么这么说呢?

    假设,msg1/msg2/msg3处理各需要0.1S,如果订单业务系统、支付系统、发货系统并没有分开,而是一个“大系统”,那么显然订单业务在0.1S完成后,需要等待后面的支付、发货逻辑处理完才能继续工作,意味着订单业务干了0.1S的活,等了0.2S,导致在0.3秒内订单业务只处理了1条消息。而现在这3个系统是分开的,那么在0.3S内,订单业务系统可以处理3条消息,而且没有业务系统闲着!

    实际上,RocketMQ在消费顺序性这块要比ActiveMQ要强大些,后期在RocketMQ专题中再为大家介绍。


    JMS Selectors

    JMS Selectors,即消息选择器。在《ActiveMQ从入门到精通(一)》中,介绍过消息的组成部分,其中谈到消息对象有消息属性,用于消息选择器。我们来看一个代码片段,你就会明白:

    生产者片段 消费者片段

    需要注意一下几点:

    第一,生产者端需要设置消息属性,一定要注意的是setXxxProperty(filed,value)

    第二,给出条件,其实本质上就是SQL92语法

    第三,创建消费者的时候,指定条件即可


    消息的同步 AND 异步 接受

    消息的接受,我们已经知道,可以通过消费者的receive()/receive(long time)/receiveNoWait(),这种方式是client端主动接受消息,可以理解为消息的同步接受。要知道这种同步的消息接受方式,是让我们很难受的,我们不得不写一个死循环来不断接受消息。那么有没有一种比较优雅的方式,比如我们设置一个类似消息监听的机制,一旦队列上有消息了,那么回调我们的message handler进行处理呢?

    Message Listener

    消息的异步接受是指当消息到达时,ActiveMQ主动通知客户端。可以通过注册一个实现了MessageListener接口的对象到MessageConsumer。MessageListener只有一个必须要实现的方法,即onMessage。在发往Destination的消息时,会调用该方法。

    这种异步接受“貌似”是ActiveMQ主动的推送消息给消费者,其本质还是消费者轮询消息服务器导致的,只不过这个过程被封装了!


    Message

    JMS程序的核心在于,生产和消费的消息能够被其他程序所使用到。JMS Message是一个既简单又不乏灵活的基本格式,由消息头、属性、消息体3部分组成。

    Message

    注意,在消费者端,我们接受到消息后,一般需要通过instanceof来判断类型后在进行处理!

    在ActiveMQ中,还存在一类临时消息,就是通过创建临时队列/临时主题,如果Connection一旦关闭,那么临时目标就关闭,消息内容也就消失。了解下即可,实际中并不适用。


    P2P or Pub/Sub

    上2张图,你就会明白这2种模式的区别了。

    P2P

    生产者端发送一条消息,消费者端只会有一个消费者消费这个消息。好像打电话,一对一通信!

    Pub/Sub

    一对多通信,发送一条消息,所有订阅了该目标的消费者都会收到消息。

    P2P、Pub/Sub在代码上的区别点仅仅在于,目标类型的创建是createQueue or createTopic,其他一切照旧!

    对于订阅模式,对订阅者提出了特殊的要求,要想收到消息,必须先订阅,而且订阅进程必须一直处于运行状态!实际上,有时候消费者重启了下,那么这个消费者将丢失掉一些消息,那么能否避免这样的情况呢?ActiveMQ已经替我们想好了,就是持久化订阅!


    持久化订阅

    所谓持久化订阅,打个比方,就是说跟MQ打声招呼,即便我不在,那么给我发送的消息暂存在MQ,等我来了,再给我发过来。说白了,持久化订阅,需要给MQ备个案(你是谁,想在哪个Topic上搞特殊化)!看一个代码片段:

    持久化订阅机制

    每一个持久化订阅者都应该有一个唯一的ID作为标示以及要在哪个Topic上进行持久化订阅,一旦这些信息告知MQ之后,那么以后不论持久化订阅者在不在线,那么他的消息会暂存在MQ,以后都会发给他!


    持久化消息到MySQL

    在前文中已经提及默认情况下,ActiveMQ是开启持久化消息机制的,并且是持久化到kahadb的,但是"很可惜"kahadb对我们不是很友好的可视化,其实ActiveMQ提供了配置的方式让我们来选择持久化消息到哪里,这里我以到MySQL为例来说明。(实际上ActiveMQ已经在conf配置文件中提供了相应的例子,我这里就简单说明下)

    在activemq.xml的<broker>节点中增加MySQL信息

    注意到这个bean的id,这个是要被引用的。

    注释kahadb,启用持久化到MySQL配置

    实际中,我们会持久化到哪里呢?一般情况下,比如到kahadb,比如到leveldb,因为这些数据库的性能要较MySQL更高些,我们并不关心消息的“可视化”,更加关心的是消息在持久化的同时更加高效!


    与Spring整合

    这里我将为大家演示Spring和ActiveMQ整合的核心要素。采用Spring,不要Web容器,不涉及Spring-MVC,而且在这里我将采用JUnit + Spring-Test来进行测试!在文章末尾我将提供源码下载。OK,先来看一眼工程截图:

    工程结构

    第一步:POM.XML配置

    maven dependency tree

    第二步:MQ信息配置文件、Spring配置文件

    activemq.properties spring-context.xml

    下面我们重点关注spring-activemq.xml:

    ConnectionFactory

    注意从ActiveMQConectionFactory到PooledConnectionFactory,到Spring提供的SingleConnectionFactory,就是一个适配的过程。

    生产者、消费者配置

    注意Spring的套路经常是这样的,提供XxxTemplate,比如HibernateTemplate,对于JMS,提供了JmsTemplate。

    生产者应该持有JmsTemplate进行发送消息。

    消费者,提供监听器、监听的目的地、连接工厂即可。

    上面的配置,只是一个非常简单的示例,比如是发送到队列,还是发送到主题,事务的配置,签收机制的配置,ttl/priority等配置在后文通过看一下源码,你就会知道该如何配置了。

    第三步:消费者实现监听器

    spring提供的接口

    第四步:生产者

    通过注入拿到JmsTemplate

    第五步:利用Junit4 + SpringTest方式进行测试

    我们以前在测试Spring这一块,大都是通过手动编码的方式(加载XML,setter/getter bean)进行,这里我将为大家介绍一种全新的方式测试Spring程序!

    测试基类

    为什么要提供一个测试基类呢?因为我们可能有很多个测试类,如果有了这个基类,其他测试类继承它,就可以自动获得测试基类的属性了。

    @RunWith  指明采用SpringJunit4进行测试

    @ContextConfiguration 告诉配置文件在哪里

    生产者测试类

    发现没有,这样写Junit单元测试,和以前感觉不一样!

    其实,SpringTest + Junit4还提供了很多功能强大的地方,比如可以设置数据库事务。如果我们在测试的过程结束后,希望回滚数据库的话,很简单,只需要在相应方法上打上注解即可。

    运行结果

    Test Result

    JmsTemplate

    看一下属性:

    JmsTemplate

    很多属性,是不是很熟悉呢?

    JmsTemplate的父类中有一个重要属性:

    pubSubDomain

    默认情况下,是P2P模式,如果将这个属性配置成true,那么将是主题模式。

    OK,到这里这篇博客的内容就介绍完毕了,下一篇是关于ActiveMQ集群方面的知识,See you again.

    本篇博客工程代码下载地址: 

    http://pan.baidu.com/s/1i4HEZsx

    密码:h2hh

    相关文章

      网友评论

      • 8bd68c2c6fa7:bean配置里的pooledConnectionFactory这个bean对应的class全路径是不是写错了,少了jms?另外请教楼主个问题,我参照你写的文章尝试了下,结果一直报错:Cause: java.lang.IllegalStateException: Pool not open,百度上搜索了好久没找到解决方案,不知楼主是否有遇到过
        8bd68c2c6fa7:报错的问题已解决,密码写错了,查看了下activemq的命令控制台,里面打印出用户名或密码错误的信息
      • 108d03a3f1cf:你好,请问activemq.properties中的activemq.brokerURL是哪里的?下载了项目运行不起来
      • Initialxy:想知道pom文件那个图是怎么画的:smile:
        张丰哲:是那个maven依赖树么?是idea自带就可以展示出来的哈:smile:
      • Kalier:想问下web项目下,多个模块(也就是有多个jar)的情况下的SpringTest + Junit4的单元测试,用注释@ContextConfiguration只能加载某个目录下的配置文件,jar包里面的配置文件无法加载,请问有啥解决方法吗?
      • 种出个地球:请教一个学习方法,请问下spring这种非常复杂的配置,大神是在哪里找的,官方项目吗,还是自己看源码得出来,类似这种:
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ Address -->
        <property name="brokerURL" value="${activemq.brokerURL}" />
        <property name="userName" value="${activemq.userName}"></property>
        <property name="password" value="${activemq.password}"></property>
        </bean>
        <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="${activemq.pool.maxConnections}" />
        </bean>
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
        </bean>

        <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
        <value>${activemq.queueName}</value>
        </constructor-arg>
        </bean>
        Nathans:不知道是不是少了配置,但我看其他文章也是相同配置,但是说需要重写JMSTemplate代码,否则还是自动确认,我感觉不应该吧。
        Nathans:你好,集成Spring后,配置如下:
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="receiveTimeout" value="10000"/>
        <property name="pubSubDomain" value="false"/>
        <property name="sessionTransacted" value="false"/>
        <property name="deliveryMode" value="2"/>
        <property name="sessionAcknowledgeMode" value="2"/>
        </bean>

        <jms:listener-container connection-factory="connectionFactory" acknowledge="client" destination-type="queue">
        <jms:listener destination="study.queue.simple" ref="accountListener" method="onMessage"/>
        </jms:listener-container>
        始终是自动确认,请问是不是少了配置?
        张丰哲:恩恩,一般参考官方的一些文档以及工作中会涉及到这些,另外实际上你可以按照博文的顺序,先学习下非Spring环境下ActiveMQ如何使用,那么在Spring环境下使用ActiveMQ,就容易了。
      • 86c917423624:不错不错,收藏了。

        推荐下,RocketMQ 源码解析 14 篇:http://www.yunai.me/categories/RocketMQ/?jianshu&401
      • 7b9553470376:很棒。谢谢楼主分享~
        张丰哲:喜欢就分享~
      • 倾听心的声音_41bd:持久化订阅为什么Messages Dequeued 一直都是0呀,这个消息什么一致保存下去吗
        种出个地球:@张丰哲 我设置了发送持久化的,(消费、不消费),重启activeMQ后数据也都是被清0了,是不是重启服务数据都是清掉的?
        倾听心的声音_41bd:@张丰哲 我发布了10条消息,其中有2个订阅者,订阅者也都收到消息了,可是Messages Enqueued依旧为10,Messages Dequeued为0
        张丰哲:一直都是0?你重启过ActiveMQ吧,重启后清0的。
      • Twisted丶Fate: 想问下楼主是在哪里学习到的这些知识?我想做个笔录。
        张丰哲:工作中有涉及,再加上这些都是开源技术,网上有很多资料的~
      • qinkangdeid:很棒啊😁
        张丰哲:谢谢~常来看看喔

      本文标题:ActiveMQ从入门到精通(二)

      本文链接:https://www.haomeiwen.com/subject/nclsnttx.html