RocketMQ Environment Setup

Author: 冷冷DerFan | Published 2019-03-02 09:06

    RocketMQ Single-Node Deployment

    1. Download the source package
    Download link: http://rocketmq.apache.org/docs/quick-start/

    Unpack the archive and build it with Maven:
    unzip rocketmq-all-4.4.0-source-release.zip
    cd rocketmq-all-4.4.0/
    mvn -Prelease-all -DskipTests clean install -U
    cd distribution/target/apache-rocketmq
    When the build finishes, the generated files are in the distribution/target/apache-rocketmq directory.

    2. Edit the Linux hosts file
    vi /etc/hosts

    Add the following entries, which map the NameServer and master broker host names to the server's IP address:
    172.19.24.103 rocketmq-nameserver1
    172.19.24.103 rocketmq-master1
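
    To confirm that the hosts entries took effect, the names can be resolved from Java, which is also how the client in the demo further down will look them up. This is only a minimal sketch: the class name HostCheck is made up for illustration, and the two host names are the ones added to /etc/hosts in this step.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

// Hypothetical helper, not part of RocketMQ.
class HostCheck {

    // Returns the resolved IP address, or empty if the name cannot be resolved.
    static Optional<String> resolve(String host) {
        try {
            return Optional.of(InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Host names from the /etc/hosts entries in step 2.
        for (String host : new String[] {"rocketmq-nameserver1", "rocketmq-master1"}) {
            System.out.println(host + " -> " + resolve(host).orElse("not resolvable"));
        }
    }
}
```

    If either name prints "not resolvable", the hosts file on the machine running the check is worth re-examining before going further.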

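    3. Create the RocketMQ directory

    mkdir -p /usr/local/apache-rocketmq
    cp apache-rocketmq.tar.gz /usr/local/apache-rocketmq
    cd /usr/local/apache-rocketmq
    tar -zxvf apache-rocketmq.tar.gz
    ln -s /usr/local/apache-rocketmq/apache-rocketmq /usr/local/rocketmq
    The later steps all use /usr/local/rocketmq, so the symbolic link is created at that path. This assumes the archive unpacks into a directory named apache-rocketmq.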

    4. Create the storage directories
    mkdir /usr/local/rocketmq/store
    mkdir /usr/local/rocketmq/store/commitlog
    mkdir /usr/local/rocketmq/store/consumequeue
    mkdir /usr/local/rocketmq/store/index

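    5. Edit the RocketMQ configuration file
    vi /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties

    Set the contents as follows:

    # Cluster name
    brokerClusterName=rocketmq-cluster
    # Broker name
    brokerName=broker-a
    # 0 marks the master; values greater than 0 mark slaves
    brokerId=0
    # NameServer address
    namesrvAddr=rocketmq-nameserver1:9876
    # Number of queues for automatically created topics
    defaultTopicQueueNums=4
    # Allow the broker to create topics and subscription groups automatically
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    # Port the broker listens on
    listenPort=10911
    # Hour of day at which expired files are deleted (04 = 4 a.m.)
    deleteWhen=04
    # File retention time, in hours
    fileReservedTime=120
    # CommitLog and ConsumeQueue file sizes
    mapedFileSizeCommitLog=1073741824
    mapedFileSizeConsumeQueue=300000
    # Disk usage threshold, in percent
    diskMaxUsedSpaceRatio=88
    # Storage paths (the directories created in step 4)
    storePathRootDir=/usr/local/rocketmq/store
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    storePathIndex=/usr/local/rocketmq/store/index
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    abortFile=/usr/local/rocketmq/store/abort
    # Maximum message size, in bytes
    maxMessageSize=65536
    # Broker role and disk flush mode
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH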
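
    A misspelled key in this file is easy to miss, because as far as I know the broker simply ignores keys it does not recognize instead of reporting them. The sketch below loads properties text and lists every key that is not on an expected list. The class name BrokerConfigCheck and the KNOWN_KEYS set are assumptions made for illustration: the set is a hand-written list of the correctly spelled keys this step intends to configure, not something RocketMQ exposes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of RocketMQ.
class BrokerConfigCheck {

    // Hand-written list of the keys used in step 5; extend it for other settings.
    static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList(
            "brokerClusterName", "brokerName", "brokerId", "namesrvAddr",
            "defaultTopicQueueNums", "autoCreateTopicEnable", "autoCreateSubscriptionGroup",
            "listenPort", "deleteWhen", "fileReservedTime",
            "mapedFileSizeCommitLog", "mapedFileSizeConsumeQueue", "diskMaxUsedSpaceRatio",
            "storePathRootDir", "storePathCommitLog", "storePathConsumeQueue",
            "storePathIndex", "storeCheckpoint", "abortFile",
            "maxMessageSize", "brokerRole", "flushDiskType"));

    // Returns the keys in the given properties text that are not in KNOWN_KEYS, sorted.
    static Set<String> unknownKeys(String propertiesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(propertiesText));
        Set<String> unknown = new TreeSet<>(props.stringPropertyNames());
        unknown.removeAll(KNOWN_KEYS);
        return unknown;
    }

    public static void main(String[] args) throws IOException {
        String sample = "brokerName=broker-a\nlistenPosr=10911\n";
        System.out.println(unknownKeys(sample)); // prints [listenPosr]
    }
}
```

    To check the real file, the text could be read from /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties and passed to unknownKeys in the same way.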

    6. Change the log file path
    mkdir -p /usr/local/rocketmq/logs
    cd /usr/local/rocketmq/conf
    sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

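    7. Adjust the startup script parameters (mainly so the configured memory does not exceed what a local machine has; production is a separate matter)
    vi /usr/local/rocketmq/bin/runbroker.sh
    Set the JVM maximum heap, minimum heap, and young generation sizes to 1g
    vi /usr/local/rocketmq/bin/runserver.sh
    Set the JVM maximum heap, minimum heap, and young generation sizes to 1g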

    8. Start the services
    cd /usr/local/rocketmq/bin/
    Start the NameServer first:
    nohup sh mqnamesrv &

    Then start the broker:
    nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &

    9. Check the logs
    Broker log:
    tail -f /usr/local/rocketmq/logs/rocketmqlogs/broker.log
    NameServer log:
    tail -f /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
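
    Besides reading the logs, a quick way to confirm that both processes are up is to try a TCP connection to their ports. The following is a minimal sketch, assuming the NameServer listens on 9876 (the port used in namesrvAddr) and the broker on 10911 (the port set in step 5). The class name PortCheck and the 2-second timeout are illustrative choices.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer 9876: " + isListening("rocketmq-nameserver1", 9876, 2000));
        System.out.println("Broker 10911: " + isListening("rocketmq-master1", 10911, 2000));
    }
}
```

    A successful connection only shows that something is listening on the port; the logs remain the place to look for startup errors.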

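    Using the RocketMQ Producer

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class Producer {

        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("testQuickStart");
            // Const.NAME_SERVER_ADDR is a constant from the author's project that is not shown here;
            // it presumably holds the NameServer address, e.g. "rocketmq-nameserver1:9876".
            producer.setNamesrvAddr(Const.NAME_SERVER_ADDR);
            producer.start();

            for (int i = 0; i < 5; i++) {
                // Arguments: topic, tag, key, body
                Message message = new Message("test_quick_topic", "TagA", "Keys" + i, ("Hello RocketMQ" + i).getBytes());
                SendResult res = producer.send(message);
                System.out.println("Send result: " + res);
            }
            producer.shutdown();
        }
    }
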

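    Running this code shows that a Topic has a one-to-many relationship with queues: multiple messages sent to the same Topic land in queues with different queueIds.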
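
    The spread across queueIds can be pictured as a round-robin choice over the topic's queues. The sketch below is a simplified model of that idea, not RocketMQ's actual selection code: the class RoundRobinQueueSelector is made up for illustration, it always starts at queue 0, and it ignores broker availability. With 4 queues (the defaultTopicQueueNums value from step 5) and 5 messages, the fifth message wraps around to the first queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of spreading messages over a topic's queues.
class RoundRobinQueueSelector {
    private final int queueCount;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinQueueSelector(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
    }

    // Returns the queueId for the next message, cycling through 0..queueCount-1.
    int nextQueueId() {
        // floorMod keeps the result non-negative even if the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    // Returns the queueIds chosen for messageCount consecutive messages.
    static List<Integer> assign(int queueCount, int messageCount) {
        RoundRobinQueueSelector selector = new RoundRobinQueueSelector(queueCount);
        List<Integer> queueIds = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            queueIds.add(selector.nextQueueId());
        }
        return queueIds;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 5)); // prints [0, 1, 2, 3, 0]
    }
}
```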

    Consumer side:

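    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Consumer {

        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
            // Const.NAME_SERVER_ADDR is the same project constant used by the producer.
            consumer.setNamesrvAddr(Const.NAME_SERVER_ADDR);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

            // Subscribe to every tag of the topic.
            consumer.subscribe("test_quick_topic", "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    MessageExt messageExt = list.get(0);

                    try {
                        String topic = messageExt.getTopic();
                        String msgId = messageExt.getMsgId();
                        String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("topic " + topic + " msgId " + msgId + " body " + body);
                        // Deliberately throw an ArithmeticException to trigger the retry path.
                        int i = 1 / 0;
                    } catch (Exception e) {
                        e.printStackTrace();
                        int reconsumeTimes = messageExt.getReconsumeTimes();
                        System.out.println("reconsumeTimes " + reconsumeTimes);
                        // Give up on the third retry and acknowledge the message.
                        if (reconsumeTimes == 3) {
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        // Otherwise ask for the message to be delivered again later.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }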
    

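    The output shows that RocketMQ retries automatically: when the listener catches the exception and returns RECONSUME_LATER, the RocketMQ broker redelivers the message.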
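
    The sequence of reconsumeTimes values that the listener prints can be reproduced without a broker. The sketch below is a stand-alone simulation of the listener's decision logic only: the class RetrySimulation and its Status enum are made up for illustration, and the delays and the upper retry limit that a real broker applies are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone, hypothetical simulation of the retry logic in the consumer listener.
class RetrySimulation {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Mirrors the listener: processing always fails, and the handler
    // acknowledges the message once it has been retried three times.
    static Status handle(int reconsumeTimes) {
        try {
            throw new IllegalStateException("simulated processing failure");
        } catch (Exception e) {
            return reconsumeTimes == 3 ? Status.CONSUME_SUCCESS : Status.RECONSUME_LATER;
        }
    }

    // Redelivers until the handler acknowledges; returns the reconsumeTimes seen on each attempt.
    static List<Integer> deliver() {
        List<Integer> attempts = new ArrayList<>();
        int reconsumeTimes = 0;
        while (true) {
            attempts.add(reconsumeTimes);
            if (handle(reconsumeTimes) == Status.CONSUME_SUCCESS) {
                return attempts;
            }
            reconsumeTimes++;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver()); // prints [0, 1, 2, 3]
    }
}
```

    The message is therefore handled four times in total: the first delivery plus three retries, after which the handler returns CONSUME_SUCCESS and redelivery stops.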

    This completes the single-node RocketMQ setup and a run of a simple producer/consumer demo.
