美文网首页
91 RocketMQ

91 RocketMQ

作者: 滔滔逐浪 | 来源:发表于2021-05-30 18:12 被阅读0次

    RocketMQ 作为一种纯java,分布式,队列模型的开源消息中间件,支持事务消息,顺序消息,批量消息,定时消息,消息回溯等。

    ROcketMQ优点:
    1,RocketMQ 去除zk 的依赖;
    2,RocketMQ 支持同步和异步2种方式刷盘
    3,RocketMQ 单机支持的队列或者toipic数量是5w
    4,Rocket 支持消息重试;
    5,RocketMQ支持严格按照一定顺序发送消息
    6,RcoketMQ支持定时发送消息
    ,7,RocketMQ支持根据消息ID来进行查询;
    8,RocketMQ 支持根据某个时间点进行消息的回溯
    9,RocketMQ支持消息服务端的过滤
    10,RocketMQ 消费并行度

    RocketMQ架构原理

    [图片上传失败...(image-81e25d-1621778488985)]

    RocketMQ专业名词:
    Producer 生产者角色--投递消息给mq
    Producer Group生产组---
    Consumer 消费者 采用拉取/mq推送方式 获取消息offset
    Consumer Group 消费者组---在同一个组中,是不允许多个不同的消费者消费同一个消息

    多个消费者消费同一条消息呢? 两个分组 多个不同的分组中可以允许有不同分组中消费者消费同一条消息的。----
    以组的名义关联该组消费的offset位置---
    Topic
    业务队列存放消息
    异步发送短信
    异步发送邮件
    邮件topic
    短信topic
    业务区分不同的队列消息
    Queue
    会将一个topic主题中的消息存放在多个不同的queue 与kafka 分区模型是一样的
    MEssage:
    生产者投递消息会自动对给消息生成一个全局消息id,后期的可以根据该消息全局id实现业务的防止重复执行----幂等性概念。
    tag
    区分 过滤

    Broker
    MQ服务器端
    Name server
    与zk相同思想,作为rocketmq注册中心 存放生产者消费者topic主题信息;
    Producer:
    消息生产者,位于用户的进程内,Producer 通过NAMEsERVER获取所有的Broker的路由信息,根据负载均衡选择将消息发到哪个Broker,然后调用Broker接口提交消息;
    Producer Group
    生产者组,简单的说就是多个发送同一类消息的生产者称之为一个生产者组
    ConSumer
    消息消费者,位于用户进程内。Consumer通过NameServer 获取所有broker的路由信息后,向Broker发送pull请求来获取消息数据。Consumer可以以2种模式启动,广播(Broadcast)和集群(Cluser),

    RocketMQ环境搭建

    注意:一定要配置rocketmq 环境变量 不然启动 mqnamesrv.cmd

    报错: Please set the ROCKETMQ_HOME variable in your environment!

    启动mqnamesrv

    1. 下载rocketmq安装包

    2. 解压rocketmq安装包

    3. 配置rocketmq环境变量

    系统环境变量配置

    变量名:ROCKETMQ_HOME

    变量值:MQ解压路径\MQ文件夹名

    eg、ROCKETMQ_HOME=D:\rocketmq-all-4.3.0-bin-release

    4.启动 mqnamesrv.cmd

    [图片上传失败...(image-d05b9c-1622358860602)]

    启动mqBroker

    Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,启动BROKER。成功后会弹出提示框,此框勿关闭。

    启动Rocketmq-console

    1.下载rocketmq-externals-master

    ,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。

    1. 新增:rocketmq.config.namesrvAddr=127.0.0.1:9876

    2. 执行

    用CMD进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。

    编译成功之后,Cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.0.jar’,启动‘rocketmq-console-ng-1.0.0.jar’。

    4.浏览器中输入‘127.0.0.1:配置端口’,成功后即可查看。

    eg:http://127.0.0.1:8088

    [图片上传失败...(image-3f7598-1622358860602)]

    Springboot整合方式

    注意springboot整合rocketmq server端 版本一定要与rocketmq 不然可能启动报错

    Maven依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>
    </dependencies>
    
    

    生产者

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 普通消息投递  单向发送
     */
    @GetMapping("/sendMsg")
    public String sendMsg() {
        MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
        rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
        return "投递消息 => " + msg.toString() + " => 成功";
    }
    
    
    
    消费者
    /**
     * @ClassName RocketMQConsumer
     * @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com
     * @Version V1.0
     **/
    @Service
    @Slf4j
    @RocketMQMessageListener(consumerGroup = "mayikt-group5", topic = "topic_meite")
    public class RocketMQConsumer implements RocketMQListener<MsgEntity> {
        @Override
        public void onMessage(MsgEntity msgEntity) {
    
            log.info("消费者监听到消息:<msg:{}>", msgEntity);
        }
    }
    
    

    配置文件

    spring:
      application:
        name: mayikt-rocketmq
    server:
      port: 8000
    rocketmq:
      # rocketmq地址
      name-server: 127.0.0.1:9876
      producer:
        # 必须填写 group
        group: mayikt-group
    
    
    Rocketmq配置文件详解
    所属集群名字
    brokerClusterName=rocketmq-cluster
    此处需手动更改
    broker名字,注意此处不同的配置文件填写的不一样
    附加:按配置文件文件名来匹配
    brokerName=broker-a
    0 表示Master, > 0 表示slave
    brokerId=0
    此处许手动更改
    (此处nameserver跟host配置相匹配,9876为默认rk服务默认端口)nameServer 地址,分号分割
    附加:broker启动时会跟nameserver建一个长连接,broker通过长连接才会向nameserver发新建的topic主题,然后java的客户端才能跟nameserver端发起长连接,向nameserver索取topic,找到topic主题之后,判断其所属的broker,建立长连接进行通讯,这是一个至关重要的路由的概念,重点,也是区别于其它版本的一个重要特性
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
    defaultTopicQueueNums=4
    是否允许Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    是否允许Broker自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    Broker 对外服务的监听端口
    listenPort=10911
    删除文件时间点,默认是凌晨4点
    deleteWhen=04
    文件保留时间,默认48小时
    fileReservedTime=120
    commitLog每个文件的大小默认1G
    附加:消息实际存储位置,和ConsumeQueue是mq的核心存储概念,之前搭建2m环境的时候创建在store下面,用于数据存储,consumequeue是一个逻辑的概念,消息过来之后,consumequeue并不是把消息所有保存起来,而是记录一个数据的位置,记录好之后再把消息存到commitlog文件里
    mapedFileSizeCommitLog=1073741824
    ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    destroyMapedFileIntervalForcibly=120000
    redeleteHangedFileInterval=120000
    检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    存储路径
    storePathRootDir=/usr/local/rocketmq/store
    commitLog存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    消费队列存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    限制的消息大小
    maxMessageSize=65536
    flushCommitLogLeastPages=4
    flushConsumeQueueLeastPages=2
    flushCommitLogThoroughInterval=10000
    flushConsumeQueueThoroughInterval=60000
    Broker 的角色
    •   ASYNC_MASTER 异步复制Master
    •   SYNC_MASTER 同步双写Master
    •   SLAVE
    brokerRote=ASYNC_MASTER
    刷盘方式
    •   ASYNC_FLUSH 异步刷盘
    •   SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    checkTransactionMessageEnable=false
    发消息线程池数量
    sendMessageTreadPoolNums=128
    拉消息线程池数量
    pullMessageTreadPoolNums=128
    
    

    Rocketmq 队列分区模型:
    Rocketmq 底层存储结构中 将一个主题分成n多个不同的队列实现存放消息
    创建了一个主题: 需要指定队列个数 默认是4 和16
    写队列数量: 对我们生产者写投递shul 16
    读队列数量: 对我们消费者获取消息队列数量 16

    在rocketmq中,如果一个topic只有一个队列的情况下支持的并行能力比较弱,所以会将一个topic分成分成n多个不同的队列queue来实现存放, 类似于Kafka的分区模型概念。

    RocketMQ 解决方案:
    如果消费者获取消息,处理器都是同一个线程的情况下有可能会影响我们的效率
    消费者获取消息10毫秒
    Rocketmq 中消费者消费的消息,采用多线程的形式,需要注意消息顺序一致性的问题:
    与kafka思想一样,消费者不管消费成功还是失败,最终消息不会立即删除,后期通过日志删除策略,定时删除消息;

    生产者发送消息有三种形式:同步,异步,和单向
    三种形式:
    1,单向: 生产者投递消息到mq中,不需要返回结果
    优点: 延迟概率比较低
    缺点: 丢失消息数据

    2,异步:生产者投递消息到mq中,使用回调形式返回;

    1. 同步
      生产者投递消息到mq中,采用同步的形式获取到返回消息是否有
      投递成功的结果,导致接口延迟概率比较大。
      投递消息过程比较耗时时间10毫秒

    发送请求 基于请求与响应
    1.同步发送:发送请求模式属于同步的,发送该条消息不需等待该条消息发送成功之后,才可以继续发送下一条。
    2.异步发送:采用异步的发送模式,不需要同步阻塞等待,通过回调的形式监听生产者消息投递结果
    3单向发送:只负责发送消息给mq,不管是否有发送成功。

    同步发送

    /**
     * 同步发送
     *
     * @throws Exception
     */
    @GetMapping("/sync")
    public void sync() {
        MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
        SendResult sendResult = rocketMQTemplate.syncSend(RocketMQConfig.TOPIC_NAME, msg);
        log.info("同步发送字符串{}, 发送结果{}", msg.toString(), sendResult);
    }
    
    异步发送
    /**
     * 异步发送
     *
     * @throws Exception
     */
    @GetMapping("async")
    public void async() {
        MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
        log.info(">msg:<<" + msg);
        rocketMQTemplate.asyncSend(RocketMQConfig.TOPIC_NAME, msg.toString(), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                log.info("异步发送成功{}", var1);
            }
    
            @Override
            public void onException(Throwable var1) {
                log.info("异步发送失败{}", var1);
            }
        });
    }
    

    单向发送

    /**
     * 普通消息投递  单向发送
     */
    @GetMapping("/sendMsg")
    public String sendMsg() {
        MsgEntity msg = new MsgEntity("mayikt" + UUID.randomUUID().toString(), 1234);
        rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
        return "投递消息 => " + msg.toString() + " => 成功";
    }
    

    顺序消息
    Rocketmq中,消费者消处理业务逻辑的时候是采用多线程。
    如何解决消息顺序一=一致性的问题
    1,生产者投递消息根据key 投递到同一个队列中存放
    2,消费者应该订阅到同一个队列中实现消费
    3,最终在同一个线程去消费消息(不能够实现多想想消费)
    生产者

    String uuid = UUID.randomUUID().toString();
    SendResult result1 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, "insert", uuid);
    log.info("insert:" + result1.toString());
    SendResult result2 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, "update", uuid);
    log.info("update:" + result2.toString());
    SendResult result3 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, "delete", uuid);
    log.info("delete:" + result3.toString());

    
    消费者
    
    

    @Service
    @Slf4j
    @RocketMQMessageListener(consumerGroup = "mayikt-group20", topic = "topic_seq", consumeMode = ConsumeMode.ORDERLY
    )
    public class RocketMQConsumer01 implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
    try {
    Random r = new Random(100);
    int i = r.nextInt(500);
    Thread.sleep(i);
    } catch (Exception e) {

        }
        log.info("消费者监听到消息:<msg:{}>", msg);
    }
    

    }

    相关文章

      网友评论

          本文标题:91 RocketMQ

          本文链接:https://www.haomeiwen.com/subject/ufxtsltx.html