美文网首页
07.`RocketMq`的部署及简单使用

07.`RocketMq`的部署及简单使用

作者: 风安峻_ | 来源:发表于2020-09-11 19:50 被阅读0次
1. docker-compose部署rockermq
  1. 编辑docker-compose.yml

    1. vim docker-compose.yml

      编辑 docker-compose.yml
    2. docker-compose.yml内容

      version: "3.7"
      services:
        rocketmq-server:
          image: foxiswho/rocketmq:server
          container_name: rocketmq-server
          ports:
            - 9876:9876
          volumes:
            - ./rocketmq/server/logs:/opt/logs
            - ./rocketmq/server/store:/opt/store
          networks:
            rocketmq:
              aliases:
                - rocketmq-server
      
        rocketmq-broker:
          image: foxiswho/rocketmq:broker
          container_name: rocketmq-broker
          ports:
            - 10909:10909
            - 10911:10911
          volumes:
            - ./rocketmq/broker/logs:/opt/logs
            - ./rocketmq/broker/store:/opt/store
            - ./rocketmq/broker/conf/broker.conf:/etc/rocketmq/broker.conf
          environment:
            NAMESRV_ADDR: "rocketmq-server:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
          command: mqbroker -c /etc/rocketmq/broker.conf
          depends_on:
            - rocketmq-server
          networks:
            rocketmq:
              aliases:
                - rocketmq-broker
      
        rocketmq-console:
          image: styletang/rocketmq-console-ng
          container_name: rocketmq-console
          ports:
            - 8080:8080
          environment:
            JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-server:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
          depends_on:
            - rocketmq-server
          networks:
            rocketmq:
              aliases:
                - rocketmq-console
      
      networks:
        rocketmq:
          name: rocketmq
          driver: bridge    
      
      1. networds:网络桥接,使rocketmq的三个组件处于同一网段
      2. docker netword ls命令可以查看docker-compose网络
  2. 编辑broker.conf

    1. rocketmq-broker有一个配置文件broker.conf,需要一些自定义的配置

    2. 根据docker-compose.yml中的volumes所配置的,需要在./rocketmq/broker/conf/目录下创建一个名为broker.conf的文件

      # 在当前目录下递归创建目录
      mkdir -p rocketmq/broker/conf/
      
      创建目录
    3. broker.conf内容

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      #  Unless required by applicable law or agreed to in writing, software
      #  distributed under the License is distributed on an "AS IS" BASIS,
      #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      #  See the License for the specific language governing permissions and
      #  limitations under the License.
      
      # 所属集群名字
      brokerClusterName=DefaultCluster
      
      # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a
      # 在 broker-b.properties 使用: broker-b
      brokerName=broker-a
      
      # 0 表示 Master,> 0 表示 Slave
      brokerId=0
      
      # nameServer地址,分号分割
      # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
      
      # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
      # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不使用docker 内部IP
      # brokerIP1=47.115.35.187
      
      # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
      defaultTopicQueueNums=4
      
      # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
      autoCreateTopicEnable=true
      
      # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
      autoCreateSubscriptionGroup=true
      
      # Broker 对外服务的监听端口
      listenPort=10911
      
      # 删除文件时间点,默认凌晨4点
      deleteWhen=04
      
      # 文件保留时间,默认48小时
      fileReservedTime=120
      
      # commitLog 每个文件的大小默认1G
      mapedFileSizeCommitLog=1073741824
      
      # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
      mapedFileSizeConsumeQueue=300000
      
      # destroyMapedFileIntervalForcibly=120000
      # redeleteHangedFileInterval=120000
      # 检测物理文件磁盘空间
      diskMaxUsedSpaceRatio=88
      # 存储路径
      # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
      # commitLog 存储路径
      # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
      # 消费队列存储
      # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
      # 消息索引存储路径
      # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
      # checkpoint 文件存储路径
      # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
      # abort 文件存储路径
      # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
      # 限制的消息大小
      maxMessageSize=65536
      
      # flushCommitLogLeastPages=4
      # flushConsumeQueueLeastPages=2
      # flushCommitLogThoroughInterval=10000
      # flushConsumeQueueThoroughInterval=60000
      
      # Broker 的角色
      # - ASYNC_MASTER 异步复制Master
      # - SYNC_MASTER 同步双写Master
      # - SLAVE
      brokerRole=ASYNC_MASTER
      
      # 刷盘方式
      # - ASYNC_FLUSH 异步刷盘
      # - SYNC_FLUSH 同步刷盘
      flushDiskType=ASYNC_FLUSH
      
      # 发消息线程池数量
      # sendMessageThreadPoolNums=128
      # 拉消息线程池数量
      # pullMessageThreadPoolNums=128        
      
    4. 注意

      如果部署在服务器,那么需要将brokerIP1的值设置为服务器宿主机ip地址,否则,信息将不能注册上去,发送消息将会失败

  3. 启动

    1. 返回docker-compose.yml文件所在的目录

      # 返回上一次所在的目录
      cd -
      
      返回docker-compose文件所在的目录
    2. 运行docker-compose.yml

      # 运行 docker-compose.yml
      docker-compose up -d
      
      # 查看容器运行情况
      docker ps -a
      
      容器运行
  4. 浏览器查看 服务器ip:端口号

    服务器ip:端口号
2. rocketmq-spring-boot-starter的简单使用
  1. 依赖

    1. 依赖

      <!-- rocket mq -->
      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-spring-boot-starter</artifactId>
          <version>2.0.4</version>
      </dependency>
      
    2. 说明

      这是spring-boot的依赖

  2. 配置文件

    1. rocket-mq-provider提供者的application.yml主配置文件

      server:
        port: 8081
      
      rocketmq:
        # RocketMq-Server的服务器地址
        name-server: 120.25.207.44:9876
        producer:
          # 配置属于哪个 group
          group: zero-rocket-mq-test-group
          # 过期时间
          send-message-timeout: 30000        
      
    2. rocket-mq-consumer消费者的application.yml主配置文件

      server:
        port: 8082
      
      rocketmq:
        name-server: 120.25.207.44:9876
        producer:
          send-message-timeout: 30000        
      
  3. 栗子

    1. rocket-mq-providercontroller

      import lombok.extern.slf4j.Slf4j;
      import org.apache.rocketmq.client.producer.SendCallback;
      import org.apache.rocketmq.client.producer.SendResult;
      import org.apache.rocketmq.spring.core.RocketMQTemplate;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      
      import javax.annotation.Resource;
      
      /**
       * 说明:
       *
       * @author sheng
       */
      @Slf4j
      @RestController()
      @RequestMapping("/test")
      public class TestController {
          @Resource
          RocketMQTemplate rocketMqTemplate;
      
          @GetMapping("/test")
          public String test() {
              // 发送消息的主题
              String topic = "test";
              // 发送到 RocketMq 的消息
              String message = "Hello rocketMq";
              
              // syncSend()是同步发送消息,直接返回 SendResult 对象
              // asyncSend() 异步发送消息
              //  new SendCallback() SendResult 返回的消息对象
              rocketMqTemplate.asyncSend(topic, message, new SendCallback() {
                  @Override
                  public void onSuccess(SendResult sendResult) {
                      String msgId = sendResult.getMsgId();
                      log.info("provider_onSuccess: " + msgId);
                  }
      
                  @Override
                  public void onException(Throwable throwable) {
      
                  }
              });
      
              return "rocket mq";
          }
      }        
      
    2. rocket-mq-consumerlistener

      import lombok.extern.slf4j.Slf4j;
      import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
      import org.apache.rocketmq.spring.core.RocketMQListener;
      import org.springframework.stereotype.Component;
      
      /**
       * 说明:@RocketMQMessageListener(topic = "test", consumerGroup = "zero-rocket-mq-test-group")中
       * topic 对应提供者 controller 中
       * public void asyncSend(String destination,Object payload,org.apache.rocketmq.client.producer.SendCallback sendCallback)
       * 的 `destination`, consumerGroup 对应提供者主配置文件配置的 group
       *
       * @author sheng
       */
      @Slf4j
      @Component
      @RocketMQMessageListener(topic = "test", consumerGroup = "zero-rocket-mq-test-group")
      public class TestListener implements RocketMQListener<String> {
          @Override
          public void onMessage(String s) {
              log.info(s);
          }
      }       
      
    3. 浏览器

      1. 执行提供者的controller方法

        controller
      2. 查看RocketMq-console-ng消费者

        消费者
  4. rocketmq-demo 源码

3. spring-cloud-starter-stream-rocketmq的简单使用
  1. 官方文档

    spring-cloud-alibaba-rocketmq github 文档

    spring-cloud-alibaba-wiki-RocketMQ 文档

  2. 依赖

    1. 依赖

      <!-- spring-cloud-rocketmq -->
      <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
      </dependency>                
      
      1. 使用了spring-cloud-rocketmq就不需要rocketmq-spring-boot-starter
    2. 父依赖

      <properties>
          <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
      </properties>
      
      <dependencyManagement>
          <dependencies>
              <!-- spring-cloud-alibaba-dependencies -->
              <dependency>
                  <groupId>com.alibaba.cloud</groupId>
                  <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                  <version>${spring-cloud-alibaba.version}</version>
                  <type>pom</type>
                  <scope>import</scope>
              </dependency>
          </dependencies>
      </dependencyManagement>
      
  3. 配置文件以及启动类

    1. rocket-mq-provider提供者

      1. application.yml主配置文件

        server:
          port: 8081
        
        spring:
          application:
            name: rocket-mq-provider
          cloud:
            stream:
              rocketmq:
                binder:
                  # 配置 rocketMq 的 nameserver 服务器地址
                  name-server: 120.25.207.44:9876
              bindings:
                # 定义名为 output 的 binding,名字随意起
                # 值为 Map,{} 为 yml 中 Map 的行内写法
                output: {destination: test-topic, content-type: application/json}
                # 测试自定义 binding
                myOutput1: {destination: my-topic-1, content-type: application/json}
                myOutput2: {destination: my-topic-2, content-type: application/json}            
        
      2. 启动类

        import com.sheng.cloud.provider.binding.MySource;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        import org.springframework.cloud.stream.annotation.EnableBinding;
        import org.springframework.cloud.stream.messaging.Source;
        
        @EnableBinding({Source.class, MySource.class})
        @SpringBootApplication
        public class RocketMqProviderApplication {
        
            public static void main(String[] args) {
                SpringApplication.run(RocketMqProviderApplication.class, args);
            }
        
        }            
        
        1. 启动类要用@EnableBinding注解标记,值是一个Class<?>[] value() default {};泛型字节码对象数组
        2. Source.classorg.springframework.cloud.stream.messaging.Source,自带的binding,用于发送消息
        3. MySource.class是自定义的binding
    2. rocket-mq-consumer消费者

      1. application.yml主配置文件

        server:
          port: 8082
        
        spring:
          application:
            name: rocket-mq-consumer
          cloud:
            stream:
               rocketmq:
                 binder:
                   # 配置 rocketMq 的 nameserver 服务器地址
                   name-server: 120.25.207.44:9876
               bindings:
                 # 定义名为 input 的 binding,名字随意起
                 # 值为 Map,{} 为 yml 中 Map 的行内写法
                 # {} 里面的值 destination、content-type要与 提供者的 output 的值对应上
                 input: {destination: test-topic, content-type: application/json, group: test-group}
                 # 自定义 binding
                 myInput1: {destination: my-topic-1, content-type: application/json, group: my-binding-1}
                 myInput2: {destination: my-topic-2, content-type: application/json, group: my-binding-2}            
        
      2. 启动类

        import com.sheng.cloud.provider.binding.MySink;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        import org.springframework.cloud.stream.annotation.EnableBinding;
        import org.springframework.cloud.stream.messaging.Sink;
        
        @EnableBinding({Sink.class, MySink.class})
        @SpringBootApplication
        public class RocketMqConsumerApplication {
        
            public static void main(String[] args) {
                SpringApplication.run(RocketMqConsumerApplication.class, args);
            }
        
        }            
        
        1. 启动类要用@EnableBinding注解标记,值是一个Class<?>[] value() default {};泛型字节码对象数组
        2. Sink.classorg.springframework.cloud.stream.messaging.Source,自带的binding,用于监听信息
        3. MySink.class是自定义的binding
  4. 栗子

    1. 自带的binding栗子

      1. rocket-mq-provider提供者

        1. 本例子在controller使用

          import org.springframework.messaging.MessageChannel;
          import org.springframework.messaging.support.MessageBuilder;
          import org.springframework.web.bind.annotation.GetMapping;
          import org.springframework.web.bind.annotation.PathVariable;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RestController;
          
          import javax.annotation.Resource;
          
          /**
           * 说明:测试自带 binding
           *
           * @author sheng
           */
          @RestController
          @RequestMapping("/test")
          public class TestController {
              /**
               * Resource(name = "output") 中 name 为 主配置文件 application.yml 中配置的
               * spring.cloud.stream.bindings.output
               */
              @Resource(name = "output")
              private MessageChannel messageChannel;
          
              @GetMapping("/test/{phone}")
              public String test(@PathVariable String phone) {
                  String message = "hello_";
                  // 发送消息
                  messageChannel.send(MessageBuilder.withPayload(message + phone).build());
                  return "发送消息成功_" + phone;
              }
          }                
          
        2. 注意

          1. 启动类中要用@EnableBinding标记,并把Source.class注册上去,使binding生效
      2. rocket-mq-consumer消费者

        1. 本例子创建listener

          import lombok.extern.slf4j.Slf4j;
          import org.springframework.cloud.stream.annotation.StreamListener;
          import org.springframework.stereotype.Component;
          
          /**
           * 说明:必须注册到 spring 容器中
           *
           * @author sheng
           */
          @Slf4j
          @Component
          public class TestListener {
              @StreamListener("input")
              public void input(String message) {
                  log.info("testListener: " + message);
              }
          }                
          
          1. listener必须注册到容器中
          2. 方法使用@StreamListener标记,值是主配置文件application.yml中配置的binding。(org.springframework.cloud.stream.annotation.StreamListener;)
        2. 注意

          1. listener必须注册到容器中
          2. 启动类中要用@EnableBinding标记,并把Sink.class注册上去,使binding生效
      3. 结果

        1. 先访问提供者controller,使其发送信息

          提供者 controller
        2. RocketMq-console-ng查看topic主题、message消息

          topic 主题 message 消息
        3. 控制台

          控制台
    2. 自定义binding栗子

      1. rocket-mq-provider提供者

        1. 自定义bingding

          import org.springframework.cloud.stream.annotation.Output;
          import org.springframework.messaging.MessageChannel;
          
          /**
           * 说明:自定义 binding,不需要注册到容器中,在启动类中配合 @EnableBinding 使其生效
           *
           * @author sheng
           */
          public interface MySource {
              /**
               * 自定义 binding
               * @return org.springframework.messaging.MessageChannel 对象
               */
              @Output("myOutput1")
              MessageChannel myOutput1();
          
              /**
               * 自定义 binding
               * @return org.springframework.messaging.MessageChannel 对象
               */
              @Output("myOutput2")
              MessageChannel myOutput2();
          }                
          
          1. 使用@Output注解标记方法,值是主配置文件application.yml中设置的binding
          2. 方法返回值是org.springframework.messaging.MessageChannel,用于发送消息
          3. 启动类中要用@EnableBinding标记,并把MySource.class注册上去,使自定义binding生效
        2. controller

          import com.sheng.cloud.provider.binding.MySource;
          import org.springframework.messaging.support.MessageBuilder;
          import org.springframework.web.bind.annotation.GetMapping;
          import org.springframework.web.bind.annotation.PathVariable;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RestController;
          
          import javax.annotation.Resource;
          
          /**
           * 说明:
           *
           * @author sheng
           */
          @RestController
          @RequestMapping("/binding")
          public class MyBindingController {
              @Resource
              private MySource mySource;
          
              @GetMapping("/myOutput1/{phone}")
              public String myOutput1(@PathVariable String phone) {
                  String message = "自定义 binding myOutput1:";
                  // 发送消息
                  mySource.myOutput1().send(MessageBuilder.withPayload(message + phone).build());
                  return "MyBindingController_myOutput1_" + phone;
              }
          
              @GetMapping("/myOutput2/{phone}")
              public String myOutput2(@PathVariable String phone) {
                  String message = "自定义 binding myOutput2:";
                  // 发送消息
                  mySource.myOutput2().send(MessageBuilder.withPayload(message + phone).build());
                  return "MyBindingController_myOutput2_" + phone;
              }
          }                
          
      2. rocket-mq-consumer消费者

        1. 自定义binding

          import org.springframework.cloud.stream.annotation.Input;
          import org.springframework.messaging.SubscribableChannel;
          
          /**
           * 说明:不需要注册到容器中,在启动类中 @EnableBinding 使其生效
           *
           * @author sheng
           */
          public interface MySink {
              /**
               * 自定义 binding
               *
               * @return org.springframework.messaging.SubscribableChannel 对象
               */
              @Input("myInput1")
              SubscribableChannel myInput1();
          
              /**
               * 自定义 binding
               *
               * @return org.springframework.messaging.SubscribableChannel 对象
               */
              @Input("myInput2")
              SubscribableChannel myInput2();
          }                
          
          1. 使用@Input注解标记方法,值是主配置文件application.yml中设置的binding
          2. 方法返回值是org.springframework.messaging.SubscribableChannel;,用于监听消息
          3. 启动类中要用@EnableBinding标记,并把自定义的MySink.class注册上去,使自定义binding生效
        2. listener

          import lombok.extern.slf4j.Slf4j;
          import org.springframework.cloud.stream.annotation.StreamListener;
          import org.springframework.stereotype.Component;
          
          /**
           * 说明:必须注册到容器中
           *
           * @author sheng
           */
          @Slf4j
          @Component
          public class TestMyBindingListener {
              @StreamListener("myInput1")
              public void myInput1(String message) {
                  log.info("TestMyBindingListener_myInput1: " + message);
              }
          
              @StreamListener("myInput2")
              public void myInput2(String message) {
                  log.info("TestMyBindingListener_myInput2: " + message);
              }
          }                
          
          1. listener必须注册到容器中
          2. 方法使用@StreamListener标记,值是主配置文件application.yml中配置的binding。(org.springframework.cloud.stream.annotation.StreamListener;)
      3. 结果

        1. 先访问提供者controller,使其发送信息

          提供者 controller 提供者 controller
        2. RocketMq-console-ng查看topic主题、message消息

          topic 主题 message 消息1 message 消息2
        3. 控制台

          控制台
  5. spring-cloud-rocket-mq-demo 源码

相关文章

网友评论

      本文标题:07.`RocketMq`的部署及简单使用

      本文链接:https://www.haomeiwen.com/subject/tjdtektx.html