背景
- 本文主要介绍在Linux下如何部署rocketmq
- 本次部署的版本为5.0之前最后一个正式版本4.9.0
- 使用的服务器为腾讯云个人服务器(内存只有2G),机器比较小,各方面省吃俭用
部署过程
- 下载rocketmq 4.9.0版本
cd /appcom/Install/Rocketmq
wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.9.0.tar.gz
tar -zxvf rocketmq-all-4.9.0.tar.gz
- 对代码进行编译
cd /appcom/Install/Rocketmq/rocketmq-4.9.0
mvn -Prelease-all -DskipTests clean install -U
- 代码编译成功
-
创建软链
ln -s /appcom/Install/Rocketmq/rocketmq-4.9.0/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0 rocketmq
- 修改环境变量
vim /etc/profile
#写入内容
export ROCKETMQ_HOME=/appcom/Install/rocketmq
export PATH=${PATH}:${ROCKETMQ_HOME}/bin
export NAMESRV_ADDR=127.0.0.1:9876
#source下环境变量
source /etc/profile
```shell
- 修改broke的存储路径
```shell
vim /appcom/Install/rocketmq/conf/broker.conf
#写入内容
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#删除文件时间点,默认是凌晨4点
deleteWhen = 04
#文件保留时间,默认48小时
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#存储路径
storePathRootDir=/appcom/config/rocketmq/store
#commitLog存储路径
storePathCommitLog=/appcom/config/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/appcom/config/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/appcom/config/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/appcom/config/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/appcom/config/rocketmq/store/abort
- 创建rocketmq日志目录
mkdir -p /appcom/log/rocketmq
- 修改runserver.sh/runbroker.sh/tools.sh中的JAVA_OPT配置项中的JVM参数大小为128m
- 启动namespace和broke
#启动nameserver
nohup sh mqnamesrv -c /appcom/Install/rocketmq/conf/broker.conf >/appcom/log/rocketmq/mqnamesrv.log 2>&1 &
#启动broke
nohup sh mqbroker -n 127.0.0.1:9876 -c /appcom/Install/rocketmq/conf/broker.conf >/appcom/log/rocketmq/broker.log 2>&1 &
#关闭nameserver
sh mqshutdown namesrv
#关闭broke
sh mqshutdown broker
- 查看存储目录和日志目录
[root@kantlin01 store]# pwd
/appcom/config/rocketmq/store
[root@kantlin01 store]# ls
abort checkpoint commitlog config consumequeue index lock
[root@kantlin01 rocketmq]# pwd
/appcom/log/rocketmq
[root@kantlin01 rocketmq]# ls
broker.log mqnamesrv.log
- 执行tool.sh测试脚本
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 查看执行结果
- 窗口(1)生产者
- 消息生产者
- 窗口(2)消费者
- 消息消费者
java例子(假设公网IP为101.101.101.101)
- pom.xml中加入rocketmq的client包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
- 生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
// 创建Producer
// 实例化Producer对象,并指定生产者组名为producer_name
DefaultMQProducer producer = new DefaultMQProducer("producer_name");
// 指定NameServer集群地址,多个用;分隔
producer.setNamesrvAddr("101.101.101.101:9876");
producer.setSendMsgTimeout(60000);
// 启动Producer
producer.start();
// 开始生产数据
for (int i = 0; i < 10; i++) {
// 构建消息对象,包括Topic、Tag、MessageBody
// Topic: 主题
// Tag: 一个主题下面可以分多个Tag。可以理解为一个业务功能(Topic)下有多种消息,Tag用于对消息分类。
// MessageBody: 你要发送的消息,因为网络传输需要字节码,所以要转换一下
Message msg = new Message(
"Topic_Test",
"Tag_A",
("Hello RocketMQ " + i).getBytes()
);
// 【同步发送方式】
SendResult sendResult = producer.send(msg);
// 打印结果
System.out.printf("%s%n", sendResult);
}
// 关闭Producer
producer.shutdown();
}
}
- 消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @ClassName SimpleConsumer
* @authors kantlin
* @Date 2021/11/23 9:44
* @Version 1.0
* @Description
**/
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
// 创建Consumer
// 实例化Consumer对象,并指定消费者组名为push_consumer_name
// 【Push方式】 由Broker推消息到Consumer
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("push_consumer_name");
// 指定NameServer集群地址,多个用;分隔
pushConsumer.setNamesrvAddr("101.101.101.101:9876");
// 设置订阅的Topic与Tag
// 订阅多个Tag,用||分隔,例如 "Tag_A || Tag_B"、"*"
pushConsumer.subscribe("Topic_Test", "Tag_A");
// 设定消费模式 CLUSTERING与BROADCASTING
// CLUSTERING: 负载均衡(默认)。多个消费者消费时,分别消费所有消息的一部分。
// BROADCASTING: 广播模式。多个消费者消费时,都全部消费。
// pushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册一个回调监听,用于接收消息
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
// 接到的消息是字节码,需要解码
byte[] body = messageExt.getBody();
System.out.println(new String(body));
}
// 回复 Broker "消费成功"
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
pushConsumer.start();
}
}
网友评论