美文网首页
RocketMq基于docker compose

RocketMq基于docker compose

作者: 凌康ACG | 来源:发表于2020-01-21 00:05 被阅读0次

    基于docker compose部署rocket Mq


    image.png

    现在是2020年1月19日 我的docker都不是最新版的

    一、部署RocketMq

    先创建好几个文件夹,用来放日志和配置:

    mkdir /usr/local/docker/rocketmq
    cd /usr/local/docker/rocketmq
    mkdir data
    cd data
    mkdir logs
    mkdir store
    mkdir brokerconf
    cd brokerconf
    vi broker.conf
    

    其中 broker.conf内容如下:

    brokerClusterName=DefaultCluster
    brokerName=broker-a
    brokerId=0
    # 修改为你宿主机的 IP
    brokerIP1=192.168.30.131
    defaultTopicQueueNums=4
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    listenPort=10911
    deleteWhen=04
    fileReservedTime=120
    mapedFileSizeCommitLog=1073741824
    mapedFileSizeConsumeQueue=300000
    diskMaxUsedSpaceRatio=88
    maxMessageSize=65536
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    

    创建docker-compose.yml

    cd /usr/local/docker/rocketmq
    vi docker-compose.yml的内容如下:

    version: '3.5'
    services:
      rmqnamesrv:
        image: foxiswho/rocketmq:server
        container_name: rmqnamesrv
        ports:
          - 9876:9876
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
        networks:
            rmq:
              aliases:
                - rmqnamesrv
      rmqbroker:
        image: foxiswho/rocketmq:broker
        container_name: rmqbroker
        ports:
          - 10909:10909
          - 10911:10911
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
          - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
        environment:
            NAMESRV_ADDR: "rmqnamesrv:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
        command: mqbroker -c /etc/rocketmq/broker.conf
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqbroker
      rmqconsole:
        image: styletang/rocketmq-console-ng
        container_name: rmqconsole
        ports:
          - 8080:8080
        environment:
            JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqconsole
    networks:
      rmq:
        name: rmq
        driver: bridge
    

    PS:
    1、rocketmq:serverMQ的服务
    2、rocketmq:brokerMq的中间件
    3、rocketmq-console-ngMq的可视化控制台
    运行:docker-compose up
    访问 http://192.168.75.129:8080

    image.png
    然后我们创建一个简单

    二、Spring Boot Demo 之生产者

    参考资料spring-cloud-alibaba:https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en
    这里我们简单的创建一个spring boot demo来调用它。
    项目源码:https://github.com/xcocean/rocketmq-demo
    pom.xml加入,此时需要注意,spring boot使用的版本是2.1.10(2020年1月20日 最新版),如果使用2.2.x会报错,他们的特性不一样了。

            <!-- RocketMQ 依赖 -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
                <version>2.1.1.RELEASE</version>
            </dependency>
    

    application.yml

    spring:
      jackson:
        time-zone: GMT+8
        date-format: yyyy-MM-dd HH:mm:ss
      cloud:
        # RocketMQ 相关配置
        stream:
          rocketmq:
            binder:
              name-server: 192.168.75.129:9876
          bindings:
            #自定义的名称 # test-group(一级分类)
            test-group: {destination: test-group,content-type: application/json}
    

    创建MyProduce

    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    public interface MyProduce {
    
        @Output("test-group")
        MessageChannel log();
    }
    

    在应用启动入口绑定上面的MyProduce.java

    import com.lingkang.rocketmqdemo.mq.MyProduce;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    
    @SpringBootApplication
    @EnableBinding({MyProduce.class})
    public class RocketmqDemoApplication {
        public static void main(String[] args) {
            SpringApplication.run(RocketmqDemoApplication.class, args);
        }
    }
    

    controller中直接装配即可

    import com.lingkang.rocketmqdemo.mq.MyProduce;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author linke
     * @date 2020-01-19 下午 21:08
     * @description
     */
    @RestController
    public class MainController {
    
        @Autowired
        private MyProduce myProduce;
    
        @GetMapping("send")
        public Object send(String msg) throws Exception {
            myProduce.log().send(MessageBuilder.withPayload(msg).build());
            return "";
        }
    
        @GetMapping("send1")
        public Object send1(String msg) {
            //带上标签发送
            MessageBuilder builder = MessageBuilder.withPayload(msg)
                    .setHeader(RocketMQHeaders.TAGS, "log")
                    .setHeader(RocketMQHeaders.KEYS, "my-key")
                    .setHeader("DELAY", "1");
            Message message = builder.build();
            myProduce.log().send(message);
            return message;
        }
    }
    

    访问http://localhost:8080/send?msg=%E4%BD%A0%E5%A5%BD11111111
    http://localhost:8080/send1?msg=%E4%BD%A0%E5%A5%BD11111111

    image.png
    项目结构如下:
    image.png

    三、Spring Boot Demo 之消费者

    基于上面的项目
    改动application.yml 添加消费者绑定

    spring:
      jackson:
        time-zone: GMT+8
        date-format: yyyy-MM-dd HH:mm:ss
      cloud:
        # RocketMQ 相关配置
        stream:
          rocketmq:
            binder:
              name-server: 192.168.75.129:9876
          bindings:
            #自定义的名称 # test-group(一级分类)
            test-group: {destination: test-group,content-type: application/json}
            # consumer.maxAttempts
            # 消息最大可以被尝试消费的次数,包含第一次投递
            # 设为 1,表示不重试,注意该值必须大于 0
            input-consumer: {destination: test-group, content-type: text/plain,
                             group: test-group, consumer.maxAttempts: 1}
    

    创建MyConsumer

    public interface MyConsumer {
        @Input("input-consumer")
        MessageChannel log();
    }
    

    创建接收监听MyConsumerReceive

    @Component
    public class MyConsumerReceive {
    
        @StreamListener("input-consumer")
        public void receiveConsumer(Object msg) {
            System.out.println("消息消费:" + msg);
        }
    }
    

    结果如下:


    消息被消费.png
    消息被消费.png
    image.png

    相关文章

      网友评论

          本文标题:RocketMq基于docker compose

          本文链接:https://www.haomeiwen.com/subject/fwhdzctx.html