ActiveMQ

作者: Java及SpringBoot | 来源:发表于2018-09-10 10:30 被阅读228次

一、JMS简介

全称:Java Message Service 中文:Java消息服务。

JMS是Java的一套API标准,最初的目的是为了使应用程序能够访问现有的MOM系统(MOM是Message Oriented Middleware的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。);后来被许多现有的MOM供应商采用,并实现为MOM系统。【常见MOM系统包括Apache的ActiveMQ、阿里巴巴的RocketMQ、IBM的MQSeries、Microsoft的MSMQ、BEA的RabbitMQ等。(并非全部的MOM系统都遵循JMS规范)】

基于JMS实现的MOM,又被称为JMS Provider。

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。如:

跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。

多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。

在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。

应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。

消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。

应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。

跨局域网,甚至跨城市的通讯,比如北京机房与广州机房的应用程序的通信。

二、ActiveMQ简介

多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去

通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

支持通过JDBC和journal提供高速的消息持久化

从设计上保证了高性能的集群,客户端-服务器,点对点

支持Ajax

支持与Axis的整合, WebServices

可以很容易的调用内嵌JMS provider,进行测试

1 Destination

目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageConsumer需要指定Destination才能接收消息。

2 Producer

消息生成者(客户端、生成消息),负责发送Message到目的地。应用接口为MessageProducer。在JMS规范中,所有的标准定义都在javax.jms包中。

3 Consumer【Receiver】

消息消费者(处理消息),负责从目的地中消费【处理|监听|订阅】Message。应用接口为MessageConsumer

4 Message

消息(Message),消息封装一次通信的内容。常见类型有:StreamMessage、BytesMessage、TextMessage、ObjectMessage、MapMessage。

5 ConnectionFactory

链接工厂, 用于创建链接的工厂类型。 注意,不能和JDBC中的ConnectionFactory混淆。

6 Connection

链接. 用于建立访问ActiveMQ连接的类型, 由链接工厂创建. 注意,不能和JDBC中的Connection混淆。

7 Session

会话, 一次持久有效有状态的访问. 由链接创建. 是具体操作消息的基础支撑。

8 Queue & Topic

Queue是队列目的地,Topic是主题目的地。都是Destination的子接口。

Queue特点: 队列中的消息,默认只能由唯一的一个消费者处理。一旦处理消息删除。

Topic特点:主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处理的业务场景中可使用。

9 PTP

Point to Point。点对点消息模型。就是基于Queue实现的消息处理方式。

10 PUB & SUB

Publish & Subscribe 。消息的发布/订阅模型。是基于Topic实现的消息处理方式。

三、ActiveMQ安装

1 下载资源

ActiveMQ官网: http://activemq.apache.org

1.1 版本说明

ActiveMQ5.10.x以上版本必须使用JDK1.8才能正常使用。

ActiveMQ5.9.x及以下版本使用JDK1.7即可正常使用。

2 上传至Linux服务器

3 解压安装文件

tar -zxf apache-activemq-5.9.0-bin.tar.gz

4 检查权限

ls -al apache-activemq-5.9.0/bin

如果权限不足,则无法执行,需要修改文件权限:

chmod 755 activemq

5 复制应用至本地目录

cp -r apache-activemq-5.9.0 /usr/local/activemq

6 配置文件简介

/usr/local/activemq/conf/* - 配置文件.

需要关注的配置文件有: activemq.xml, jetty.xml, users.properties

任何配置文件修改后,必须重启ActiveMQ,才能生效.

6.1 activemq.xml

就是spring配置文件. 其中配置的是ActiveMQ应用使用的默认对象组件.

transportConnectors标签 - 配置链接端口信息的. 其中的端口号61616是ActiveMQ对外发布的tcp协议访问端口. 就是java代码访问ActiveMQ时使用的端口.

6.2 jetty.xml

spring配置文件, 用于配置jetty服务器的默认对象组件.

jetty是类似tomcat的一个中间件容器.

ActiveMQ默认支持一个网页版的服务查看站点. 可以实现ActiveMQ中消息相关数据的页面查看.

8161端口, 是ActiveMQ网页版管理站点的默认端口.

在ActiveMQ网页版管理站点中,需要登录, 默认的用户名和密码都是admin.

6.3 users.properties

内容信息: 用户名=密码

是用于配置客户端通过协议访问ActiveMQ时,使用的用户名和密码.

7 启动ActiveMQ

/usr/local/activemq/bin/activemq start

8 测试ActiveMQ

8.1 检查进程

ps aux | grep activemq

8.2 管理界面

使用浏览器访问ActiveMQ管理应用, 地址如下:

http://ip:8161/admin/

用户名: admin

密码: admin

ActiveMQ使用的是jetty提供HTTP服务.启动稍慢,建议短暂等待再访问测试.

8.3 修改访问端口

修改ActiveMQ配置文件: /usr/local/activemq/conf/jetty.xml

配置文件修改完毕,保存并重新启动ActiveMQ服务。

9 重启ActiveMQ

/usr/local/activemq/bin/activemq restart

10 关闭ActiveMQ

/usr/local/activemq/bin/activemq stop

四、ActiveMQ应用

1 PTP处理模式(Queue)

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。

当消费者不存在时,消息会一直保存,直到有消费消费

clip_image002.jpg

2 Publish/Subscribe处理模式(Topic)

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。

和点对点方式不同,发布到topic的消息会被所有订阅者消费。

当生产者发布消息,不管是否有消费者。都不会保存消息

一定要先有消息的消费者,后有消息的生产者。

clip_image004.jpg

3 PTP和PUB/SUB简单对比

Topic Queue
概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point点对点
有无状态 topic数据默认不落地,是无状态的。 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kahadb下面。也可以配置成DB存储。
完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。消息不超时。
消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。前提是消息不超时。
消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

五、ActiveMQ安全认证

ActiveMQ也提供了安全认证。就是用户名密码登录规则。ActiveMQ如果需要使用安全认证的话,必须在activemq的核心配置文件中开启安全配置。配置文件就是conf/activemq.xml

在conf/activemq.xml配置文件的broker标签中增加下述内容。

<jaasAuthenticationPlugin configuration="activemq" />指定了使用JAAS插件管理权限,至于configuration="activemq"是在login.conf文件里定义的

<authorizationEntry topic="名字" read="用户组名" write="用户组名" admin="用户组名" />指定了具体的Topic/Queue与用户组的授权关系

<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>这个是必须的配置,不能少

<plugins>
            <!--  use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<!--  添加jaas认证插件activemq在login.config里面定义,详细见login.config-->

            <jaasAuthenticationPlugin configuration="activemq" />
            <!--  lets configure a destination based authorization mechanism -->
            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries>
                            <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                            <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                            <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                            <authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                        </authorizationEntries>
                    </authorizationMap>
                </map>
            </authorizationPlugin>
</plugins> 

开启认证后,认证使用的用户信息由其他配置文件提供。

conf/login.config

activemq {
    org.apache.activemq.jaas.PropertiesLoginModule required
        org.apache.activemq.jaas.properties.user="users.properties"
        org.apache.activemq.jaas.properties.group="groups.properties";
};

user代表用户信息配置文件,group代表用户组信息配置文件。寻址路径为相对当前配置文件所在位置开始寻址。

conf/users.properties #用户名=密码 admin=admin

conf/groups.properties #用户组名=用户名,用户名 admins=admin

六、ActiveMQ的持久化

ActiveMQ中,持久化是指对消息数据的持久化。在ActiveMQ中,默认的消息是保存在内存中的。当内存容量不足的时候,或ActiveMQ正常关闭的时候,会将内存中的未处理的消息持久化到磁盘中。具体的持久化策略由配置文件中的具体配置决定。

ActiveMQ的默认存储策略是kahadb。如果使用JDBC作为持久化策略,则会将所有的需要持久化的消息保存到数据库中。

所有的持久化配置都在conf/activemq.xml中配置,配置信息都在broker标签内部定义。

1 kahadb方式

是ActiveMQ默认的持久化策略。kahadb是一个文件型数据库。是使用内存+文件保证数据的持久化的。kahadb可以限制每个数据文件的大小。不代表总计数据容量。

<persistenceAdapter>
 <!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
 <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>

特性是:1、日志形式存储消息;2、消息索引以B-Tree结构存储,可以快速更新;3、完全支持JMS事务;4、支持多种恢复机制;

2 AMQ方式

只适用于5.3版本之前。

AMQ也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。

<persistenceAdapter>
   <!-- directory:保存数据的目录 ; maxFileLength:保存消息的文件大小 -->
   <amqPersistenceAdapter directory="${activemq.data}/amq" maxFileLength="32mb"/>
</persistenceAdapter>

性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。

当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档。

主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。

而且由于索引巨大,一旦Broker(ActiveMQ应用实例)崩溃,重建索引的速度会非常慢。

虽然AMQ性能略高于Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。

3 JDBC持久化方式

ActiveMQ将数据持久化到数据库中。 不指定具体的数据库。 可以使用任意的数据库中。 本环节中使用MySQL数据库。

下述文件为activemq.xml配置文件部分内容。不要完全复制。

首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置jdbcPersistenceAdapter并且引用刚才定义的数据源。

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

<broker brokerName="test-broker" persistent="true" 
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" 
createTablesOnStartup="false"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" 
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" 
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

配置成功后,需要在数据库中创建对应的database,否则无法访问。表格ActiveMQ可以自动创建。

activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:

ID:自增的数据库主键

CONTAINER:消息的Destination

MSGID_PROD:消息发送者客户端的主键

MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID

EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数

MSG:消息本体的Java序列化对象的二进制数据

PRIORITY:优先级,从0-9,数值越大优先级越高

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

主要的数据库字段如下:

CONTAINER:消息的Destination

SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分

SUB_NAME:订阅者名称

SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作

LAST_ACKED_ID:记录消费过的消息的ID。

表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,

其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
这个表用于记录哪个Broker是当前的Master Broker。

只有在消息必须保证有效,且绝对不能丢失的时候。使用JDBC存储策略。

如果消息可以容忍丢失,或使用集群/主备模式保证数据安全的时候,建议使用levelDB或Kahadb。

七、API简介

1 Producer API简介

1.1 发送消息

MessageProducer.

send(Message message);发送消息到默认目的地,就是创建Producer时指定的目的地。

send(Destination destination, Message message); 发送消息到指定目的地,Producer不建议绑定目的地。也就是创建Producer的时候,不绑定目的地。session.createProducer(null)。

send(Message message, int deliveryMode, int priority, long timeToLive);发送消息到默认目的地,且设置相关参数。deliveryMode-持久化方式(DeliveryMode.PERSISTENT| DeliveryMode.NON_PERSISTENT)。priority-优先级。timeToLive-消息有效期(单位毫秒)。

send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive); 发送消息到指定目的地,且设置相关参数。

1.2 消息有效期

消息过期后,默认会将失效消息保存到“死信队列(ActiveMQ.DLQ)”。

不持久化的消息,在超时后直接丢弃,不会保存到死信队列中。

死信队列名称可配置,死信队列中的消息不能恢复。

死信队列是在activemq.xml中配置的。

1.3 消息优先级

不需特殊关注。

我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。在某些场景下,我们通常希望权重较高的消息优先传送;不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。

JMS标准中约定priority可以为09的整数数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持09,因为JDBC存储器可以基于priority对消息进行排序和索引化;但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。

1.3.1 开启

在broker端,默认是不存储priority信息的,我们需要手动开启,修改activemq.xml配置文件,在broker标签的子标签policyEntries中增加下述配置:

<policyEntry queue=">" prioritizedMessages="true"/>  

不过对于“非持久化”类型的消息(如果没有被swap到临时文件),它们被保存在内存中,它们不存在从文件Paged in到内存的过程,因为可以保证优先级较高的消息,总是在prefetch的时候被优先获取,这也是“非持久化”消息可以担保消息发送顺序的优点。

Broker在收到Producer的消息之后,将会把消息cache到内存,如果消息需要持久化,那么同时也会把消息写入文件;如果通道中Consumer的消费速度足够快(即积压的消息很少,尚未超过内存限制,我们通过上文能够知道,每个通道都可以有一定的内存用来cache消息),那么消息几乎不需要从存储文件中Paged In,直接就能从内存的cache中获取即可,这种情况下,priority可以担保“全局顺序”;不过,如果消费者滞后太多,cache已满,就会触发新接收的消息直接保存在磁盘中,那么此时,priority就没有那么有效了。

在Queue中,prefetch的消息列表默认将会采用“轮询”的方式(roundRobin,注意并不是roundRobinDispatch)[备注:因为Queue不支持任何DispatchPolicy],依次添加到每个consumer的pending buffer中,比如有m1-m2-m3-m4四条消息,有C1-C2两个消费者,那么: m1->C1,m2->C2,m3->C1,m4->C2。这种轮序方式,会对基于权重的消息发送有些额外的影响,假如四条消息的权重都不同,但是(m1,m3)->C1,事实上m2的权重>m3,对于C1而言,它似乎丢失了“顺序性”。

1.3.2 强顺序
<policyEntry queue=">" strictOrderDispatch="true"/>

strictOrderDispatch“严格顺序转发”,这是区别于“轮询”的一种消息转发手段;不过不要误解它为“全局严格顺序”,它只不过是将prefetch的消息依次填满每个consumer的pending buffer。比如上述例子中,如果C1-C2两个消费者的buffer尺寸为3,那么(m1,m2,m3)->C1,(m4)->C2;当C1填充完毕之后,才会填充C2。由此这种策略可以保证buffer中所有的消息都是“权重临近的”、有序的。(需要注意:strictOrderDispatch并非是解决priority消息顺序的问题而生,只是在使用priority时需要关注它)。

1.3.3 严格顺序
policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>

useCache=false来关闭内存,强制将所有的消息都立即写入文件(索引化,但是会降低消息的转发效率);queuePrefetch=1来约束每个consumer任何时刻只有一个消息正在处理,那些消息消费之后,将会从文件中重新获取,这大大增加了消息文件操作的次数,不过每次读取肯定都是priority最高的消息。

2 Consumer API简介

2.1 消息的确认

Consumer拉取消息后,如果没有做确认acknowledge,此消息不会从MQ中删除。

消息的如果拉去到consumer后,未确认,那么消息被锁定。如果consumer关闭的时候仍旧没有确认消息,则释放消息锁定信息。消息将发送给其他的consumer处理。

消息一旦处理,应该必须确认。类似数据库中的事务管理机制。

2.2 消息的过滤

对消息消费者处理的消息数据进行过滤。这种处理可以明确消费者的角色,细分消费者的功能。

设置过滤:

Session.createConsumer(Destination destination, String messageSelector);

过滤信息为字符串,语法类似SQL92中的where子句条件信息。可以使用诸如AND、OR、IN、NOT IN等关键字。详细内容可以查看javax.jms.Message的帮助文档。

注意:消息的生产者在发送消息的的时候,必须设置可过滤的属性信息,所有的属性信息设置方法格式为:setXxxxProperty(String name, T value)。 其中方法名中的Xxxx是类型,如setObjectProperty/setStringProperty等。

八、Spring&ActiveMQ

ActiveMQ的开发,和Spring的整合是非常方便的。且Spring有对JMS提供的Template机制。所以Spring管理ActiveMQ访问操作是非常方便的。

九、ActiveMQ集群

使用ZooKeeper+ActiveMQ实现主从和集群.

1 Master-Slave

主从模式是一种高可用解决方案。在ZooKeeper中注册若干ActiveMQ Broker,其中只有一个Broker提供对外服务(Master),其他Broker处于待机状态(Slave)。当Master出现故障导致宕机时,通过ZooKeeper内部的选举机制,选举出一台Slave替代Master继续对外提供服务。

clip_image006.gif

官方文档:http://activemq.apache.org/replicated-leveldb-store.html

1.1 安装ZooKeeper

搭建伪集群,在同一个Linux中安装三个ZooKeeper实例。使用不同的端口实现同时启动。端口分配如下:

主机 服务端口 投票端口 选举端口
192.168.159.130 2181 2881 3881
192.168.159.130 2182 2882 3882
192.168.159.130 2183 2883 3883
1.1.1 解压缩

tar -zxf zookeeper

1.1.2 复制

cp -r zookeeper /usr/local/zookeeper1

1.1.3 创建data数据目录

在zookeeper1目录中创建子目录data目录

mkdir data

1.1.4 编写Zookeeper配置文件

vi /usr/local/solrcloude/zookeeper1/conf/zoo.cfg

修改数据目录

1.1.5 复制两份同样的Zookeeper

cp zookeeper1 zookeeper2 -r

cp zookeeper1 zookeeper3 -r

1.1.6 为Zookeeper服务增加服务命名

在每个Zookeeper应用内的data目录中增加文件myid

内部定义每个服务的编号. 编号要求为数字,是正整数

可以使用回声命名快速定义myid文件

echo 1 >> myid

1.1.7 修改Zookeeper配置文件 zoo.cfg

修改端口号.

提供多节点服务命名

port=2181 客户端访问端口. 三个Zookeeper实例不能端口相同.
server.编号=IP:投票端口:选举端口
投票端口: 用于决定正在运行的主机是否宕机.
选举端口: 用于决定哪一个Zookeeper服务作为主机.
三个Zookeeper应用配置一致.
server.1=192.168.120.132:2881:3881
server.2=192.168.120.132:2882:3882
server.3=192.168.120.132:2883:3883 
1.1.8 启动Zookeeper测试

要至少启动两个Zookeeper启动. 启动单一Zookeeper,无法正常提供服务.

1.2 安装ActiveMQ

在同一个Linux中安装三个ActiveMQ实例,使用不同端口实现同时启动。端口分配如下:

主机 M-S通讯端口 服务端口 jetty端口
192.168.159.130 62626 61616 8161
192.168.159.130 62627 61617 8162
192.168.159.130 62628 61618 8163
1.2.1 安装ActiveMQ实例
1.2.2 修改配置信息
1.2.2.1 修改jetty端口

修改conf/jetty.xml中的端口配置。分别是8161、8162、8163

   <bean id="jettyPort"   class="org.apache.activemq.web.WebConsolePort"   init-method="start">                <!-- the default port number   for the web console -->           <property name="port"   value="8161"/>       </bean>   
1.2.2.2 统一所有主从节点Broker命名

修改conf/activemq.xml文件。修改broker标签属性信息,统一所有节点的broker命名。

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}">
1.2.2.3 修改持久化配置

修改conf/activemq.xml文件。修改broker标签中子标签persistenceAdapter相关内容。

replicas属性代表当前主从模型中的节点数量。按需配置。

bind属性中的端口为主从实例之间的通讯端口。代表当前实例对外开放端口是什么,三个实例分别使用62626、62627、62628端口。

zkAddress属性代表ZooKeeper安装位置,安装具体情况设置。

zkPath是ActiveMQ主从信息保存到ZooKeeper中的什么目录内。

hostname为ActiveMQ实例安装Linux的主机名,可以在/etc/hosts配置文件中设置。设置格式为:IP 主机名。 如: 127.0.0.1 mq-server

<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
 <replicatedLevelDB
   directory="${activemq.data}/levelDB"
   replicas="3"
   bind="tcp://0.0.0.0:62626"
   zkAddress="192.168.159.130:2181,192.168.159.130:2182,192.168.159.130:2183"
   zkPath="/activemq/leveldb-stores"
   hostname="mq-server"
   />
</persistenceAdapter>
1.2.2.4 修改服务端口

修改ActiveMQ对外提供的服务端口。原默认端口为61616。当前环境使用的端口为:61616、61617、61618。

修改conf/activemq.xml配置文件。修改broker标签中子标签transportConnectors的相关配置。只修改强调内容。

<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

1.3 启动主从

将三个ActiveMQ实例分别启动。{activemq-home}/bin/active start。启动后,可以查看日志文件,检查启动状态,日志文件为{activemq-home}/data/activemq.log。

1.4 查看主从状态

1.4.1 使用客户端连接ZooKeeper

${zkHome}/bin/zkCli.sh

1.4.2 查看状态信息

连接成功后,可以使用命令‘ls’查看ZooKeeper中的目录结构

如:

ls /

ls /activemq/leveldb-stores

找到对应的内容后,可以使用命令‘get’查看ZooKeeper中的数据内容

get /activemq/leveldb-stores/00000000005

其中主节点的elected及address属性一定有数据。从节点则数据为‘null’。

2 集群

准备多份主从模型。在所有的ActiveMQ节点中的conf/activemq.xml中增加下述配置:(每个主从模型中的networkConnector都指向另外一个主从模型)

<networkConnectors>
 <networkConnector uri="static://(tcp://ip:port,tcp://ip:port)" duplex="false">
 </networkConnector>
</networkConnectors>

注意配置顺序,Networks相关配置必须在持久化相关配置之前。如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://activemq.apache.org/schema/core">
   <broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}" >
       <networkConnectors>
           <networkConnector uri=" static://(tcp://ip:port,tcp://ip:port)"/>
       </networkConnectors>
       <persistenceAdapter>
           < replicatedLevelDB directory = "xxx"/>       
       </persistenceAdapter>
   </broker>
</beans>

如: 主从模型1 - 192.168.159.129 主从模型2 - 192.168.159.130

在主从模型1的所有节点activemq.xml配置文件中增加标签:
<networkConnectors>
<networkConnector uri="static://(tcp://192.168.159.130:61616,tcp://192.168.159.130:61617)"/>
</networkConnectors>

在模型2中所有节点增加配置:
<networkConnectors>
<networkConnector uri="static://(tcp://192.168.159.129:61616,tcp://192.168.159.129:61617)"/>
</networkConnectors>

相关文章

网友评论

      本文标题:ActiveMQ

      本文链接:https://www.haomeiwen.com/subject/rbgjgftx.html