美文网首页
Linux下单节点rocketmq的部署

Linux下单节点rocketmq的部署

作者: _Kantin | 来源:发表于2021-11-22 14:45 被阅读0次

    背景

    • 本文主要介绍在Linux下如何部署rocketmq
    • 本次部署的版本为5.0之前最后一个正式版本4.9.0
    • 使用的服务器为腾讯云个人服务器(内存只有2G),机器比较小,各方面省吃俭用

    部署过程

    • 下载rocketmq 4.9.0版本
    cd /appcom/Install/Rocketmq
    wget https://github.com/apache/rocketmq/archive/rocketmq-all-4.9.0.tar.gz
    tar -zxvf rocketmq-all-4.9.0.tar.gz
    
    • 对代码进行编译
     cd  /appcom/Install/Rocketmq/rocketmq-4.9.0
     mvn -Prelease-all -DskipTests clean install -U
    
    • 代码编译成功
    • 创建软链

    ln -s /appcom/Install/Rocketmq/rocketmq-4.9.0/distribution/target/rocketmq-4.9.0/rocketmq-4.9.0  rocketmq
    
    • 修改环境变量
    vim /etc/profile
    #写入内容
    export ROCKETMQ_HOME=/appcom/Install/rocketmq
    export PATH=${PATH}:${ROCKETMQ_HOME}/bin
    export NAMESRV_ADDR=127.0.0.1:9876
    
    #source下环境变量
    source  /etc/profile
    ```shell
    - 修改broke的存储路径
    ```shell
    vim /appcom/Install/rocketmq/conf/broker.conf
    #写入内容
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    #删除文件时间点,默认是凌晨4点
    deleteWhen = 04
    #文件保留时间,默认48小时
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    #存储路径
    storePathRootDir=/appcom/config/rocketmq/store
    #commitLog存储路径
    storePathCommitLog=/appcom/config/rocketmq/store/commitlog
    #消费队列存储路径
    storePathConsumeQueue=/appcom/config/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/appcom/config/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/appcom/config/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/appcom/config/rocketmq/store/abort
    
    • 创建rocketmq日志目录
    mkdir -p /appcom/log/rocketmq
    
    • 修改runserver.sh/runbroker.sh/tools.sh中的JAVA_OPT配置项中的JVM参数大小为128m
    • 启动namespace和broke
    #启动nameserver
    nohup sh mqnamesrv  -c  /appcom/Install/rocketmq/conf/broker.conf  >/appcom/log/rocketmq/mqnamesrv.log 2>&1 &
    #启动broke
    nohup sh mqbroker -n 127.0.0.1:9876 -c  /appcom/Install/rocketmq/conf/broker.conf >/appcom/log/rocketmq/broker.log 2>&1 &
    #关闭nameserver
    sh mqshutdown namesrv
    #关闭broke
    sh mqshutdown broker
    
    • 查看存储目录和日志目录
    [root@kantlin01 store]# pwd
    /appcom/config/rocketmq/store
    [root@kantlin01 store]# ls
    abort  checkpoint  commitlog  config  consumequeue  index  lock
    
    [root@kantlin01 rocketmq]# pwd
    /appcom/log/rocketmq
    [root@kantlin01 rocketmq]# ls
    broker.log  mqnamesrv.log
    
    • 执行tool.sh测试脚本
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer 
    
    • 查看执行结果
      • 窗口(1)生产者
      • 消息生产者
      • 窗口(2)消费者
      • 消息消费者

    java例子(假设公网IP为101.101.101.101)

    • pom.xml中加入rocketmq的client包
        <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>4.9.0</version>
        </dependency>
    
    • 生产者
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    
    public class SimpleProducer {
        public static void main(String[] args) throws Exception {
            // 创建Producer
            // 实例化Producer对象,并指定生产者组名为producer_name
            DefaultMQProducer producer = new DefaultMQProducer("producer_name");
            // 指定NameServer集群地址,多个用;分隔
            producer.setNamesrvAddr("101.101.101.101:9876");
            producer.setSendMsgTimeout(60000);
            // 启动Producer
            producer.start();
    
            // 开始生产数据
            for (int i = 0; i < 10; i++) {
                // 构建消息对象,包括Topic、Tag、MessageBody
                // Topic: 主题
                // Tag: 一个主题下面可以分多个Tag。可以理解为一个业务功能(Topic)下有多种消息,Tag用于对消息分类。
                // MessageBody: 你要发送的消息,因为网络传输需要字节码,所以要转换一下
                Message msg = new Message(
                        "Topic_Test",
                        "Tag_A",
                        ("Hello RocketMQ " + i).getBytes()
                );
    
                // 【同步发送方式】
                SendResult sendResult = producer.send(msg);
                // 打印结果
                System.out.printf("%s%n", sendResult);
            }
    
            // 关闭Producer
            producer.shutdown();
        }
    }
    
    • 消费者
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @ClassName SimpleConsumer
     * @authors kantlin
     * @Date 2021/11/23 9:44
     * @Version 1.0
     * @Description
     **/
    
    public class SimpleConsumer {
        public static void main(String[] args) throws Exception {
            // 创建Consumer
            // 实例化Consumer对象,并指定消费者组名为push_consumer_name
            // 【Push方式】 由Broker推消息到Consumer
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("push_consumer_name");
            // 指定NameServer集群地址,多个用;分隔
            pushConsumer.setNamesrvAddr("101.101.101.101:9876");
            // 设置订阅的Topic与Tag
            // 订阅多个Tag,用||分隔,例如 "Tag_A || Tag_B"、"*"
            pushConsumer.subscribe("Topic_Test", "Tag_A");
    
            // 设定消费模式 CLUSTERING与BROADCASTING
            // CLUSTERING: 负载均衡(默认)。多个消费者消费时,分别消费所有消息的一部分。
            // BROADCASTING: 广播模式。多个消费者消费时,都全部消费。
            // pushConsumer.setMessageModel(MessageModel.CLUSTERING);
    
            // 注册一个回调监听,用于接收消息
            pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt messageExt : list) {
                        // 接到的消息是字节码,需要解码
                        byte[] body = messageExt.getBody();
                        System.out.println(new String(body));
                    }
    
                    // 回复 Broker "消费成功"
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // 启动Consumer
            pushConsumer.start();
        }
    }
    

    相关文章

      网友评论

          本文标题:Linux下单节点rocketmq的部署

          本文链接:https://www.haomeiwen.com/subject/flrbtrtx.html