
Chapter 1: Setting Up a RocketMQ Debugging Environment

Author: 原水寒 | Published: 2022-05-14 16:33

Based on the book: RocketMQ技术内幕 RocketMQ架构设计与实现原理 (2nd edition)

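RocketMQ has four roles:

  • broker: the message server; it stores messages
  • producer: the message producer; it creates messages
  • consumer: the message consumer; it consumes messages
  • nameServer (namesrv for short): the registry, similar to ZooKeeper. It stores the broker list. Producers and consumers fetch the broker list from namesrv, pick a broker through the load-balancing module, and then send messages to that broker or consume from it.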
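The interaction between these roles can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: `ToyNameServer` and `ToyProducer` are hypothetical names, not RocketMQ classes, the second broker address is made up, and plain round-robin stands in for the client's load-balancing module.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: none of these types exist in RocketMQ.
class RoleSketch {

    /** Plays the namesrv role: remembers which brokers registered and their addresses. */
    static class ToyNameServer {
        // Sorted by broker name so the returned list has a stable order.
        private final Map<String, String> brokers = new ConcurrentSkipListMap<>();

        void register(String brokerName, String address) {
            brokers.put(brokerName, address);
        }

        List<String> brokerAddresses() {
            return new ArrayList<>(brokers.values());
        }
    }

    /** Plays the producer role: fetches the broker list, then picks one round-robin. */
    static class ToyProducer {
        private final ToyNameServer nameServer;
        private final AtomicInteger counter = new AtomicInteger();

        ToyProducer(ToyNameServer nameServer) {
            this.nameServer = nameServer;
        }

        String selectBroker() {
            List<String> addresses = nameServer.brokerAddresses();
            if (addresses.isEmpty()) {
                throw new IllegalStateException("no broker registered");
            }
            int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
            return addresses.get(index);
        }
    }

    public static void main(String[] args) {
        ToyNameServer nameServer = new ToyNameServer();
        nameServer.register("broker-a", "127.0.0.1:10911");
        nameServer.register("broker-b", "127.0.0.1:10921"); // hypothetical second broker

        ToyProducer producer = new ToyProducer(nameServer);
        for (int i = 0; i < 3; i++) {
            System.out.println("send message " + i + " to " + producer.selectBroker());
        }
    }
}
```

The sketch selects whole brokers to keep the idea visible; it is not meant to mirror how the real client chooses a target.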

1. Get the source code

https://github.com/apache/rocketmq.git
Clone it to /Users/zhaowang/code/study/rocketmq and import the project into IDEA.

2. Configuration files

cd /Users/zhaowang/code/study/rocketmq   # the RocketMQ home directory
mkdir conf logs store
cd /Users/zhaowang/code/study/rocketmq/distribution/conf
cp broker.conf logback_namesrv.xml logback_broker.xml /Users/zhaowang/code/study/rocketmq/conf/
vi /Users/zhaowang/code/study/rocketmq/conf/broker.conf

Home directory: set it with the ROCKETMQ_HOME=path environment variable or with the -Drocketmq.home.dir=path JVM option.
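The lookup described above can be sketched as a system property with an environment-variable fallback. The class and method names (`HomeDirSketch`, `resolveHome`) are hypothetical, and the precedence shown (JVM option first, environment variable second) is an assumption worth checking against the RocketMQ version you cloned.

```java
import java.util.Map;
import java.util.Properties;

// Sketch only: HomeDirSketch and resolveHome are illustrative names.
class HomeDirSketch {

    /** Returns the -Drocketmq.home.dir value if present, else ROCKETMQ_HOME, else null. */
    static String resolveHome(Properties systemProperties, Map<String, String> environment) {
        return systemProperties.getProperty("rocketmq.home.dir", environment.get("ROCKETMQ_HOME"));
    }

    public static void main(String[] args) {
        String home = resolveHome(System.getProperties(), System.getenv());
        System.out.println(home == null ? "RocketMQ home is not set" : "RocketMQ home: " + home);
    }
}
```

Running this with the same run configuration you plan to use for namesrv and the broker is a quick way to see which value the JVM actually receives.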

Add the following to broker.conf:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrv address
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Storage paths for the broker's files
storePathRootDir=/Users/zhaowang/code/study/rocketmq/store
storePathCommitLog=/Users/zhaowang/code/study/rocketmq/store/commitlog
storePathConsumeQueue=/Users/zhaowang/code/study/rocketmq/store/consumequeue
storePathIndex=/Users/zhaowang/code/study/rocketmq/store/index
storePathCheckPoint=/Users/zhaowang/code/study/rocketmq/store/checkpoint
abortFile=/Users/zhaowang/code/study/rocketmq/store/abort
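Because broker.conf uses the `key = value` properties syntax, `java.util.Properties` can read it, which makes for a cheap sanity check before starting the broker. The following is a minimal sketch with a hypothetical class name (`BrokerConfSketch`) and an inline excerpt of the file instead of a path on disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch only: parses properties-style text and prints a few keys.
class BrokerConfSketch {

    /** Parses properties-style text; spaces around '=' and '#' comment lines are handled by Properties. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
            "brokerName = broker-a",
            "# namesrv address",
            "namesrvAddr=127.0.0.1:9876",
            "flushDiskType = ASYNC_FLUSH");

        Properties conf = parse(sample);
        System.out.println("brokerName    = " + conf.getProperty("brokerName"));
        System.out.println("namesrvAddr   = " + conf.getProperty("namesrvAddr"));
        System.out.println("flushDiskType = " + conf.getProperty("flushDiskType"));
    }
}
```

Note that `Properties` strips whitespace before a value but not after it, so a trailing space at the end of a line becomes part of the value.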

Set the environment variable:

ROCKETMQ_HOME=/Users/zhaowang/code/study/rocketmq

3. Start namesrv

Run org.apache.rocketmq.namesrv.NamesrvStartup#main in debug mode.
Startup succeeded once you see: The Name Server boot success. serializeType=JSON

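4. Start the broker

Run org.apache.rocketmq.broker.BrokerStartup#main in debug mode.
Before running it, set the following in the IDEA run/debug configuration:

# Disk-space check: if used space / total space > diskSpaceWarningLevelRatio, sending a message fails immediately
VM options: -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98
# Path to the configuration file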
Program arguments: -c /Users/zhaowang/code/study/rocketmq/conf/broker.conf

Startup succeeded once you see: The broker[broker-a, xx.xx.xx.xx:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
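To see why the threshold is raised to 0.98 on a development machine, it helps to compute the same ratio for your own disk. The sketch below reproduces only the arithmetic described above (used space divided by total space); `DiskRatioSketch` and its methods are hypothetical names, and the way RocketMQ itself samples disk usage is not shown here.

```java
import java.io.File;

// Sketch only: the used/total comparison, not RocketMQ's own implementation.
class DiskRatioSketch {

    /** Fraction of the partition in use, or -1 if the total size is unknown. */
    static double usedRatio(long totalBytes, long freeBytes) {
        if (totalBytes <= 0) {
            return -1;
        }
        return (totalBytes - freeBytes) / (double) totalBytes;
    }

    /** True when the used fraction is above the warning ratio. */
    static boolean exceeds(long totalBytes, long freeBytes, double warningRatio) {
        return usedRatio(totalBytes, freeBytes) > warningRatio;
    }

    public static void main(String[] args) {
        // Pass the store directory as the first argument; defaults to the current directory.
        File store = new File(args.length > 0 ? args[0] : ".");
        long total = store.getTotalSpace();
        long free = store.getFreeSpace();
        System.out.printf("used ratio: %.4f%n", usedRatio(total, free));
        System.out.println("above 0.98: " + exceeds(total, free, 0.98));
    }
}
```

If the printed ratio is already close to the limit you configured, sends are likely to be rejected, and freeing disk space is a better fix than raising the ratio further.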

5. Start the producer

Set the namesrv address in the example class:

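package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {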
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("127.0.0.1:9876"); 
        // Launch the instance.
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

Run org.apache.rocketmq.example.quickstart.Producer#main in debug mode.
The message was sent successfully once you see output similar to: SendResult [sendStatus=SEND_OK, msgId=1E82742D193B18B4AAC2460C17BF0001, offsetMsgId=1E82742D00002A9F000000000005DA66, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=499]
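The offsetMsgId in this output can be decoded by hand, which is a useful cross-check while debugging. The sketch below assumes the 32-character IPv4 layout: 8 hex digits for the broker IP, 8 for the broker port, and 16 for the commit log offset. `OffsetMsgIdSketch` is a hypothetical helper, not a RocketMQ class.

```java
// Sketch only: assumes a 32-hex-character offsetMsgId (IPv4 broker address).
class OffsetMsgIdSketch {

    /** Hex digits 8..15 hold the broker port. */
    static int port(String offsetMsgId) {
        return (int) Long.parseLong(offsetMsgId.substring(8, 16), 16);
    }

    /** Hex digits 16..31 hold the physical offset in the commit log. */
    static long commitLogOffset(String offsetMsgId) {
        return Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String id = "1E82742D00002A9F000000000005DA66";
        System.out.println("broker port       = " + port(id));            // 10911
        System.out.println("commit log offset = " + commitLogOffset(id)); // 383590
    }
}
```

The port matches the 10911 from the broker startup log, and the offset matches the commitLogOffset=383590 that the consumer log reports for the same message.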

6. Start the consumer

Set the namesrv address in the example class:

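package org.apache.rocketmq.example.quickstart;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {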
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // Specify name server addresses.
        consumer.setNamesrvAddr("127.0.0.1:9876");
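        /*
         * Choose whether a consumer that starts for the first time reads from the head or the tail of the queue.
         * On any later start it resumes from the position where it last stopped.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to a topic; "*" matches every tag.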
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //  Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Run org.apache.rocketmq.example.quickstart.Consumer#main in debug mode.
The message was consumed successfully once you see output similar to: ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=499, sysFlag=0, bornTimestamp=1652509597631, bornHost=/xx.xx.xx.xx:51928, storeTimestamp=1652509597637, storeHost=/xx.xx.xx.xx:10911, msgId=1E82742D00002A9F000000000005DA66, commitLogOffset=383590, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=511, CONSUME_START_TIME=1652509643679, UNIQ_KEY=1E82742D193B18B4AAC2460C17BF0001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]
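The body field in this log is printed as raw byte values. A small helper makes it readable again; this is a sketch with a hypothetical class name (`BodyDecodeSketch`), assuming the body was encoded as UTF-8, which is what the producer example does through RemotingHelper.DEFAULT_CHARSET.

```java
import java.nio.charset.StandardCharsets;

// Sketch only: turns the byte values shown in the log back into text.
class BodyDecodeSketch {

    /** Converts the logged byte values into a UTF-8 string. */
    static String decode(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Values copied from the body=[...] field of the consumer log.
        System.out.println(decode(72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50));
        // prints "Hello RocketMQ 2"
    }
}
```

Inside a real listener the same result comes from `new String(msg.getBody(), StandardCharsets.UTF_8)` on each MessageExt.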
