参考自:RocketMQ技术内幕 RocketMQ架构设计与实现原理(第2版)
RocketMQ 体系中包括四个角色:
- broker:消息服务器,存储消息
- producer:消息生产者,产生消息
- consumer:消息消费者,消费消息
- nameServer:简称 namesrv,注册中心(类似 zk),存储 broker 列表,producer 和 consumer 从 namesrv 获取 broker 列表,通过负载均衡模块获取 broker 后,之后将消息发送到 broker 或者 从 broker 进行消费
一、源码准备
https://github.com/apache/rocketmq.git
下载到:/Users/zhaowang/code/study/rocketmq,导入IDEA
二、配置文件
cd /Users/zhaowang/code/study/rocketmq 主运行目录
mkdir conf logs store
cd /Users/zhaowang/code/study/rocketmq/distribution/conf
cp broker.conf logback_namesrv.xml logback_broker.xml /Users/zhaowang/code/study/rocketmq/conf/
vi /Users/zhaowang/code/study/rocketmq/conf/broker.conf
主运行目录:通过 ROCKET_HOME=path 或者 -Drocketmq.home.dir=path 来设定
增加以下内容
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrv 地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 一票文件存储地址
storePathRootDir=/Users/zhaowang/code/study/rocketmq/store
storePathCommitLog=/Users/zhaowang/code/study/rocketmq/store/commitlog
storePathConsumeQueue=/Users/zhaowang/code/study/rocketmq/store/consumequeue
storePathIndex=/Users/zhaowang/code/study/rocketmq/store/index
storePathCheckPoint=/Users/zhaowang/code/study/rocketmq/store/checkpoint
abortFile=/Users/zhaowang/code/study/rocketmq/store/abort
配置环境变量
ROCKETMQ_HOME=/Users/zhaowang/code/study/rocketmq
三、启动 namesrv
debug 执行:org.apache.rocketmq.namesrv.NamesrvStartup#main
看到 The Name Server boot success. serializeType=JSON
后,表示启动成功
四、启动 broker
debug 执行:org.apache.rocketmq.broker.BrokerStartup#main
执行之前在 idea debug窗口配置:
# 检测磁盘空间,如果已使用空间/总空间>diskSpaceWarningLevelRatio,消息发送直接报错
VM options:-Drocketmq.broker.diskSpaceWarningLevelRatio=0.98
# 指定配置文件地址
Program arguments: -c /Users/zhaowang/code/study/rocketmq/conf/broker.conf
看到 The broker[broker-a, xx.xx.xx.xx:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
后,表示启动成功
五、启动 producer
配置 namesrv
public class org.apache.rocketmq.example.quickstart.Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("127.0.0.1:9876");
// Launch the instance.
producer.start();
for (int i = 0; i < 1000; i++) {
try {
// Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
debug 执行:org.apache.rocketmq.example.quickstart.Producer#main
看到 SendResult [sendStatus=SEND_OK, msgId=1E82742D193B18B4AAC2460C17BF0001, offsetMsgId=1E82742D00002A9F000000000005DA66, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=499]
类似信息后,表示消息发送成功
六、启动 consumer
配置 namesrv
public class org.apache.rocketmq.example.quickstart.Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Specify name server addresses.
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// Subscribe one more topic to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
debug 执行:org.apache.rocketmq.example.quickstart.Consumer#main
看到 ConsumeMessageThread_please_rename_unique_group_name_4_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=190, queueOffset=499, sysFlag=0, bornTimestamp=1652509597631, bornHost=/xx.xx.xx.xx:51928, storeTimestamp=1652509597637, storeHost=/xx.xx.xx.xx:10911, msgId=1E82742D00002A9F000000000005DA66, commitLogOffset=383590, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=511, CONSUME_START_TIME=1652509643679, UNIQ_KEY=1E82742D193B18B4AAC2460C17BF0001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]
类似信息后,表示消息消费成功
网友评论