美文网首页
消息队列之RocketMQ

消息队列之RocketMQ

作者: Demon先生 | 来源:发表于2020-06-09 17:13 被阅读0次

    1. 什么是RocketMQ

    1.1. 简介

    Apache Alibaba RocketMQ 是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。

    1.2. 特点

    RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

    • 支持严格的消息顺序
    • 支持 Topic 与 Queue 两种模式
    • 亿级消息堆积能力
    • 比较友好的分布式特性
    • 同时支持 Push 与 Pull 方式消费消息
    • 历经多次天猫双十一海量消息考验

    2. 安装RocketMQ服务端

    这里使用docker-compose进行安装。
    注意:启动 RocketMQ Server + Broker + Console 至少需要 2G 内存

    2.1. 创建配置文件

    RocketMQ Broker 需要一个配置文件,需要在 ./data/brokerconf/ 目录下创建一个名为 broker.conf 的配置文件,内容如下:

    # 所属集群名字
    brokerClusterName=DefaultCluster
    
    # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
    # 在 broker-b.properties 使用: broker-b
    brokerName=broker-a
    
    # 0 表示 Master,> 0 表示 Slave
    brokerId=0
    
    # nameServer地址,分号分割
    # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    
    # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
    # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
    # brokerIP1=192.168.0.253
    
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    
    # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
    autoCreateTopicEnable=true
    
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    
    # Broker 对外服务的监听端口
    listenPort=10911
    
    # 删除文件时间点,默认凌晨4点
    deleteWhen=04
    
    # 文件保留时间,默认48小时
    fileReservedTime=120
    
    # commitLog 每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    
    # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    
    # destroyMapedFileIntervalForcibly=120000
    # redeleteHangedFileInterval=120000
    # 检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    # 存储路径
    # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
    # commitLog 存储路径
    # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
    # 消费队列存储
    # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
    # 消息索引存储路径
    # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
    # checkpoint 文件存储路径
    # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
    # abort 文件存储路径
    # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
    # 限制的消息大小
    maxMessageSize=65536
    
    # flushCommitLogLeastPages=4
    # flushConsumeQueueLeastPages=2
    # flushCommitLogThoroughInterval=10000
    # flushConsumeQueueThoroughInterval=60000
    
    # Broker 的角色
    # - ASYNC_MASTER 异步复制Master
    # - SYNC_MASTER 同步双写Master
    # - SLAVE
    brokerRole=ASYNC_MASTER
    
    # 刷盘方式
    # - ASYNC_FLUSH 异步刷盘
    # - SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    
    # 发消息线程池数量
    # sendMessageThreadPoolNums=128
    # 拉消息线程池数量
    # pullMessageThreadPoolNums=128
    

    注意:启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed,brokerIP1 设置宿主机IP,不要使用docker 内部,brokerIP1=192.168.25.132

    2.2. 创建docker-compose.yml并启动

    version: '3.5'
    services:
      rmqnamesrv:
        image: foxiswho/rocketmq:server
        container_name: rmqnamesrv
        ports:
          - 9876:9876
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
        networks:
            rmq:
              aliases:
                - rmqnamesrv
    
      rmqbroker:
        image: foxiswho/rocketmq:broker
        container_name: rmqbroker
        ports:
          - 10909:10909
          - 10911:10911
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
          - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
        environment:
            NAMESRV_ADDR: "rmqnamesrv:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
        command: mqbroker -c /etc/rocketmq/broker.conf
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqbroker
    
      rmqconsole:
        image: styletang/rocketmq-console-ng
        container_name: rmqconsole
        ports:
          - 8080:8080
        environment:
            JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqconsole
    
    networks:
      rmq:
        name: rmq
        driver: bridge
    

    使用命令docker-compose up -d进行启动

    2.3. 测试访问

    浏览器输入地址:ip:8080, 登入控制台,如下图所示,表示创建成功。

    image.png

    3. RocketMQ生产者

    3.1. 添加依赖

    主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.6.RELEASE</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>springboot-rocketmq-provider</artifactId>
    
        <properties>
            <java.version>1.7</java.version>
        </properties>
    
        <dependencies>
            <!-- Spring Boot Begin -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- Spring Boot End -->
            <!-- Spring Cloud Begin -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
                <version>0.2.1.RELEASE</version>
            </dependency>
            <!-- Spring Cloud End -->
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <mainClass>com.sprintboot.rocketmq.provider.RocketMQProviderApplication</mainClass>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    3.2. 配置Application

    配置 Output(Source.class) 的 Binding 信息并配合 @EnableBinding 注解使其生效

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @SpringBootApplication
    @EnableBinding({Source.class})
    @RestController
    public class RocketMQProviderApplication{
    
        public static void main(String[] args) {
            SpringApplication.run(RocketMQProviderApplication.class, args);
        }
    
        @Autowired
        private MessageChannel output;
    
        @RequestMapping("send/{message}")
        public String send(@PathVariable String message) {
            output.send(MessageBuilder.withPayload(message).build());
            return "发送成功";
        }
    }
    

    3.3. 配置application.yml

    spring:
      application:
        name: rocketmq-provider
      cloud:
        stream:
          rocketmq:
            binder:
              # RocketMQ 服务器地址
              namesrv-addr: 192.168.25.132:9876
          bindings:
            # 这里是个 Map 类型参数,{} 为 YAML 中 Map 的行内写法
            output: {destination: my-test-topic, content-type: application/json}
    
    server:
      port: 9093
    
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'
    #logging:
    #  level:
    #    root: debug
    

    运行成功后即可在 RocketMQ 控制台的 消息 列表中选择 test-topic 主题即可看到发送的消息

    4. RocketMQ消费者

    4.1. 添加依赖

    主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.6.RELEASE</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>springboot-rocketmq-consumer</artifactId>
    
        <properties>
            <java.version>1.7</java.version>
        </properties>
    
        <dependencies>
            <!-- Spring Boot Begin -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- Spring Boot End -->
            <!-- Spring Cloud Begin -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
                <version>0.2.1.RELEASE</version>
            </dependency>
            <!-- Spring Cloud End -->
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <mainClass>com.sprintboot.rocketmq.consumer.RocketMQConsumerApplication</mainClass>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    4.2. 配置Application

    配置 Input(Sink.class) 的 Binding 信息并配合 @EnableBinding 注解使其生效

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    
    @SpringBootApplication
    @EnableBinding({Sink.class})
    public class RocketMQConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(RocketMQConsumerApplication.class, args);
        }
    }
    

    4.3. 消息消费者服务

    主要使用 @StreamListener("input") 注解来订阅从名为 input 的 Binding 中接收的消息

    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ConsumerReceive {
    
        @StreamListener("input")
        public void receiveInput(String message) {
            System.out.println("Receive input: " + message);
        }
    }
    

    4.4. 配置application.yml

    spring:
      application:
        name: rocketmq-consumer
      cloud:
        stream:
          rocketmq:
            binder:
              namesrv-addr: 192.168.25.132:9876
            bindings:
              input: {consumer.orderly: true}
          bindings:
            input: {destination: my-test-topic, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
    
    server:
      port: 9094
    
    management:
      endpoints:
        web:
          exposure:
            include: '*'
    

    运行成功后即可在控制台接收到消息:Receive input: Hello RocketMQ

    相关文章

      网友评论

          本文标题:消息队列之RocketMQ

          本文链接:https://www.haomeiwen.com/subject/smostktx.html