一、JMS简介
全称:Java Message Service 中文:Java消息服务。
JMS是Java的一套API标准,最初的目的是为了使应用程序能够访问现有的MOM系统(MOM是Message Oriented Middleware的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。);后来被许多现有的MOM供应商采用,并实现为MOM系统。【常见MOM系统包括Apache的ActiveMQ、阿里巴巴的RocketMQ、IBM的MQSeries、Microsoft的MSMQ、BEA的RabbitMQ等。(并非全部的MOM系统都遵循JMS规范)】
基于JMS实现的MOM,又被称为JMS Provider。
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。如:
跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。
多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。
在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。
应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。
消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。
应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。
跨局域网,甚至跨城市的通讯,比如北京机房与广州机房的应用程序的通信。
二、ActiveMQ简介
多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去
通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通过JDBC和journal提供高速的消息持久化
从设计上保证了高性能的集群,客户端-服务器,点对点
支持Ajax
支持与Axis的整合, WebServices
可以很容易的调用内嵌JMS provider,进行测试
1 Destination
目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageConsumer需要指定Destination才能接收消息。
2 Producer
消息生成者(客户端、生成消息),负责发送Message到目的地。应用接口为MessageProducer。在JMS规范中,所有的标准定义都在javax.jms包中。
3 Consumer【Receiver】
消息消费者(处理消息),负责从目的地中消费【处理|监听|订阅】Message。应用接口为MessageConsumer
4 Message
消息(Message),消息封装一次通信的内容。常见类型有:StreamMessage、BytesMessage、TextMessage、ObjectMessage、MapMessage。
5 ConnectionFactory
链接工厂, 用于创建链接的工厂类型。 注意,不能和JDBC中的ConnectionFactory混淆。
6 Connection
链接. 用于建立访问ActiveMQ连接的类型, 由链接工厂创建. 注意,不能和JDBC中的Connection混淆。
7 Session
会话, 一次持久有效有状态的访问. 由链接创建. 是具体操作消息的基础支撑。
8 Queue & Topic
Queue是队列目的地,Topic是主题目的地。都是Destination的子接口。
Queue特点: 队列中的消息,默认只能由唯一的一个消费者处理。一旦处理消息删除。
Topic特点:主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处理的业务场景中可使用。
9 PTP
Point to Point。点对点消息模型。就是基于Queue实现的消息处理方式。
10 PUB & SUB
Publish & Subscribe 。消息的发布/订阅模型。是基于Topic实现的消息处理方式。
三、ActiveMQ安装
1 下载资源
ActiveMQ官网: http://activemq.apache.org
1.1 版本说明
ActiveMQ5.10.x以上版本必须使用JDK1.8才能正常使用。
ActiveMQ5.9.x及以下版本使用JDK1.7即可正常使用。
2 上传至Linux服务器
3 解压安装文件
tar -zxf apache-activemq-5.9.0-bin.tar.gz
4 检查权限
ls -al apache-activemq-5.9.0/bin
如果权限不足,则无法执行,需要修改文件权限:
chmod 755 activemq
5 复制应用至本地目录
cp -r apache-activemq-5.9.0 /usr/local/activemq
6 配置文件简介
/usr/local/activemq/conf/* - 配置文件.
需要关注的配置文件有: activemq.xml, jetty.xml, users.properties
任何配置文件修改后,必须重启ActiveMQ,才能生效.
6.1 activemq.xml
就是spring配置文件. 其中配置的是ActiveMQ应用使用的默认对象组件.
transportConnectors标签 - 配置链接端口信息的. 其中的端口号61616是ActiveMQ对外发布的tcp协议访问端口. 就是java代码访问ActiveMQ时使用的端口.
6.2 jetty.xml
spring配置文件, 用于配置jetty服务器的默认对象组件.
jetty是类似tomcat的一个中间件容器.
ActiveMQ默认支持一个网页版的服务查看站点. 可以实现ActiveMQ中消息相关数据的页面查看.
8161端口, 是ActiveMQ网页版管理站点的默认端口.
在ActiveMQ网页版管理站点中,需要登录, 默认的用户名和密码都是admin.
6.3 users.properties
内容信息: 用户名=密码
是用于配置客户端通过协议访问ActiveMQ时,使用的用户名和密码.
7 启动ActiveMQ
/usr/local/activemq/bin/activemq start
8 测试ActiveMQ
8.1 检查进程
ps aux | grep activemq
8.2 管理界面
使用浏览器访问ActiveMQ管理应用, 地址如下:
用户名: admin
密码: admin
ActiveMQ使用的是jetty提供HTTP服务.启动稍慢,建议短暂等待再访问测试.
8.3 修改访问端口
修改ActiveMQ配置文件: /usr/local/activemq/conf/jetty.xml
配置文件修改完毕,保存并重新启动ActiveMQ服务。
9 重启ActiveMQ
/usr/local/activemq/bin/activemq restart
10 关闭ActiveMQ
/usr/local/activemq/bin/activemq stop
四、ActiveMQ应用
1 PTP处理模式(Queue)
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。
当消费者不存在时,消息会一直保存,直到有消费消费
clip_image002.jpg2 Publish/Subscribe处理模式(Topic)
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息
一定要先有消息的消费者,后有消息的生产者。
clip_image004.jpg3 PTP和PUB/SUB简单对比
Topic | Queue | |
---|---|---|
概要 | Publish Subscribe messaging 发布订阅消息 | Point-to-Point点对点 |
有无状态 | topic数据默认不落地,是无状态的。 | Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kahadb下面。也可以配置成DB存储。 |
完整性保障 | 并不保证publisher发布的每条数据,Subscriber都能接受到。 | Queue保证每条数据都能被receiver接收。消息不超时。 |
消息是否会丢失 | 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 | Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。前提是消息不超时。 |
消息发布接收策略 | 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 | 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 |
五、ActiveMQ安全认证
ActiveMQ也提供了安全认证。就是用户名密码登录规则。ActiveMQ如果需要使用安全认证的话,必须在activemq的核心配置文件中开启安全配置。配置文件就是conf/activemq.xml
在conf/activemq.xml配置文件的broker标签中增加下述内容。
<jaasAuthenticationPlugin configuration="activemq" />指定了使用JAAS插件管理权限,至于configuration="activemq"是在login.conf文件里定义的
<authorizationEntry topic="名字" read="用户组名" write="用户组名" admin="用户组名" />指定了具体的Topic/Queue与用户组的授权关系
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>这个是必须的配置,不能少
<plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<!-- 添加jaas认证插件activemq在login.config里面定义,详细见login.config-->
<jaasAuthenticationPlugin configuration="activemq" />
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
开启认证后,认证使用的用户信息由其他配置文件提供。
conf/login.config
activemq {
org.apache.activemq.jaas.PropertiesLoginModule required
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};
user代表用户信息配置文件,group代表用户组信息配置文件。寻址路径为相对当前配置文件所在位置开始寻址。
conf/users.properties #用户名=密码 admin=admin
conf/groups.properties #用户组名=用户名,用户名 admins=admin
六、ActiveMQ的持久化
ActiveMQ中,持久化是指对消息数据的持久化。在ActiveMQ中,默认的消息是保存在内存中的。当内存容量不足的时候,或ActiveMQ正常关闭的时候,会将内存中的未处理的消息持久化到磁盘中。具体的持久化策略由配置文件中的具体配置决定。
ActiveMQ的默认存储策略是kahadb。如果使用JDBC作为持久化策略,则会将所有的需要持久化的消息保存到数据库中。
所有的持久化配置都在conf/activemq.xml中配置,配置信息都在broker标签内部定义。
1 kahadb方式
是ActiveMQ默认的持久化策略。kahadb是一个文件型数据库。是使用内存+文件保证数据的持久化的。kahadb可以限制每个数据文件的大小。不代表总计数据容量。
<persistenceAdapter>
<!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>
特性是:1、日志形式存储消息;2、消息索引以B-Tree结构存储,可以快速更新;3、完全支持JMS事务;4、支持多种恢复机制;
2 AMQ方式
只适用于5.3版本之前。
AMQ也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。
<persistenceAdapter>
<!-- directory:保存数据的目录 ; maxFileLength:保存消息的文件大小 -->
<amqPersistenceAdapter directory="${activemq.data}/amq" maxFileLength="32mb"/>
</persistenceAdapter>
性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。
当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档。
主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。
而且由于索引巨大,一旦Broker(ActiveMQ应用实例)崩溃,重建索引的速度会非常慢。
虽然AMQ性能略高于Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。
3 JDBC持久化方式
ActiveMQ将数据持久化到数据库中。 不指定具体的数据库。 可以使用任意的数据库中。 本环节中使用MySQL数据库。
下述文件为activemq.xml配置文件部分内容。不要完全复制。
首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置jdbcPersistenceAdapter并且引用刚才定义的数据源。
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。
<broker brokerName="test-broker" persistent="true"
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"
createTablesOnStartup="false"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
配置成功后,需要在数据库中创建对应的database,否则无法访问。表格ActiveMQ可以自动创建。
activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录消费过的消息的ID。
表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,
其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
这个表用于记录哪个Broker是当前的Master Broker。
只有在消息必须保证有效,且绝对不能丢失的时候。使用JDBC存储策略。
如果消息可以容忍丢失,或使用集群/主备模式保证数据安全的时候,建议使用levelDB或Kahadb。
七、API简介
1 Producer API简介
1.1 发送消息
MessageProducer.
send(Message message);发送消息到默认目的地,就是创建Producer时指定的目的地。
send(Destination destination, Message message); 发送消息到指定目的地,Producer不建议绑定目的地。也就是创建Producer的时候,不绑定目的地。session.createProducer(null)。
send(Message message, int deliveryMode, int priority, long timeToLive);发送消息到默认目的地,且设置相关参数。deliveryMode-持久化方式(DeliveryMode.PERSISTENT| DeliveryMode.NON_PERSISTENT)。priority-优先级。timeToLive-消息有效期(单位毫秒)。
send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive); 发送消息到指定目的地,且设置相关参数。
1.2 消息有效期
消息过期后,默认会将失效消息保存到“死信队列(ActiveMQ.DLQ)”。
不持久化的消息,在超时后直接丢弃,不会保存到死信队列中。
死信队列名称可配置,死信队列中的消息不能恢复。
死信队列是在activemq.xml中配置的。
1.3 消息优先级
不需特殊关注。
我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。在某些场景下,我们通常希望权重较高的消息优先传送;不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。
JMS标准中约定priority可以为09的整数数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持09,因为JDBC存储器可以基于priority对消息进行排序和索引化;但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。
1.3.1 开启
在broker端,默认是不存储priority信息的,我们需要手动开启,修改activemq.xml配置文件,在broker标签的子标签policyEntries中增加下述配置:
<policyEntry queue=">" prioritizedMessages="true"/>
不过对于“非持久化”类型的消息(如果没有被swap到临时文件),它们被保存在内存中,它们不存在从文件Paged in到内存的过程,因为可以保证优先级较高的消息,总是在prefetch的时候被优先获取,这也是“非持久化”消息可以担保消息发送顺序的优点。
Broker在收到Producer的消息之后,将会把消息cache到内存,如果消息需要持久化,那么同时也会把消息写入文件;如果通道中Consumer的消费速度足够快(即积压的消息很少,尚未超过内存限制,我们通过上文能够知道,每个通道都可以有一定的内存用来cache消息),那么消息几乎不需要从存储文件中Paged In,直接就能从内存的cache中获取即可,这种情况下,priority可以担保“全局顺序”;不过,如果消费者滞后太多,cache已满,就会触发新接收的消息直接保存在磁盘中,那么此时,priority就没有那么有效了。
在Queue中,prefetch的消息列表默认将会采用“轮询”的方式(roundRobin,注意并不是roundRobinDispatch)[备注:因为Queue不支持任何DispatchPolicy],依次添加到每个consumer的pending buffer中,比如有m1-m2-m3-m4四条消息,有C1-C2两个消费者,那么: m1->C1,m2->C2,m3->C1,m4->C2。这种轮序方式,会对基于权重的消息发送有些额外的影响,假如四条消息的权重都不同,但是(m1,m3)->C1,事实上m2的权重>m3,对于C1而言,它似乎丢失了“顺序性”。
1.3.2 强顺序
<policyEntry queue=">" strictOrderDispatch="true"/>
strictOrderDispatch“严格顺序转发”,这是区别于“轮询”的一种消息转发手段;不过不要误解它为“全局严格顺序”,它只不过是将prefetch的消息依次填满每个consumer的pending buffer。比如上述例子中,如果C1-C2两个消费者的buffer尺寸为3,那么(m1,m2,m3)->C1,(m4)->C2;当C1填充完毕之后,才会填充C2。由此这种策略可以保证buffer中所有的消息都是“权重临近的”、有序的。(需要注意:strictOrderDispatch并非是解决priority消息顺序的问题而生,只是在使用priority时需要关注它)。
1.3.3 严格顺序
policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>
useCache=false来关闭内存,强制将所有的消息都立即写入文件(索引化,但是会降低消息的转发效率);queuePrefetch=1来约束每个consumer任何时刻只有一个消息正在处理,那些消息消费之后,将会从文件中重新获取,这大大增加了消息文件操作的次数,不过每次读取肯定都是priority最高的消息。
2 Consumer API简介
2.1 消息的确认
Consumer拉取消息后,如果没有做确认acknowledge,此消息不会从MQ中删除。
消息的如果拉去到consumer后,未确认,那么消息被锁定。如果consumer关闭的时候仍旧没有确认消息,则释放消息锁定信息。消息将发送给其他的consumer处理。
消息一旦处理,应该必须确认。类似数据库中的事务管理机制。
2.2 消息的过滤
对消息消费者处理的消息数据进行过滤。这种处理可以明确消费者的角色,细分消费者的功能。
设置过滤:
Session.createConsumer(Destination destination, String messageSelector);
过滤信息为字符串,语法类似SQL92中的where子句条件信息。可以使用诸如AND、OR、IN、NOT IN等关键字。详细内容可以查看javax.jms.Message的帮助文档。
注意:消息的生产者在发送消息的的时候,必须设置可过滤的属性信息,所有的属性信息设置方法格式为:setXxxxProperty(String name, T value)。 其中方法名中的Xxxx是类型,如setObjectProperty/setStringProperty等。
八、Spring&ActiveMQ
ActiveMQ的开发,和Spring的整合是非常方便的。且Spring有对JMS提供的Template机制。所以Spring管理ActiveMQ访问操作是非常方便的。
九、ActiveMQ集群
使用ZooKeeper+ActiveMQ实现主从和集群.
1 Master-Slave
主从模式是一种高可用解决方案。在ZooKeeper中注册若干ActiveMQ Broker,其中只有一个Broker提供对外服务(Master),其他Broker处于待机状态(Slave)。当Master出现故障导致宕机时,通过ZooKeeper内部的选举机制,选举出一台Slave替代Master继续对外提供服务。
clip_image006.gif官方文档:http://activemq.apache.org/replicated-leveldb-store.html
1.1 安装ZooKeeper
搭建伪集群,在同一个Linux中安装三个ZooKeeper实例。使用不同的端口实现同时启动。端口分配如下:
主机 | 服务端口 | 投票端口 | 选举端口 |
---|---|---|---|
192.168.159.130 | 2181 | 2881 | 3881 |
192.168.159.130 | 2182 | 2882 | 3882 |
192.168.159.130 | 2183 | 2883 | 3883 |
1.1.1 解压缩
tar -zxf zookeeper
1.1.2 复制
cp -r zookeeper /usr/local/zookeeper1
1.1.3 创建data数据目录
在zookeeper1目录中创建子目录data目录
mkdir data
1.1.4 编写Zookeeper配置文件
vi /usr/local/solrcloude/zookeeper1/conf/zoo.cfg
修改数据目录
1.1.5 复制两份同样的Zookeeper
cp zookeeper1 zookeeper2 -r
cp zookeeper1 zookeeper3 -r
1.1.6 为Zookeeper服务增加服务命名
在每个Zookeeper应用内的data目录中增加文件myid
内部定义每个服务的编号. 编号要求为数字,是正整数
可以使用回声命名快速定义myid文件
echo 1 >> myid
1.1.7 修改Zookeeper配置文件 zoo.cfg
修改端口号.
提供多节点服务命名
port=2181 客户端访问端口. 三个Zookeeper实例不能端口相同.
server.编号=IP:投票端口:选举端口
投票端口: 用于决定正在运行的主机是否宕机.
选举端口: 用于决定哪一个Zookeeper服务作为主机.
三个Zookeeper应用配置一致.
server.1=192.168.120.132:2881:3881
server.2=192.168.120.132:2882:3882
server.3=192.168.120.132:2883:3883
1.1.8 启动Zookeeper测试
要至少启动两个Zookeeper启动. 启动单一Zookeeper,无法正常提供服务.
1.2 安装ActiveMQ
在同一个Linux中安装三个ActiveMQ实例,使用不同端口实现同时启动。端口分配如下:
主机 | M-S通讯端口 | 服务端口 | jetty端口 |
---|---|---|---|
192.168.159.130 | 62626 | 61616 | 8161 |
192.168.159.130 | 62627 | 61617 | 8162 |
192.168.159.130 | 62628 | 61618 | 8163 |
1.2.1 安装ActiveMQ实例
1.2.2 修改配置信息
1.2.2.1 修改jetty端口
修改conf/jetty.xml中的端口配置。分别是8161、8162、8163
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="port" value="8161"/> </bean>
1.2.2.2 统一所有主从节点Broker命名
修改conf/activemq.xml文件。修改broker标签属性信息,统一所有节点的broker命名。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}">
1.2.2.3 修改持久化配置
修改conf/activemq.xml文件。修改broker标签中子标签persistenceAdapter相关内容。
replicas属性代表当前主从模型中的节点数量。按需配置。
bind属性中的端口为主从实例之间的通讯端口。代表当前实例对外开放端口是什么,三个实例分别使用62626、62627、62628端口。
zkAddress属性代表ZooKeeper安装位置,安装具体情况设置。
zkPath是ActiveMQ主从信息保存到ZooKeeper中的什么目录内。
hostname为ActiveMQ实例安装Linux的主机名,可以在/etc/hosts配置文件中设置。设置格式为:IP 主机名。 如: 127.0.0.1 mq-server
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<replicatedLevelDB
directory="${activemq.data}/levelDB"
replicas="3"
bind="tcp://0.0.0.0:62626"
zkAddress="192.168.159.130:2181,192.168.159.130:2182,192.168.159.130:2183"
zkPath="/activemq/leveldb-stores"
hostname="mq-server"
/>
</persistenceAdapter>
1.2.2.4 修改服务端口
修改ActiveMQ对外提供的服务端口。原默认端口为61616。当前环境使用的端口为:61616、61617、61618。
修改conf/activemq.xml配置文件。修改broker标签中子标签transportConnectors的相关配置。只修改强调内容。
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
1.3 启动主从
将三个ActiveMQ实例分别启动。{activemq-home}/data/activemq.log。
1.4 查看主从状态
1.4.1 使用客户端连接ZooKeeper
${zkHome}/bin/zkCli.sh
1.4.2 查看状态信息
连接成功后,可以使用命令‘ls’查看ZooKeeper中的目录结构
如:
ls /
ls /activemq/leveldb-stores
找到对应的内容后,可以使用命令‘get’查看ZooKeeper中的数据内容
get /activemq/leveldb-stores/00000000005
其中主节点的elected及address属性一定有数据。从节点则数据为‘null’。
2 集群
准备多份主从模型。在所有的ActiveMQ节点中的conf/activemq.xml中增加下述配置:(每个主从模型中的networkConnector都指向另外一个主从模型)
<networkConnectors>
<networkConnector uri="static://(tcp://ip:port,tcp://ip:port)" duplex="false">
</networkConnector>
</networkConnectors>
注意配置顺序,Networks相关配置必须在持久化相关配置之前。如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://activemq.apache.org/schema/core">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}" >
<networkConnectors>
<networkConnector uri=" static://(tcp://ip:port,tcp://ip:port)"/>
</networkConnectors>
<persistenceAdapter>
< replicatedLevelDB directory = "xxx"/>
</persistenceAdapter>
</broker>
</beans>
如: 主从模型1 - 192.168.159.129 主从模型2 - 192.168.159.130
在主从模型1的所有节点activemq.xml配置文件中增加标签:
<networkConnectors>
<networkConnector uri="static://(tcp://192.168.159.130:61616,tcp://192.168.159.130:61617)"/>
</networkConnectors>
在模型2中所有节点增加配置:
<networkConnectors>
<networkConnector uri="static://(tcp://192.168.159.129:61616,tcp://192.168.159.129:61617)"/>
</networkConnectors>
网友评论