ActiveMQ

作者: Java及SpringBoot | 来源:发表于2018-09-10 10:30 被阅读228次

    一、JMS简介

    全称:Java Message Service 中文:Java消息服务。

    JMS是Java的一套API标准,最初的目的是为了使应用程序能够访问现有的MOM系统(MOM是Message Oriented Middleware的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。);后来被许多现有的MOM供应商采用,并实现为MOM系统。【常见MOM系统包括Apache的ActiveMQ、阿里巴巴的RocketMQ、IBM的MQSeries、Microsoft的MSMQ、BEA的RabbitMQ等。(并非全部的MOM系统都遵循JMS规范)】

    基于JMS实现的MOM,又被称为JMS Provider。

    “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

    消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

    消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。如:

    跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。

    多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。

    在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。

    应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。

    消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。

    应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。

    跨局域网,甚至跨城市的通讯,比如北京机房与广州机房的应用程序的通信。

    二、ActiveMQ简介

    多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

    完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

    对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去

    通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

    支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

    支持通过JDBC和journal提供高速的消息持久化

    从设计上保证了高性能的集群,客户端-服务器,点对点

    支持Ajax

    支持与Axis的整合, WebServices

    可以很容易的调用内嵌JMS provider,进行测试

    1 Destination

    目的地,JMS Provider(消息中间件)负责维护,用于对Message进行管理的对象。MessageProducer需要指定Destination才能发送消息,MessageConsumer需要指定Destination才能接收消息。

    2 Producer

    消息生成者(客户端、生成消息),负责发送Message到目的地。应用接口为MessageProducer。在JMS规范中,所有的标准定义都在javax.jms包中。

    3 Consumer【Receiver】

    消息消费者(处理消息),负责从目的地中消费【处理|监听|订阅】Message。应用接口为MessageConsumer

    4 Message

    消息(Message),消息封装一次通信的内容。常见类型有:StreamMessage、BytesMessage、TextMessage、ObjectMessage、MapMessage。

    5 ConnectionFactory

    链接工厂, 用于创建链接的工厂类型。 注意,不能和JDBC中的ConnectionFactory混淆。

    6 Connection

    链接. 用于建立访问ActiveMQ连接的类型, 由链接工厂创建. 注意,不能和JDBC中的Connection混淆。

    7 Session

    会话, 一次持久有效有状态的访问. 由链接创建. 是具体操作消息的基础支撑。

    8 Queue & Topic

    Queue是队列目的地,Topic是主题目的地。都是Destination的子接口。

    Queue特点: 队列中的消息,默认只能由唯一的一个消费者处理。一旦处理消息删除。

    Topic特点:主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处理的业务场景中可使用。

    9 PTP

    Point to Point。点对点消息模型。就是基于Queue实现的消息处理方式。

    10 PUB & SUB

    Publish & Subscribe 。消息的发布/订阅模型。是基于Topic实现的消息处理方式。

    三、ActiveMQ安装

    1 下载资源

    ActiveMQ官网: http://activemq.apache.org

    1.1 版本说明

    ActiveMQ5.10.x以上版本必须使用JDK1.8才能正常使用。

    ActiveMQ5.9.x及以下版本使用JDK1.7即可正常使用。

    2 上传至Linux服务器

    3 解压安装文件

    tar -zxf apache-activemq-5.9.0-bin.tar.gz

    4 检查权限

    ls -al apache-activemq-5.9.0/bin

    如果权限不足,则无法执行,需要修改文件权限:

    chmod 755 activemq

    5 复制应用至本地目录

    cp -r apache-activemq-5.9.0 /usr/local/activemq

    6 配置文件简介

    /usr/local/activemq/conf/* - 配置文件.

    需要关注的配置文件有: activemq.xml, jetty.xml, users.properties

    任何配置文件修改后,必须重启ActiveMQ,才能生效.

    6.1 activemq.xml

    就是spring配置文件. 其中配置的是ActiveMQ应用使用的默认对象组件.

    transportConnectors标签 - 配置链接端口信息的. 其中的端口号61616是ActiveMQ对外发布的tcp协议访问端口. 就是java代码访问ActiveMQ时使用的端口.

    6.2 jetty.xml

    spring配置文件, 用于配置jetty服务器的默认对象组件.

    jetty是类似tomcat的一个中间件容器.

    ActiveMQ默认支持一个网页版的服务查看站点. 可以实现ActiveMQ中消息相关数据的页面查看.

    8161端口, 是ActiveMQ网页版管理站点的默认端口.

    在ActiveMQ网页版管理站点中,需要登录, 默认的用户名和密码都是admin.

    6.3 users.properties

    内容信息: 用户名=密码

    是用于配置客户端通过协议访问ActiveMQ时,使用的用户名和密码.

    7 启动ActiveMQ

    /usr/local/activemq/bin/activemq start

    8 测试ActiveMQ

    8.1 检查进程

    ps aux | grep activemq

    8.2 管理界面

    使用浏览器访问ActiveMQ管理应用, 地址如下:

    http://ip:8161/admin/

    用户名: admin

    密码: admin

    ActiveMQ使用的是jetty提供HTTP服务.启动稍慢,建议短暂等待再访问测试.

    8.3 修改访问端口

    修改ActiveMQ配置文件: /usr/local/activemq/conf/jetty.xml

    配置文件修改完毕,保存并重新启动ActiveMQ服务。

    9 重启ActiveMQ

    /usr/local/activemq/bin/activemq restart

    10 关闭ActiveMQ

    /usr/local/activemq/bin/activemq stop

    四、ActiveMQ应用

    1 PTP处理模式(Queue)

    消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

    消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。

    Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。

    当消费者不存在时,消息会一直保存,直到有消费消费

    clip_image002.jpg

    2 Publish/Subscribe处理模式(Topic)

    消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。

    和点对点方式不同,发布到topic的消息会被所有订阅者消费。

    当生产者发布消息,不管是否有消费者。都不会保存消息

    一定要先有消息的消费者,后有消息的生产者。

    clip_image004.jpg

    3 PTP和PUB/SUB简单对比

    Topic Queue
    概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point点对点
    有无状态 topic数据默认不落地,是无状态的。 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kahadb下面。也可以配置成DB存储。
    完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。消息不超时。
    消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。前提是消息不超时。
    消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

    五、ActiveMQ安全认证

    ActiveMQ也提供了安全认证。就是用户名密码登录规则。ActiveMQ如果需要使用安全认证的话,必须在activemq的核心配置文件中开启安全配置。配置文件就是conf/activemq.xml

    在conf/activemq.xml配置文件的broker标签中增加下述内容。

    <jaasAuthenticationPlugin configuration="activemq" />指定了使用JAAS插件管理权限,至于configuration="activemq"是在login.conf文件里定义的

    <authorizationEntry topic="名字" read="用户组名" write="用户组名" admin="用户组名" />指定了具体的Topic/Queue与用户组的授权关系

    <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>这个是必须的配置,不能少

    <plugins>
                <!--  use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
    <!--  添加jaas认证插件activemq在login.config里面定义,详细见login.config-->
    
                <jaasAuthenticationPlugin configuration="activemq" />
                <!--  lets configure a destination based authorization mechanism -->
                <authorizationPlugin>
                    <map>
                        <authorizationMap>
                            <authorizationEntries>
                                <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                                <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                                <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                                <authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                            </authorizationEntries>
                        </authorizationMap>
                    </map>
                </authorizationPlugin>
    </plugins> 
    

    开启认证后,认证使用的用户信息由其他配置文件提供。

    conf/login.config

    activemq {
        org.apache.activemq.jaas.PropertiesLoginModule required
            org.apache.activemq.jaas.properties.user="users.properties"
            org.apache.activemq.jaas.properties.group="groups.properties";
    };
    

    user代表用户信息配置文件,group代表用户组信息配置文件。寻址路径为相对当前配置文件所在位置开始寻址。

    conf/users.properties #用户名=密码 admin=admin

    conf/groups.properties #用户组名=用户名,用户名 admins=admin

    六、ActiveMQ的持久化

    ActiveMQ中,持久化是指对消息数据的持久化。在ActiveMQ中,默认的消息是保存在内存中的。当内存容量不足的时候,或ActiveMQ正常关闭的时候,会将内存中的未处理的消息持久化到磁盘中。具体的持久化策略由配置文件中的具体配置决定。

    ActiveMQ的默认存储策略是kahadb。如果使用JDBC作为持久化策略,则会将所有的需要持久化的消息保存到数据库中。

    所有的持久化配置都在conf/activemq.xml中配置,配置信息都在broker标签内部定义。

    1 kahadb方式

    是ActiveMQ默认的持久化策略。kahadb是一个文件型数据库。是使用内存+文件保证数据的持久化的。kahadb可以限制每个数据文件的大小。不代表总计数据容量。

    <persistenceAdapter>
     <!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
     <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
    </persistenceAdapter>
    

    特性是:1、日志形式存储消息;2、消息索引以B-Tree结构存储,可以快速更新;3、完全支持JMS事务;4、支持多种恢复机制;

    2 AMQ方式

    只适用于5.3版本之前。

    AMQ也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。

    <persistenceAdapter>
       <!-- directory:保存数据的目录 ; maxFileLength:保存消息的文件大小 -->
       <amqPersistenceAdapter directory="${activemq.data}/amq" maxFileLength="32mb"/>
    </persistenceAdapter>
    

    性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。

    当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档。

    主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。

    而且由于索引巨大,一旦Broker(ActiveMQ应用实例)崩溃,重建索引的速度会非常慢。

    虽然AMQ性能略高于Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。

    3 JDBC持久化方式

    ActiveMQ将数据持久化到数据库中。 不指定具体的数据库。 可以使用任意的数据库中。 本环节中使用MySQL数据库。

    下述文件为activemq.xml配置文件部分内容。不要完全复制。

    首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置jdbcPersistenceAdapter并且引用刚才定义的数据源。

    dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

    <broker brokerName="test-broker" persistent="true" 
    xmlns="http://activemq.apache.org/schema/core">
    <persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" 
    createTablesOnStartup="false"/>
    </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" 
    destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" 
    value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>
    

    配置成功后,需要在数据库中创建对应的database,否则无法访问。表格ActiveMQ可以自动创建。

    activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:

    ID:自增的数据库主键

    CONTAINER:消息的Destination

    MSGID_PROD:消息发送者客户端的主键

    MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID

    EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数

    MSG:消息本体的Java序列化对象的二进制数据

    PRIORITY:优先级,从0-9,数值越大优先级越高

    activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

    主要的数据库字段如下:

    CONTAINER:消息的Destination

    SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息

    CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分

    SUB_NAME:订阅者名称

    SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作

    LAST_ACKED_ID:记录消费过的消息的ID。

    表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,

    其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
    这个表用于记录哪个Broker是当前的Master Broker。

    只有在消息必须保证有效,且绝对不能丢失的时候。使用JDBC存储策略。

    如果消息可以容忍丢失,或使用集群/主备模式保证数据安全的时候,建议使用levelDB或Kahadb。

    七、API简介

    1 Producer API简介

    1.1 发送消息

    MessageProducer.

    send(Message message);发送消息到默认目的地,就是创建Producer时指定的目的地。

    send(Destination destination, Message message); 发送消息到指定目的地,Producer不建议绑定目的地。也就是创建Producer的时候,不绑定目的地。session.createProducer(null)。

    send(Message message, int deliveryMode, int priority, long timeToLive);发送消息到默认目的地,且设置相关参数。deliveryMode-持久化方式(DeliveryMode.PERSISTENT| DeliveryMode.NON_PERSISTENT)。priority-优先级。timeToLive-消息有效期(单位毫秒)。

    send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive); 发送消息到指定目的地,且设置相关参数。

    1.2 消息有效期

    消息过期后,默认会将失效消息保存到“死信队列(ActiveMQ.DLQ)”。

    不持久化的消息,在超时后直接丢弃,不会保存到死信队列中。

    死信队列名称可配置,死信队列中的消息不能恢复。

    死信队列是在activemq.xml中配置的。

    1.3 消息优先级

    不需特殊关注。

    我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。在某些场景下,我们通常希望权重较高的消息优先传送;不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。

    JMS标准中约定priority可以为09的整数数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持09,因为JDBC存储器可以基于priority对消息进行排序和索引化;但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。

    1.3.1 开启

    在broker端,默认是不存储priority信息的,我们需要手动开启,修改activemq.xml配置文件,在broker标签的子标签policyEntries中增加下述配置:

    <policyEntry queue=">" prioritizedMessages="true"/>  
    

    不过对于“非持久化”类型的消息(如果没有被swap到临时文件),它们被保存在内存中,它们不存在从文件Paged in到内存的过程,因为可以保证优先级较高的消息,总是在prefetch的时候被优先获取,这也是“非持久化”消息可以担保消息发送顺序的优点。

    Broker在收到Producer的消息之后,将会把消息cache到内存,如果消息需要持久化,那么同时也会把消息写入文件;如果通道中Consumer的消费速度足够快(即积压的消息很少,尚未超过内存限制,我们通过上文能够知道,每个通道都可以有一定的内存用来cache消息),那么消息几乎不需要从存储文件中Paged In,直接就能从内存的cache中获取即可,这种情况下,priority可以担保“全局顺序”;不过,如果消费者滞后太多,cache已满,就会触发新接收的消息直接保存在磁盘中,那么此时,priority就没有那么有效了。

    在Queue中,prefetch的消息列表默认将会采用“轮询”的方式(roundRobin,注意并不是roundRobinDispatch)[备注:因为Queue不支持任何DispatchPolicy],依次添加到每个consumer的pending buffer中,比如有m1-m2-m3-m4四条消息,有C1-C2两个消费者,那么: m1->C1,m2->C2,m3->C1,m4->C2。这种轮序方式,会对基于权重的消息发送有些额外的影响,假如四条消息的权重都不同,但是(m1,m3)->C1,事实上m2的权重>m3,对于C1而言,它似乎丢失了“顺序性”。

    1.3.2 强顺序
    <policyEntry queue=">" strictOrderDispatch="true"/>
    

    strictOrderDispatch“严格顺序转发”,这是区别于“轮询”的一种消息转发手段;不过不要误解它为“全局严格顺序”,它只不过是将prefetch的消息依次填满每个consumer的pending buffer。比如上述例子中,如果C1-C2两个消费者的buffer尺寸为3,那么(m1,m2,m3)->C1,(m4)->C2;当C1填充完毕之后,才会填充C2。由此这种策略可以保证buffer中所有的消息都是“权重临近的”、有序的。(需要注意:strictOrderDispatch并非是解决priority消息顺序的问题而生,只是在使用priority时需要关注它)。

    1.3.3 严格顺序
    policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>
    

    useCache=false来关闭内存,强制将所有的消息都立即写入文件(索引化,但是会降低消息的转发效率);queuePrefetch=1来约束每个consumer任何时刻只有一个消息正在处理,那些消息消费之后,将会从文件中重新获取,这大大增加了消息文件操作的次数,不过每次读取肯定都是priority最高的消息。

    2 Consumer API简介

    2.1 消息的确认

    Consumer拉取消息后,如果没有做确认acknowledge,此消息不会从MQ中删除。

    消息的如果拉去到consumer后,未确认,那么消息被锁定。如果consumer关闭的时候仍旧没有确认消息,则释放消息锁定信息。消息将发送给其他的consumer处理。

    消息一旦处理,应该必须确认。类似数据库中的事务管理机制。

    2.2 消息的过滤

    对消息消费者处理的消息数据进行过滤。这种处理可以明确消费者的角色,细分消费者的功能。

    设置过滤:

    Session.createConsumer(Destination destination, String messageSelector);

    过滤信息为字符串,语法类似SQL92中的where子句条件信息。可以使用诸如AND、OR、IN、NOT IN等关键字。详细内容可以查看javax.jms.Message的帮助文档。

    注意:消息的生产者在发送消息的的时候,必须设置可过滤的属性信息,所有的属性信息设置方法格式为:setXxxxProperty(String name, T value)。 其中方法名中的Xxxx是类型,如setObjectProperty/setStringProperty等。

    八、Spring&ActiveMQ

    ActiveMQ的开发,和Spring的整合是非常方便的。且Spring有对JMS提供的Template机制。所以Spring管理ActiveMQ访问操作是非常方便的。

    九、ActiveMQ集群

    使用ZooKeeper+ActiveMQ实现主从和集群.

    1 Master-Slave

    主从模式是一种高可用解决方案。在ZooKeeper中注册若干ActiveMQ Broker,其中只有一个Broker提供对外服务(Master),其他Broker处于待机状态(Slave)。当Master出现故障导致宕机时,通过ZooKeeper内部的选举机制,选举出一台Slave替代Master继续对外提供服务。

    clip_image006.gif

    官方文档:http://activemq.apache.org/replicated-leveldb-store.html

    1.1 安装ZooKeeper

    搭建伪集群,在同一个Linux中安装三个ZooKeeper实例。使用不同的端口实现同时启动。端口分配如下:

    主机 服务端口 投票端口 选举端口
    192.168.159.130 2181 2881 3881
    192.168.159.130 2182 2882 3882
    192.168.159.130 2183 2883 3883
    1.1.1 解压缩

    tar -zxf zookeeper

    1.1.2 复制

    cp -r zookeeper /usr/local/zookeeper1

    1.1.3 创建data数据目录

    在zookeeper1目录中创建子目录data目录

    mkdir data

    1.1.4 编写Zookeeper配置文件

    vi /usr/local/solrcloude/zookeeper1/conf/zoo.cfg

    修改数据目录

    1.1.5 复制两份同样的Zookeeper

    cp zookeeper1 zookeeper2 -r

    cp zookeeper1 zookeeper3 -r

    1.1.6 为Zookeeper服务增加服务命名

    在每个Zookeeper应用内的data目录中增加文件myid

    内部定义每个服务的编号. 编号要求为数字,是正整数

    可以使用回声命名快速定义myid文件

    echo 1 >> myid

    1.1.7 修改Zookeeper配置文件 zoo.cfg

    修改端口号.

    提供多节点服务命名

    port=2181 客户端访问端口. 三个Zookeeper实例不能端口相同.
    server.编号=IP:投票端口:选举端口
    投票端口: 用于决定正在运行的主机是否宕机.
    选举端口: 用于决定哪一个Zookeeper服务作为主机.
    三个Zookeeper应用配置一致.
    server.1=192.168.120.132:2881:3881
    server.2=192.168.120.132:2882:3882
    server.3=192.168.120.132:2883:3883 
    
    1.1.8 启动Zookeeper测试

    要至少启动两个Zookeeper启动. 启动单一Zookeeper,无法正常提供服务.

    1.2 安装ActiveMQ

    在同一个Linux中安装三个ActiveMQ实例,使用不同端口实现同时启动。端口分配如下:

    主机 M-S通讯端口 服务端口 jetty端口
    192.168.159.130 62626 61616 8161
    192.168.159.130 62627 61617 8162
    192.168.159.130 62628 61618 8163
    1.2.1 安装ActiveMQ实例
    1.2.2 修改配置信息
    1.2.2.1 修改jetty端口

    修改conf/jetty.xml中的端口配置。分别是8161、8162、8163

       <bean id="jettyPort"   class="org.apache.activemq.web.WebConsolePort"   init-method="start">                <!-- the default port number   for the web console -->           <property name="port"   value="8161"/>       </bean>   
    
    1.2.2.2 统一所有主从节点Broker命名

    修改conf/activemq.xml文件。修改broker标签属性信息,统一所有节点的broker命名。

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}">
    
    1.2.2.3 修改持久化配置

    修改conf/activemq.xml文件。修改broker标签中子标签persistenceAdapter相关内容。

    replicas属性代表当前主从模型中的节点数量。按需配置。

    bind属性中的端口为主从实例之间的通讯端口。代表当前实例对外开放端口是什么,三个实例分别使用62626、62627、62628端口。

    zkAddress属性代表ZooKeeper安装位置,安装具体情况设置。

    zkPath是ActiveMQ主从信息保存到ZooKeeper中的什么目录内。

    hostname为ActiveMQ实例安装Linux的主机名,可以在/etc/hosts配置文件中设置。设置格式为:IP 主机名。 如: 127.0.0.1 mq-server

    <persistenceAdapter>
    <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
     <replicatedLevelDB
       directory="${activemq.data}/levelDB"
       replicas="3"
       bind="tcp://0.0.0.0:62626"
       zkAddress="192.168.159.130:2181,192.168.159.130:2182,192.168.159.130:2183"
       zkPath="/activemq/leveldb-stores"
       hostname="mq-server"
       />
    </persistenceAdapter>
    
    1.2.2.4 修改服务端口

    修改ActiveMQ对外提供的服务端口。原默认端口为61616。当前环境使用的端口为:61616、61617、61618。

    修改conf/activemq.xml配置文件。修改broker标签中子标签transportConnectors的相关配置。只修改强调内容。

    <transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
    

    1.3 启动主从

    将三个ActiveMQ实例分别启动。{activemq-home}/bin/active start。启动后,可以查看日志文件,检查启动状态,日志文件为{activemq-home}/data/activemq.log。

    1.4 查看主从状态

    1.4.1 使用客户端连接ZooKeeper

    ${zkHome}/bin/zkCli.sh

    1.4.2 查看状态信息

    连接成功后,可以使用命令‘ls’查看ZooKeeper中的目录结构

    如:

    ls /

    ls /activemq/leveldb-stores

    找到对应的内容后,可以使用命令‘get’查看ZooKeeper中的数据内容

    get /activemq/leveldb-stores/00000000005

    其中主节点的elected及address属性一定有数据。从节点则数据为‘null’。

    2 集群

    准备多份主从模型。在所有的ActiveMQ节点中的conf/activemq.xml中增加下述配置:(每个主从模型中的networkConnector都指向另外一个主从模型)

    <networkConnectors>
     <networkConnector uri="static://(tcp://ip:port,tcp://ip:port)" duplex="false">
     </networkConnector>
    </networkConnectors>
    

    注意配置顺序,Networks相关配置必须在持久化相关配置之前。如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://activemq.apache.org/schema/core">
       <broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq-cluster" dataDirectory="${activemq.data}" >
           <networkConnectors>
               <networkConnector uri=" static://(tcp://ip:port,tcp://ip:port)"/>
           </networkConnectors>
           <persistenceAdapter>
               < replicatedLevelDB directory = "xxx"/>       
           </persistenceAdapter>
       </broker>
    </beans>
    

    如: 主从模型1 - 192.168.159.129 主从模型2 - 192.168.159.130

    在主从模型1的所有节点activemq.xml配置文件中增加标签:
    <networkConnectors>
    <networkConnector uri="static://(tcp://192.168.159.130:61616,tcp://192.168.159.130:61617)"/>
    </networkConnectors>
    
    在模型2中所有节点增加配置:
    <networkConnectors>
    <networkConnector uri="static://(tcp://192.168.159.129:61616,tcp://192.168.159.129:61617)"/>
    </networkConnectors>
    

    相关文章

      网友评论

          本文标题:ActiveMQ

          本文链接:https://www.haomeiwen.com/subject/rbgjgftx.html