美文网首页
rocketmq消息中间件搭建安装

rocketmq消息中间件搭建安装

作者: w_cross | 来源:发表于2019-03-05 22:29 被阅读0次

RocketMQ介绍

RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,具有以下特点:

能够保证严格的消息顺序

提供丰富的消息拉取模式

高效的订阅者水平扩展能力

实时的消息订阅机制

亿级消息堆积能力

选用理由:

强调集群无单点,可扩展,任意一点高可用,水平可扩展 

海量消息堆积能力,消息堆积后,写入低延迟

支持上万个队列

消息失败重试机制

消息可查询

开源社区活跃

成熟度(经过双十一考验)

RocketMQ的各部分角色介绍

角色名称功能


Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息


Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费

Broker:消息中转角色,负责存储消息,转发消息,一般也称为Server

NameServer:管理中心,一般存储Broker的信息

        RocketMQ这四个角色就相当于我们现实生活中的邮政系统,其中Producer、Consumer、 Broker、NameServer分别代表发信者、收信者、负责暂存和传输的邮局、以及协调各个地方邮局的管理机构。

        启动RocketMQ之前先要启动NameServer,再启动Broker,这时候消息队列已经在开始工作了。如果想要发送消息,就用Producer;接受消息就用Consumer。如果程序中既要接收也要发送,可以启动多个Producer和Consumer。如果想要增加可靠性或者增大吞吐量,防止单点故障也可以在多台机器上部署多个NameServer和Broker,并且每个Broker也可以部署一个或者过个Slave。

        大致了解了基本角色功能后,再介绍两个重要的名词概念Topic(主题)和Message Queue(消息队列)。当一个企业搭建好消息平台后会有多条业务线接入进来,同一个业务也会有不同类型的消息需要投递,如何保证这消息准确地进行,就需要给不同类型的消息加上Topic名称来进行区分。所以在发送消息和接受消息时,需要先创建Topic。有了Topic后,仍然还有性能问题需要考虑。当一个Topic下的消息投递量或者发送量过大怎么办,这就需要在一个Topic下设置一个或者多个Message Queue来提高并行处理速度。有了Message Queue后,消息就可以并行地向各个Message Queue进行分发,从而消费者也可以从多个队列中读取消息,满足性能要求。

RocketMQ单点安装

参照官网:Downloading the Apache RocketMQ Releases - Apache RocketMQ

RocketMQ多级集群部署以及安装

本次先讲如何利用两台物理机,搭建出双主双从无单点故障的高可用RoketMQ集群。假设这两台物理机的ip分别为192.168.218.51和192.168.218.52,端口号默认为9876。

1.启动多个NameServer和Broker

首先按照单点部署,在两台服务器上分别安装RocketMQ,服务地址分别为192.168.218.51:9876和192.168.218.52:9876,然后启动NameServer(nohup sh bin/mqnamesrv &)

启动Broker,每台机器都需启动一个Master角色和一个Slave角色,作为主备。修改的配置文件在安装目录下的conf/2m-2s-sync下。

192.168.218.51机器上的Master Broker配置文件(conf/2m-2s-sync/broker-a.properties)

brokerClusterName=ifind-rocketmq-cluster     所属集群名字,集群比较多可以分成多个Cluster,每个供一个业务使用

brokerName=broker-a    broker名字,注意此处不同的配置文件填写的不一样,2选1

brokerId=1    0表示 Master,>0表示 Slave

namesrvAddr=192.168.216.57:9876;192.168.216.61:9876;192.168.216.58:9876;192.168.216.62:9876    nameServer地址,分号分割

defaultTopicQueueNums=4    在发送消息时,自动创建服务器不存在的topic,默认创建的队列数

autoCreateTopicEnable=true    是否允许 Broker自动创建Topic,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true    是否允许 Broker自动创建订阅组,建议线下开启,线上关闭

listenPort=10911    Broker对外服务的监听端口

deleteWhen=04    删除文件时间点,默认凌晨 4点

fileReservedTime=120    文件保留时间,默认 48小时

mapedFileSizeCommitLog=1073741824    commitLog每个文件的大小默认1G

brokerRole=ASYNC_MASTER    Broker的角色

flushDiskType=ASYNC_FLUSH    刷盘方式    

修改192.168.218.51机器上的Master Broker配置文件(conf/2m-2s-sync/broker-a-s.properties)

修改192.168.218.52机器上的Master Broker配置文件(conf/2m-2s-sync/broker-b.properties)

修改192.168.218.52机器上的Master Broker配置文件(conf/2m-2s-sync/broker-b-s.properties)

几个配置参数的含义

参数名含义

brokerId有三种:SYNCMASTER ASYNCMASTER SLAVE,SYNC表示当Slave和Master消息同步完成 后,再返回发送成功的状态

flushDiskType表示刷盘策略,分为SYNCFLUSH和ASYNCFLUSH两种,代表同步刷盘和异步刷盘。同 步状态下,消息真正写入磁盘才返回成功状态;异步刷盘情况下,消息写入缓存后才返回成功状态

2.发送和接受消息的demo

procucer

public class SyncProducer {

    public static void main(String[] args) throws Exception {

        //实例化一个生产者

        DefaultMQProducer producer = new DefaultMQProducer("please_input_group_name"); //生产组名称

        producer.setNamesrvAddr("192.168.218.51"); //确定服务地址,集群时通过读取配置文件变量赋值

        producer.start();  //生产者开始工作

        //发送消息

        for () {

            //三个参数,第一个topic,第二个tag标识,第三个是消息内容

            Message msg = new Message("test-topic","tag-a","msg");

            SendResult sendResult = producer.send(msg); //生产者发送消息

        }

        producer.shutdown();

    }

}

consumer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_input_group_name"); //消费组名称

consumer.subscribe("topic","*");    //消费者订阅

consumer.registerMessageListener(new MessageListenerConcurrentliy) {

    public ConsumeConcurrentStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        //处理的逻辑代码

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    }

};

consumer.start();

消息队列的协调者

NameServer的功能

NameServer是消息队列中的状态服务器,集群中的各个组件通过它了解全局的信息。同时,各个机器都要定期向NameServer上报自己的状态,如果超时不上报的话,其它组件会把这个机器从列表中移除。NameServer可以部署多个,本身是无状态的,也就是Broker、Topic等状态信息不会持久存储,是由各个角色上报存储到内存的。

集群状态的存储结构

private final HashMap> topicQueue topicQueueTable //这个map的key是Topic的名称,存储了所有topic的信息。Value存储着Broker的名称、读写Queue的数量以及同步标识等 

private final HashMap //这个结构key是BrokerName,value存储着地址信息以及所属Cluster的名称 

private final HashMap //这个结构的key是Broker的ip地址,value为Broker机器的实时状态,包括上次更新状态的时间戳 

private final HashMap> //key为Cluster的名称,set中存储的是Broker的名称,就是集群的BrokerName的集合

 private final HashMap> filterServerTable //key是Broker的地址,value是和这个Broker关联的多个过滤器的地址

以上五个变量的定义,可以清楚的看出各个组件的状态是如何进行存储的,而NameServer的作用便是维护这五个变量中存储的信息。

相关文章

网友评论

      本文标题:rocketmq消息中间件搭建安装

      本文链接:https://www.haomeiwen.com/subject/advauqtx.html