美文网首页
Linux下单节点rocketmq的部署

Linux下单节点rocketmq的部署

作者: _Kantin | 来源:发表于2021-11-22 14:45 被阅读0次

背景

  • 本文主要介绍在Linux下如何部署rocketmq
  • 本次部署的版本为5.0之前最后一个正式版本4.9.0
  • 使用的服务器为腾讯云个人服务器(内存只有2G),机器比较小,各方面省吃俭用

部署过程

  • 下载rocketmq 4.9.0版本
cd /appcom/Install/Rocketmq
wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.9.0.tar.gz
tar -zxvf rocketmq-all-4.9.0.tar.gz
  • 对代码进行编译
 cd  /appcom/Install/Rocketmq/rocketmq-4.9.0
 mvn -Prelease-all -DskipTests clean install -U
  • 代码编译成功
  • 创建软链

ln -s /appcom/Install/Rocketmq/rocketmq-4.9.0/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0  rocketmq
  • 修改环境变量
vim /etc/profile
#写入内容
export ROCKETMQ_HOME=/appcom/Install/rocketmq
export PATH=${PATH}:${ROCKETMQ_HOME}/bin
export NAMESRV_ADDR=127.0.0.1:9876

#source下环境变量
source  /etc/profile
```shell
- 修改broke的存储路径
```shell
vim /appcom/Install/rocketmq/conf/broker.conf
#写入内容
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#删除文件时间点,默认是凌晨4点
deleteWhen = 04
#文件保留时间,默认48小时
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#存储路径
storePathRootDir=/appcom/config/rocketmq/store
#commitLog存储路径
storePathCommitLog=/appcom/config/rocketmq/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/appcom/config/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/appcom/config/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/appcom/config/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/appcom/config/rocketmq/store/abort
  • 创建rocketmq日志目录
mkdir -p /appcom/log/rocketmq
  • 修改runserver.sh/runbroker.sh/tools.sh中的JAVA_OPT配置项中的JVM参数大小为128m
  • 启动namespace和broke
#启动nameserver
nohup sh mqnamesrv  -c  /appcom/Install/rocketmq/conf/broker.conf  >/appcom/log/rocketmq/mqnamesrv.log 2>&1 &
#启动broke
nohup sh mqbroker -n 127.0.0.1:9876 -c  /appcom/Install/rocketmq/conf/broker.conf >/appcom/log/rocketmq/broker.log 2>&1 &
#关闭nameserver
sh mqshutdown namesrv
#关闭broke
sh mqshutdown broker
  • 查看存储目录和日志目录
[root@kantlin01 store]# pwd
/appcom/config/rocketmq/store
[root@kantlin01 store]# ls
abort  checkpoint  commitlog  config  consumequeue  index  lock

[root@kantlin01 rocketmq]# pwd
/appcom/log/rocketmq
[root@kantlin01 rocketmq]# ls
broker.log  mqnamesrv.log
  • 执行tool.sh测试脚本
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer 
  • 查看执行结果
    • 窗口(1)生产者
    • 消息生产者
    • 窗口(2)消费者
    • 消息消费者

java例子(假设公网IP为101.101.101.101)

  • pom.xml中加入rocketmq的client包
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.9.0</version>
    </dependency>
  • 生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;


public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        // 创建Producer
        // 实例化Producer对象,并指定生产者组名为producer_name
        DefaultMQProducer producer = new DefaultMQProducer("producer_name");
        // 指定NameServer集群地址,多个用;分隔
        producer.setNamesrvAddr("101.101.101.101:9876");
        producer.setSendMsgTimeout(60000);
        // 启动Producer
        producer.start();

        // 开始生产数据
        for (int i = 0; i < 10; i++) {
            // 构建消息对象,包括Topic、Tag、MessageBody
            // Topic: 主题
            // Tag: 一个主题下面可以分多个Tag。可以理解为一个业务功能(Topic)下有多种消息,Tag用于对消息分类。
            // MessageBody: 你要发送的消息,因为网络传输需要字节码,所以要转换一下
            Message msg = new Message(
                    "Topic_Test",
                    "Tag_A",
                    ("Hello RocketMQ " + i).getBytes()
            );

            // 【同步发送方式】
            SendResult sendResult = producer.send(msg);
            // 打印结果
            System.out.printf("%s%n", sendResult);
        }

        // 关闭Producer
        producer.shutdown();
    }
}
  • 消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @ClassName SimpleConsumer
 * @authors kantlin
 * @Date 2021/11/23 9:44
 * @Version 1.0
 * @Description
 **/

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        // 创建Consumer
        // 实例化Consumer对象,并指定消费者组名为push_consumer_name
        // 【Push方式】 由Broker推消息到Consumer
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("push_consumer_name");
        // 指定NameServer集群地址,多个用;分隔
        pushConsumer.setNamesrvAddr("101.101.101.101:9876");
        // 设置订阅的Topic与Tag
        // 订阅多个Tag,用||分隔,例如 "Tag_A || Tag_B"、"*"
        pushConsumer.subscribe("Topic_Test", "Tag_A");

        // 设定消费模式 CLUSTERING与BROADCASTING
        // CLUSTERING: 负载均衡(默认)。多个消费者消费时,分别消费所有消息的一部分。
        // BROADCASTING: 广播模式。多个消费者消费时,都全部消费。
        // pushConsumer.setMessageModel(MessageModel.CLUSTERING);

        // 注册一个回调监听,用于接收消息
        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    // 接到的消息是字节码,需要解码
                    byte[] body = messageExt.getBody();
                    System.out.println(new String(body));
                }

                // 回复 Broker "消费成功"
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动Consumer
        pushConsumer.start();
    }
}

相关文章

网友评论

      本文标题:Linux下单节点rocketmq的部署

      本文链接:https://www.haomeiwen.com/subject/flrbtrtx.html