美文网首页消息中间件
消息队列之三: ActiveMQ

消息队列之三: ActiveMQ

作者: suxin1932 | 来源:发表于2019-10-17 14:12 被阅读0次

问题

如何保证集群高可用?

官网核心链接
http://activemq.apache.org/features
http://activemq.apache.org/using-activemq
http://activemq.apache.org/developers

1.ActiveMQ概述

中间件-JMS-MQ-ActiveMQ之间的关系.png

1.1JMS概述

#概念
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,
用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

#JMS provider
实现JMS接口和规范的消息中间件.
注意: 与producer不同!!!!!!!!!!!!!!!!!!!!!!!!

#JMS message
JMS的消息,JMS消息由三部组成:
1:消息头Head:每个消息头字段都有相应的getter和setter方法
2:消息属性Attribute:如果需要除消息头字段以外的值,那么可以使用消息属性
3:消息体Payload:封装具体的消息数据

#JMS producer
消息生产者,创建和发送JMS消息的客户端应用

#JMS consumer
消息消费者,接收和处理JMS消息的客户端应用.
消息的消费可以采用以下两种方法之一:
1:同步消费:
通过调用消费者的receive方法从目的地中显式提取消息,receive 方法可以一直阻塞到消息到达。
2:异步消费:
客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

#JMS domains (消息域) (消息传送模型)
1.点对点模式(point-to-point, P2P)
client A、C 和 D之间的消息传送说明了点对点模式(P2P)。
producer 使用此模式向"队列queue"目的地发送一条消息,只有一个consumer能够从该destination获得该消息。
访问该destination的其他任何consumer都不能获得该消息。
"queue"模式下:
producer往queue里发送消息,消费者从queue里取,消费一条,就从queue里移除一条。
如果一个消费者消费速度不够快怎么办呢?
在activemq里,提供messageGroup的概念,一个queue可以有多个消费者,
但是他们得标记自己是一个messageGroup里的。
这样,消息会轮训发送给每个消费者,也就是说消费者不会重复消费同一条消息。
但是每条消息只被消费一次。
2.发布/订阅模式(publish-subscribe)
client B、E 和 F之间的消息传送说明了发布/订阅模式(publish-subscribe)。
producer 使用此广播模式向"主题topic"目的地发送一条消息,任意数量的使用方订户都可以从该destination检索此消息。
每个consumer都获得此消息的一个副本。
topics就是广播。producer往broker发消息,每个消息包含topic。
消费者订阅感兴趣的topic,所有订阅了topic的消费者都会收到消息。当然订阅分持久不持久。
持久订阅,当消费者断开一会,再连上来,仍然会把没收到的消费发过来。
不持久的订阅,断开这段时间的消息就收不到了。
"注意: 这里与rabbitmq不同:
rabbitmq的exchange的fanout模式只是将msg路由到绑定了exchange对应的queue, 
rabbitmq中queue中的一条msg最终只能被一个consumer消费.
若想实现一条msg被多个consumer消费, 可创建一个fanout的exchange, 绑定多个queue, 
创建多个不同的consumer, 每个consumer订阅不同的queue来实现."

#消息接收模型
>> 推技术:
消息服务器(broker)主动推送(push)消息到消息消费者(JMS的Pub/Sub采用的是推技术)
>> 拉技术:
消息消费者定时去消息服务器(broker) 取(pull)消息
(JMS接收者能够push或pull消息,取决于它是否使用异步onMessage回调或者是同步receive方法,
同步receive方式就是拉的方式,consumer主动去消息服务器pull消息,
异步的listener方式为push的方式。)


#JMS 消息传送对象
JMS 消息传送的对象在编程域中基本保持不变:
连接工厂、连接、会话、生成方、使用方、消息和目的地。

#Connection factory
连接工厂,用来创建连接对象,以连接到JMS的producer

#JMS Connection
封装了客户与JMS 提供者之间的一个虚拟的连接

#JMS session
是生产和消费消息的一个单线程上下文会话:
用于创建消息生产者(producer), 消息消费者(consumer)和消息(message)等。
会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

#Destination
消息发送到的目的地, 主要是指Queue和Topic

#Acknowledge
签收

#Transaction
事务

#JMS client
用来收发消息的Java应用, 包含producer 和consumer

#Non-JMS client
用来替换JMS API实现收发消息的功能,
通常会提供其他的一些特性,比如:CORBA、RMI等。

#Administered objects
预定义的JMS对象,通常在provider规范中有定义,提供给JMS客户端来访问,比如: ConnectionFactory和Destination
JMS 消息传送模式.png JMS1.X的api规范.png JMS2的api规范.png

1.1.1JMS的消息结构

JMS消息的组成:消息头,属性和消息体

#消息头
消息头包含消息的识别信息和路由信息,一些标准的属性如下:
1:JMSDestination:
消息发送的目的地,主要是指Queue和Topic,由send方法设置

2:JMSDeliveryMode:
传送模式, 包括持久模式和非持久模式, 由send方法设置
>> 一条持久性的消息应该被传送“一次仅仅一次”,这就意味者如果JMS提供者出现故障,
该消息并不会丢失,它会在服务器恢复之后再次传递。
>> 一条非持久的消息最多会传送一次,这意味这服务器出现故障,该消息将永远丢失。

3:JMSExpiration:
由send方法设置。
消息过期时间 = Destination的send方法中的timeToLive值 + 发送时刻的GMT时间值。
>> 如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。
>> 如果timeToLive值不等于零,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。

4:JMSPriority:
消息优先级,由send方法设置。
从 0-9 十个级别,0-4 是普通消息,5-9 是加急消息。默认是4级。
JMS不要求JMSProvider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。

5:JMSMessageID:
唯一识别每个消息的标识,由JMS Provider 产生。

6:JMSTimestamp:
一个JMS Provider在调用send()方法时自动设置的。
它是消息被发送和消费者实际接收的时间差。

7:JMSCorrelationID :
用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。
在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,
不过,JMSCorrelationID可以是任何值,不仅仅是JMSMessageID。
由开发者设置。

8:JMSReplyTo :
提供本消息回复消息的目的地址。由开发者设置。

9:JMSType :
消息类型的识别符。由开发者设置。

10:JMSRedelivered:
如果一个客户端收到一个设置了JMSRedelivered属性的消息,
则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。
如果该消息被重新传送,JMSRedelivered=true反之,JMSRedelivered =false。
由JMS Provider设置

====================================================
#消息属性
包含以下三种类型的属性:
1:应用程序设置和添加的属性,比如:
Message.setStringProperty(“username”,username);
2:JMS定义的属性
使用“JMSX”作为属性名的前缀,
connection.getMetaData().getJMSXPropertyNames(),
方法返回所有连接支持的JMSX 属性的名字。

3:JMS供应商特定的属性
JMS定义的属性如下:
>> JMSXUserID:发送消息的用户标识,发送时提供商设置
>> JMSXAppID:发送消息的应用标识,发送时提供商设置
>> JMSXDeliveryCount:转发消息重试次数,第一次是1,第二次是2,… ,发送时提供商设置
>> JMSXGroupID:消息所在消息组的标识,由客户端设置
>> JMSXGroupSeq:组内消息的序号第一个消息是1,第二个是2,…,由客户端设置
>> JMSXProducerTXID :产生消息的事务的事务标识,发送时提供商设置
>> JMSXConsumerTXID :消费消息的事务的事务标识,接收时提供商设置
>> JMSXRcvTimestamp :JMS 转发消息到消费者的时间,接收时提供商设置
>> JMSXState:假定存在一个消息仓库,它存储了每个消息的单独拷贝,且这些消息从原始消息被发送时开始。
每个拷贝的状态有:1(等待),2(准备),3(到期)或4(保留)。
由于状态与生产者和消费者无关,所以它不是由它们来提供。
它只和在仓库中查找消息相关,因此JMS没有提供这种API。
由提供商设置。

====================================================
#消息体
消息体,JMS API定义了5种消息体格式,也叫消息类型,
可以使用不同形式发送接收数据,并可以兼容现有的消息格式。包括:
>> TextMessage
>> MapMessage
>> BytesMessage
>> StreamMessage
>> ObjectMessage

1.1.2消息分发机制

#1.消息选择器
在destination上使用消息选择器,
利用消息属性Attribute和消息头Payload(无法使用消息体内的数据)作为条件表达式的准则,
消息在destination(队列/主题)分发给消费者之前就过滤好。
(rabbitmq有基于routing key的表达式过滤方式,来选择接收哪几个topic的消息)

#2.消费者控制还是生产者控制
两种方案:
(1)MessageFilter
消费者控制了消息过滤,并决定它要接受什么消息.
消息在目的地分发给消费者之前过滤。
使用MessageFitler的优点是,具有更强的可伸缩性,
假设增加了一个CustType为PLATINUM级别,那么只要增加一个相应的消息消费者就可以。
(2)Multiple Destination
多Destination方式,生产者控制消息分发.
消息在发送给Destination之前过滤,分发给不同Destination不同的消息.
如果是使用Multiple Destination方式,就是增加一个队列来保存PLATINUM,
同时还得增加一个类来监听这个新的队列。
MultipleDestination的好处是生产者控制消息过滤分类,不容易出错,
但是可扩展性稍差,MessageFilter是可扩展性好,但是容易出错。

#3.未能过滤掉的消息如何处理
1) 发布订阅模型:
这些消息不会传送给该订阅者,不论是持久订阅还是非持久订阅
2) 点对点模型:
未被消费者选择的所有消息,对该消费者都是不可见的。
确认由producer生产的所有消息各自都有和它们相关联的有效期,
默认情况是永不过期,那么意味着如果一条消息被过滤掉,没有传送给consumer,
它将在队列中永久驻留,可以通过设置生存时间来解决。

有些厂商提出了死信队列(Dead Letter Queue,DLQ)
或停用消息队列(Dead Message Queue,DMQ)的概念,
来处理那些被认为无法传送的消息。

最简单的情况,消息传送系统将所有无法传送的消息放入DMQ,而应用程序负责监测它的内容。
也可以支持管理型事件,能够在消息放入DMQ时通知应用程序。

对于rabbitmq,有一个dead letter的机制, 当消息在一个队列中变成死信后,
它能被重新publish到另一个Exchange,这个Exchange就是DLX.

1.1.3消息可靠性基础

#1.消息保存转发机制
消息传送保证机制(guaranteed delivery), 
确保即便发生了具备故障,预定消费者最终也会接收到这条消息。
当消费者出故障时,将消息保存到持久化介质中,等待消费者恢复之后,
从持久介质取出消息,转发给消费者.

#2.DeliveryMode
public interface DeliveryMode {
    static final int NON_PERSISTENT = 1;
    static final int PERSISTENT = 2; #默认模式
}
其本质上,还是利用消息头的deliveryMode属性来标记的。
可以有两个地方可以设置:
(1) 对于消息生产者来说
是发送消息的时候,设置是否持久化;
"msg.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);"
默认就是持久化模式的。
(2)对于消息消费者来说
是在接收消息之前设置传送模式。
比如对于topic提供durableSubscriber的方法。
public interface TopicSession extends Session {
    ...
    TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
    TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException;
    ...
}

#3.丢失重连
当客户端和服务器之间的网络连接丢失时,JMS提供者必须尽可能重新建立连接。
如果该JMS提供者无法重新自动连接,在客户端调用某种能够引起网络流量的方法时,
提供者必须抛出一个异常,向客户端通知这个情况。
JMS提供了ExceptionListener接口,用于捕获丢失的连接(可以在捕获时重连),并向客户端通知这个情况。
与MessageListener不同,MessageListener是与会话绑定在一起的。

1.1.4消息确认机制

一般是自动确定、手动确定。

// 1.producer发送消息时
TopicPublisher.publish和QueueSender.send方法是同步的,
这些方法负责发送消息,同时进行阻塞,直到从消息服务器接收到一个确认为止。
一旦接收到一个确认,执行线程就会恢复并返回方法,认为消息发送成功。
底层确认对客户端编程模型来说是不可见的。
如果在这个操作期间发生了一个故障情况,就会抛出一个异常,
同时认为该消息未被传送(注意重新发送指的是消费服务器到消费者的重新发送)。

// 2.consumer接收消息时
如果会话是AUTO_ACKNOWLEDGE模式,当每个消费者获得消息时,
JMS提供者的客户端运行时环境必须自动向服务器发送确认信息。
如果服务器没有接收到这个确认信息,它就会认为该消息未被传送,并可能会试图重新传送。

// 3.消息服务器收发消息时
(1) 服务器接收到生产者的消息,发送确认给生产者
确认消息从服务器发送到生产者,意味着服务器已经接收到该消息,并已经承担了传送它的责任。
从JMS服务器的角度来看,发送到生产者的确认并未和消息传送直接关联。
逻辑上,它们是两个独立的步骤。
A、对于持久消息来说,服务器将消息写入磁盘,然后在再通知生产者该消息已经被接收。
B、对于非持久消息,意味着服务器能够在接收到消息后立刻通知发送者,并将该消息存入内存。
如果该消息的主题没有订阅者,根据厂商的不同,也可能会将该消息抛弃。
(2) 消息消费者接收时确认
A、对于持久订阅者来说
一直到消息服务器接收到所有的消息预定接收者的确认时,消息服务器才会认为该消息已经完成传送。
要获得这些信息,必须对每个消费者都非常了解:
哪一个客户端已经接收到每条消息,哪一个还没有接收到。
一旦消息服务器将消息传送给所有的已知订阅者,并已分别从订阅者那里接收到确认,
就会将这条消息从持久存储器中删除。
如果持久订阅,而订阅者当前并未连接,那么消息服务器会将该消息保存起来,
直到该订阅者变成可用状态或消息到期为止。
甚至对于非持久消息来说,也是如此。
也就是说,如果消息消费者设定为持久订阅,则不管消息生产者设定消息是否是持久的,
当消费者不在时,总是保存,恢复时转发。
B、对于非持久性消息来说
在消息服务器已经向发送者确认消息后,
以及消息服务器有机会代表未连接的持久订阅者将消息写入磁盘之前,
二者之间可能会有一个时间窗,如果JMS在这个时间窗内出现故障,该消息就可能丢失。
C、如果是使用持久消息时
一个提供者可能会出现故障,但是会优雅恢复正常。
由于消息保存在持久存储器中,它们并没有丢失,在提供者再次启动时,它们又会传送给消费者。
如果是点对点队列,它们能够保证被传送出去;
如果是发布订阅发送,只有消费者的订阅为持久性时,才能被保证传送出去,非持久订阅的传送行为因提供者不同而不同。

1.1.5消息的事务机制

// 1.producer提交消息的事务
生产者在commit之前,JMS提供者不会开始向它的消费者传送消息,
即使它已经从发送者那里接收到所有的消息。
发送消息时,如果在发送消息的方法正常完成后没有调用commit方法,
JMS提供者会从队列中删除这些消息,而这些消息并不会传送给消费者。

// 2.consumer消费消息的事务
消息会尽可能快地传送给consumer,但是它们一直由JMS提供者(即broker)保存,
直到consumer在会话对象上发布commit为止,如果发生了故障或调用了rollback,
broker会试图重新传送这些消息,这种情况下,这些消息会设置为重新传送标记。
接收消息时,如果在接收消息方法正常完成后没有调用commit方法,
消息就会被标记为未被传送,JMS提供者(broker)会将这些消息重新传送给消费者,
并将JMSRedelivered标记为true,表示此前曾试图处理过这些消息。

1.2什么是ActiveMQ

http://activemq.apache.org/using-activemq (官网)

1.2.1基础

ActiveMQ是Apache软件基金下的一个开源软件,它遵循JMS1.1规范(Java Message Service), 是消息驱动中间件软件(MOM)。
它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。

1.2.2基本架构

#传输协议:
ActiveMQ提供了广泛的连接模式,其中主要包括SSL、STOMP 、XMPP;
ActiveMQ默认的使用的协议是openWire,端口号:61616.

#消息域:
ActiveMQ主要包含Point-to-Point (点对点),Publish/Subscribe Model (发布/订阅者)模式.
其中在Publich/Subscribe 模式下又有Nondurable/durable subscription 2种消息处理方式.

#消息存储:
在消息传递过程中,部分重要的消息可能需要存储到数据库或文件系统中,当中介崩溃时,信息不回丢失.

#Cluster  (集群): 
最常见到 集群方式包括network of brokers和Master Slave.

#Monitor (监控) :
ActiveMQ一般由jmx来进行监控.
ActiveMQ整体架构.png ActiveMQ消息传递图.png

1.2.3主要特点

#稳定性:
失败重连机制failover,持久化服务,容错机制,多种恢复机制.

#高效性:
支持多种传送协议TCP,SSL,NIO,UDP,集群服务消息在多个代理之间转发防止消息丢失,
支持超快的JDBC消息持久化和高效的日志系统.

#可扩展:
activemq的高级特性都可以配置的形式来表现,
很好的实现例如游标,容错机制,消息group及监控服务,
同时扩展了很多成熟的框架spring使得其使用更加成熟

#高级特性:
>> 消息群组(Message Groups)
>> 虚拟端点(Virtual Destinations)
>> 通配符(Wildcards)
>> 复合端点(Composite Destinations)

1.3ActiveMQ相关概念

http://activemq.apache.org/developers (官网 ActiveMQ 设计)

1.3.1死信队列 (防止消息丢失)


http://activemq.apache.org/message-redelivery-and-dlq-handling.html (死信队列官网)
https://www.cnblogs.com/rainwang/p/5146223.html (死信队列官网翻译)

https://www.iteye.com/blog/shift-alt-ctrl-2378868 (集群与架构)

1.4应用范围

// 1.发送邮件
最经典的就是当用户注册时,我们就需要用activeMQ来做为中间件,当用户注册后,
我门把用户的邮箱号和验证码等信息通过activeMQ的生产端发送到activeMQ的消息队列中,
而一旦消息队列中出现了数据,我们的邮件模块通过实时的监控activeMQ的消息队列就能通过消费端获取到这个数据,
邮件模块就会自行的去对数据进行解析,给用户发送邮件

// 2.发送短信
原理同发送邮件相同

// 3.同步索引库
为了缓解数据库的压力,我们把经常被调用的数据放入索引库(solr, es)中,
当有请求查询时,我们会先去查询索引库,如果索引库内有数据,那么我们就不用就数据库进行查询,
这样就能大大的减轻服务器的压力,可是随之而来的一个问题是,
假如我们服务器内的数据已经发生了改变,而浏览用户查询数据时,
因为索引库中已经有数据了,那么这样一来数据库与索引库的数据就不一致了,那么怎么解决这个问题呢?
可以通过用activeMQ来监听数据库的操作来实现数据库与索引库的数据同步,
当后台管理员或房产经纪人对数据库的数据进行了增删改的操作时,
我们通过activeMQ监听到了数据的改变,获取到被修改的数据的id,
然后在另一个服务模块中通过这个数据的id去数据库先查询一把,
然后根据查询结果进行判断,再去做索引库的数据同步。
打个比方,如果查询结果返回的是空,就说明商品已经被删除,
那么我们就可以根据数据的id去把索引库中的数据也一并删除了。

2.ActiveMQ安装与使用

2.1windows版本安装 (单机)

// 1.下载
http://activemq.apache.org/components/classic/download/
// 2.解压
// 3.修改配置文件(此步骤可选)
D:\soft_for_dev\apache-activemq-5.15.10\conf\
(此目录下的配置文件可修改端口号, 管理界面用户名密码等)
// 4.右键-->以管理员身份运行 "启动脚本"文件
D:\soft_for_dev\apache-activemq-5.15.10\bin\win64\activemq.bat
// 5.进入管理界面
http://localhost:8161/admin
(username: admin;  password: admin)
ActiveMQ管理界面--居然还是jsp?.png

2.2linux集群版

https://www.cnblogs.com/arjenlee/p/9303229.html (真集群 & 伪集群)

3.ActiveMQ的配置文件

4.spring系列中使用ActiveMQ

4.1spring-boot中使用ActiveMQ(同时支持queue类型和topic类型)

4.1.1producer

application.yml

application.yml.png

configuration

producer-配置类.png

controller

producer发消息.png

4.1.2consumer

application.yml

application.yml.png

configuration

consumer-配置类.png

listener

consumer-消费消息.png

参考资源
https://segmentfault.com/a/1190000004247661 (JMS)
https://blog.51cto.com/1754966750/1914850 (JMS)
https://segmentfault.com/a/1190000014958916?utm_source=tag-newest (JMS参考)
https://yq.aliyun.com/articles/44586 (ActiveMQ整体架构)
https://www.cnblogs.com/Survivalist/p/8094069.html (ActiveMQ应用)
https://blog.csdn.net/cs_hnu_scw/article/details/81040834 (大杂烩)

相关文章

网友评论

    本文标题:消息队列之三: ActiveMQ

    本文链接:https://www.haomeiwen.com/subject/ysgmmctx.html