美文网首页spingboot收藏
Java进阶-RocketMQ-基础

Java进阶-RocketMQ-基础

作者: GIT提交不上 | 来源:发表于2022-02-16 23:20 被阅读0次

    一、架构设计

    官方文档-技术架构
    RocketMQ源码解析(一)-架构原理

    技术架构.jpeg
    • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
    • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
    • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理、路由信息管理。
    • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证。
    部署架构.jpeg

      集群工作流程:

    • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
    • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
    • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
    • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
    • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

    二、消息发送

      Producer发送消息支持3种方式,同步、异步和Oneway。

    • 同步发送:客户端提交消息到broker后会等待返回结果,默认方式。
    • 异步发送:调用发送接口时会注册一个callback类,发送线程继续其它业务逻辑,Producer在收到broker结果后回调。
    • Oneway:Producer提交消息后,无论broker是否正常接收消息都不关心。适合于追求高吞吐、能容忍消息丢失的场景,比如日志收集。

    2.1 Name 服务器的均等性

      在 Broker 启动的时候,其会将自己在本地存储的话题配置文件 (默认位于 $HOME/store/config/topics.json 目录) 中的所有话题加载到内存中去,然后会将这些所有的话题全部同步到所有的 Name 服务器中。与此同时,Broker 也会启动一个定时任务,默认每隔 30 秒来执行一次话题全同步:

    image.png

    2.2 寻找路由信息

      当客户端发送消息的时候,其首先会尝试寻找话题路由信息。即这条消息应该被发送到哪个地方去。客户端在内存中维护了一份和话题相关的路由信息表 topicPublishInfoTable,当发送消息的时候,会首先尝试从此表中获取信息。如果此表不存在这条话题的话,那么便会从 Name 服务器获取路由消息。

    image.png

      服务器返回的话题路由信息包括以下内容:

    image.png

      每个 Broker 上面可以绑定多个可写消息队列和多个可读消息队列,客户端根据返回的所有 Broker 地址列表和每个 Broker 的可写消息队列列表会在内存中构建一份所有的消息队列列表。之后客户端每次发送消息,都会在消息队列列表上轮循选择队列 (我们假设返回了两个 Broker,每个 Broker 均有 4 个可写消息队列):

    image.png

    2.3 话题检查

    image.png

    2.4 整体流程

    image.png

    2.5 源码解析

    Message msg = new Message( "Test", "Hello World".getBytes() );
    DefaultMQProducer producer = new DefaultMQProducer();
    producer.start();
    --------
    producer.send(msg);
    
    RocketMQ-Producer.png

    RocketMQ 消息发送流程
    RocketMQ源码解析(三)-Producer

    三、消息存储

      用来存储消息的文件被称之为 MappedFile。文件默认创建的大小为 1GB,即 1024 * 1024 * 1024 = 1073741824 字节,每个文件的命名是按照总的字节偏移量来命名的

    image.png

    3.1 文件创建

      当 Broker 启动的时候,其会将位于存储目录下的所有消息文件加载到一个列表中,当有新的消息到来的时候,其会默认选择列表中的最后一个文件来进行消息的保存。

      RocketMQ 提供了一个专门用来实例化 MappedFile 文件的服务类 AllocateMappedFileService。在内存中,也同时维护了一张请求表 requestTable 和一个优先级请求队列 requestQueue 。当需要创建文件的时候,Broker 会创建一个 AllocateRequest 对象,其包含了文件的路径、大小等信息。然后先将其放入 requestTable 表中,再将其放入优先级请求队列 requestQueue 中。

      服务类会一直等待优先级队列是否有新的请求到来,如果有,便会从队列中取出请求,然后创建对应的 MappedFile,并将请求表 requestTable 中 AllocateRequest 对象的字段 mappedFile 设置上值。最后将 AllocateRequest 对象上的 CountDownLatch 的计数器减 1 ,以标明此分配申请的 MappedFile 已经创建完毕了。

    image.png

      等待 MappedFile 创建完毕之后,其便会从请求表 requestTable 中取出并删除表中记录,然后再将其放到MappedFile列表中去。

    image.png

    3.2 文件初始化

      在 MappedFile 的构造函数中,其使用了 FileChannel 类提供的 map 函数来将磁盘上的这个文件映射到进程地址空间中。然后当通过 MappedByteBuffer 来读入或者写入文件的时候,磁盘上也会有相应的改动。

    3.3 消息写入

      每条消息的存储是按照一个 4 字节的长度来做界限的,这个长度本身就是整个消息体的长度,当读完这整条消息体的长度之后,下一次再取出来的一个 4 字节的数字,便又是下一条消息的长度。

    image.png image.png

      前两位是 4 字节的长度和 4 字节的 MAGICCODE,MAGICCODE 的可选值有:

    • CommitLog.MESSAGE_MAGIC_CODE
    • CommitLog.BLANK_MAGIC_CODE

      当这个文件有能力容纳这条消息体的情况下,其便会存储 MESSAGE_MAGIC_CODE 值;当这个文件没有能力容纳这条消息体的情况下,其便会存储 BLANK_MAGIC_CODE 值。所以这个 MAGICCODE 是用来界定这是空消息还是一条正常的消息。

    3.4 消息刷盘

    • 同步刷盘
    • 异步刷盘

    四、消息接受

      消费者客户端与 Broker 服务器进行沟通的整体流程如下:

    image.png

      RocketMQ 管理消息的单位不是话题,而是队列。
      客户端有两种消费模式,一种是广播模式,另外一种是集群模式。

      集群模式:

    • 平均分配策略
    • 平均分配轮循策略
    • 一致性哈希策略

    4.1 Broker消费队列文件

      消息往 Broker 存储就是在向 CommitLog 消息文件中写入数据的一个过程。在 Broker 启动过程中,其会启动一个叫做 ReputMessageService 的服务,这个服务每隔 1 秒会检查一下这个 CommitLog 是否有新的数据写入。ReputMessageService 自身维护了一个偏移量 reputFromOffset,用以对比和 CommitLog 文件中的消息总偏移量的差距。当这两个偏移量不同的时候,就代表有新的消息到来了。

      commitLog 文件夹下面存放的是完整的消息,来一条消息,向文件中追加一条消息。同时,根据这一条消息属于 TopicTest 话题下的哪一个队列,又会往相应的 consumequeue 文件下的相应消费队列文件中追加消息的偏移量、消息大小和标签码。

    image.png image.png

    4.2 消息队列偏移量

      针对同一话题,在集群模式下,由于每个客户端所消费的消息队列不同,所以每个消息队列已经消费到哪里的消费偏移量是记录在 Broker 服务器端的。而在广播模式下,由于每个客户端分配消费这个话题的所有消息队列,所以每个消息队列已经消费到哪里的消费偏移量是记录在客户端本地的。

      集群模式:

    image.png

      广播模式:

    image.png

    4.3 拉取消息

      客户端和 Broker 服务器端完整拉取消息的流程图如下:

    image.png

    4.3 消费消息

      并发消费:

    image.png

      有序消费:

      RocketMQ 的有序消费主要依靠两把锁,一把是维护在 Broker 端,一把维护在消费者客户端。在有序消费的时候,Broker 需要确保任何一个队列在任何时候都只有一个客户端在消费它,都在被一个客户端所锁定。

    image.png

      RocketMQ 的消息树是用 TreeMap 实现的,其内部基于消息偏移量维护了消息的有序性。每次消费请求都会从消息树中拿取偏移量最小的几条消息 (默认为 1 条)给用户,以此来达到有序消费的目的。

    image.png

    五、消息过滤

    • 标签匹配
    • SQL 匹配
    • 自定义匹配
    image.png image.png image.png

    六、消息索引

    • 根据键查询消息
    • 根据ID(偏移量)查询消息
    • 根据唯一键查询消息
    • 根据消息队列偏移量查询消息

      ID (偏移量) 查询:

    image.png

      消息队列偏移量查询:

    image.png

      消息索引服务:

      每当一条消息发送过来之后,其会封装为一个 DispatchRequest 来下发给各个转发服务,而 CommitLogDispatcherBuildIndex 构建索引服务便是其中之一。

    • 前 40 个字节存放固定的索引头信息,包含了存放在这个索引文件中的消息的最小/大存储时间、最小/大偏移量等状况
    • 中间一段存储了 500 万个哈希槽位,每个槽内部存储的是索引文件的地址 (索引槽)
    • 最后一段存储了 2000 万个索引内容信息,是实际的索引信息存储的地方。每一个槽位存储了这条消息的键哈希值、存储偏移量、存储时间戳与下一个索引槽地址
    image.png

    相关文章

      网友评论

        本文标题:Java进阶-RocketMQ-基础

        本文链接:https://www.haomeiwen.com/subject/ewxrlrtx.html