RocketMQ

作者: 奋斗的韭菜汪 | 来源:发表于2022-03-01 23:55 被阅读0次

    RocketMq常用管理命令链接:https://gper.club/articles/7e7e7f7ff3g58gc3g69
    Open Message协议

    结构简单、解析快、有事务设计、有持久化设计


    rocketmq架构图.png

    NameServer的作用类似于zk,brocker启动之后会注册到nameserver上,并且每30秒发送一次心跳给nameserver也称服务续约,nameserver每隔10秒钟检查一次broker是否长时间没有发送心跳,默认超过120秒没有发送心跳broker会被nameserver剔除,producer和consumer通过NameServer获取Broker,每30秒拉取刷新一次Broker List,Broker集群节点分为master和slave节点

    NameServer之间不通信,是如何保持Broker List一致的?

    1、服务注册(Broker新增):Broker主动去每个nameserver续约,保证最终一致
    2、服务剔除(Broker关闭或者宕机):每个nameserver主体去检查Broker是否长时间没有心跳,保证最终一致性
    3、路由发现(客户端获取最新的Broker List,初始连接、后续连接):客户端(consumer和producer主动每隔30秒主动去拉取刷新Broker List)

    消费模式

    • 集群模式:消费者互斥的消费消息,一条消息只有一个消费者
    • 广播模式:所有消费者都消费生产者发送的所有消息

    rocketMq既支持pull也支持push的方式,但push的方式也是通过pull的方式实现的

    Message Queue(不同于其他mq的地方)

    作用:实现消息的分片存储(存储在不同broker上)(类似于kafaka的partition),同时确定producer和consumer应该同那么broker进行通信。

    • 写队列的数量:决定Message Queue的数量(会在磁盘的consumequeue的对应topic下创建同等数量的目录)
    • 读队列的数量:决定消费者有几个队列来消费Message Queue
      通常情况下最好保证写队列的数量和读队列的数量相同
      注:如果读队列数量小于写队列数量,会出现消息消费不到的情况
      消费者MessageQueue对象三个重要属性
    • topic指定是哪个topic的队列
    • brokerName 指定broker
    • queueId 一个topic下的队列编号

    Message对象

    • topic用于标记消息,producer和对应的consumer需要一致
    • tags:用于过滤消息,consumer可以定义表达式来过滤同一topic下那些消息可以消费
    • keys:索引,需要唯一,用于控制台检索消息

    消费者消息消费失败返回:return ConsumeConcurrentlyStatus.RECONSUME_LETER,broker不更新消息偏移量offset
    消费者消息消费成功返回:return ConsumeConcurrentlyStatus.RECONSUME_SUCCESS,broker更新消息偏移量offset

    rokcetmq支持同步消息、异步消息、无序消息、有序消息、单向消息sendOneWay方式、单向顺序消息sendOneWayOrderly

    同步:需要受到消息消费成功的ack,才发送下一条消息
    异步:前一条消息未返回,依然发送下一条消息
    oneway:单向发送消息,不关心消费情况,不会收的sendresult

    RocketMq原理

    • 生产者:如何选择Message Queue、如何发送顺序消息、事务消息、延时消息
    • Broker:消息如何存储、文件如何清理、主从同步与故障转移
    • 消费者: 如何实现负载、rebalance、重试与死信队列
      1、自定义消息策略,可以去实现MessageQueueSelector接口返回Messagequeue对象即可
      2、如何发送顺序消息:前提条件:1、生产者不能使用多线程(保证消息到底broker是有序的),2、写入broker必须顺序写入同一个broker中,3:消费者只能用一个线程消费顺序消息(多线程情况下消费者消费速率不同,可能导致无序)
      生产者发送消息核心方法:processSendResponse,该方法发送消息到broker后必须有响应Response才能发送下一条消息
      3、事务消息:在保证本地数据库和mq消息一致性的情况下,需要先操作数据库若数据库操作失败,直接回滚,(如果先操作mq,mq消费之后无法回滚)
      生产者发送消息到Broker中,消息状态为未确认状态(消费者不可消费),两阶段提交,先处理本地数据库,再处理mq消息(本地成功,确认Broker消息;失败,丢弃消息;若本地无反馈,rocketmq会主动去获取本地数据是否成功(消息回查))


      image.png

      4、延时消息:msg.setDelayTimeLevel();


      image.png
      消息发送至broker临时存储在内存中,通过delayService(定时任务)定时消费
      5、rocketmq消息存储在磁盘(commit log文件下)是怎么实现低延迟高吞吐量的?
      page cache,cpu操作磁盘数据时先要将磁盘数据拷贝到内核缓冲区,每次拷贝一页(4k),page cache是内核空间的一个缓存用于提高操作效率(page cache有预读取的功能,如当前读取了某页数据,page cache会同时读取相临的页到缓存空间),rocketmq使用零拷贝,通过memory map实现零拷贝(内存映射,通过在用户空间建立内核空间的数据的映射,操作映射来直接操作内核空间的数据,免除了内核空间拷贝数据到用户空间,用户空间数据拷贝到内核空间)
      6、文件如何清理
      清理策略,过期文件需要清理(默认72小时)或者磁盘空间使用率达到85%直接清理(磁盘空间使用率超过90%会拒绝消息写入)
      7、故障转移:当消费者消息消费失败,会通知Broker,broker会生成一个名字前面带retry的topic队列,重试消费,重试多次依然失败会将消息丢入死信队列(名字前带delq的topic队列),死信队列里的消息需要人为去干预处理

    rocketmq架构

    borker内master节点和slave节点是怎么关联到一起的 ?

    1、集群的名字相同,
    2、连接到相同的nameserver
    3、在配置文件中:brokerId=0代表master,borkerId=1代表是slave

    主从同步和刷盘类型

    brokerRole(broker的角色)
    ASYNC_MASTER主从异步复制(有可能导致消息丢失)(只要master节点消息写入成功就给生产者返回ack,不需要等待slave收到消息)
    SYNC_MASTER主从同步双写(推荐)(需要master和slave都收到消息才给生产者返回ack)
    flushDiskType(刷盘类型)(对于单个节点)
    ASYNC_FLUSH 异步刷盘,(默认)先缓存消息,缓存成功后直接给生产者返回(批量从缓存刷盘,提高了效率)
    SYNC_FLUSH 同步刷盘,每一条消息都写入成功,才返回给生产者ack

    故障转移(Dledger)

    基于raft协议实现的自动选举功能,当master节点挂了,slave节点可以重新选举master节点

    相关文章

      网友评论

        本文标题:RocketMQ

        本文链接:https://www.haomeiwen.com/subject/uyycqrtx.html