美文网首页
RocketMQ简介

RocketMQ简介

作者: 牛牛_735d | 来源:发表于2019-11-15 21:45 被阅读0次

    组件:

    nameSrv作用:

    1.加载kv配置、创建nettyServer网络处理对象、
    2.开启定时任务进行心跳监测
    task1: nameSrv 每10s扫描一次broker、移除处于不激活状态的broker
    task2: nameSrv 每10min打印一次kv配置
    3.注册jvm钩子函数、监听broker、消息生产者的请求
    
    broker如果挂掉、producer向broker的消息发送是否失败 ? 如何处理 ?
    10s之后才会更新nameSrv信息、producer才可以检测到
    
    

    NameSrv功能实现

    1. 路由注册、故障剔除
    nameSrv 测主要作用是为Producer和Consumer提供topic信息、所以需要存储路由的基础信息、能够管理Broker节点、包括路由注册、故障剔除等
    
    元信息:
    topicQueueTable: topic 消息队列路由信息、消息发送时进行负载均衡
    brokerAddrTable: broker基础信息、包含BrokerName、所属集群地址、主备broker地址
    clusterAddrTable: broker集群信息、存储集群中brokerName
    brokerLiveTable: broker状态信息、nameServer每次收到心跳包时更新
    filterServerTable: broker上的FilterServer列表、用于消息过滤
    
    一个topic有多个Queue、一个broker默认为每个topic创建4个读Q 和 4个写Q、多个broker组成一个集群、多台相同brokerName的broker机器组成M-S架构、brokerId=0代表M、
    
    BrokerLiveInfo中的lastUpdateTimestamp存储上次收到broker心跳包的时间
    
    路由注册通过broker和nameSrv的心跳实现、broker每隔30s向所有nameSrv发送心跳、nameSrv收到心跳包更新brokerLiveTable中的BrokerLiveInfo的心跳时间、10s扫描一次brokerLiveTable、若连续120s未收到心跳包、nameSrv会移除该broker
    
    路由删除:
    1. nameSrv定时扫描 brokerLiveTable、超过120s未收到心跳包、会移除broker信息
    2. broker正常关闭时、主动调用unregisterBroker
    1) 从brokerLiveTable、filterServerTable移除该broker
    2) 维护brokerAddrList信息
    3) 维护clusterAddrTable信息
    4) 遍历topic、从路由信息移除该broker
    
    路由发现:
    1. 非实时、topic路由信息变化、nameSrv不会主动推送、而是client主动定时拉取、
    从topicQueueTable、brokerAddrTable、filterServerTable中填充TopicQueueData的QueueData、BrokerData、filterServer地址表
    2. 若从topic找到对应的路由信息为顺序消息、则从nameServer kvConfig中获取关于顺序消息相关的配置填充路由信息
    
    

    Borker:

    提供消息存储、
    对于mq的消息存储、一般考虑 消息堆积能力 和 消息存储能力
    rocketmq 引入内存映射机制、所有主题的消息顺序存储在同一文件
    引入 消息文件过期机制 和 文件存储空间报警机制 - 避免消息无限堆积
    
    

    Producer启动流程

    1. 检查producerGroup是否符合要求、改变生产者的instanceName为进程id
    2. 创建MQClientInstance实例、整个jvm中只存在一个MQClientManager实例、维护一个MQClientInstance的hashMap表、即: 同一个clientID只会创建一个MQClientInstance
       instance是封装了rocketMQ的网络处理API、是Producer、Consumer与NameSrv、Broker打交道的网络通道
    3. 将producer注册到MQClientInstance管理中、方便后续进行网络请求、心跳检测等
    
    

    消息发送过程

    支持3种方式: 同步sync、异步async、单向oneway
    sync: 同步等待broker将结果返回
    async: 指定回调函数、不阻塞producer线程、消息结果通过回调通知
    oneway: 不等待消息存储结果、亦不提供回调函数、不关心是否存储成功
    
    1. 消息长度验证、消息长度最大4M
    2. 查找路由信息
       首次发msg时、本地未缓存topic路由信息、查询nameSrv获取、未找到时会尝试创建、若路由信息变化则更新broker地址列表
    3. queue选择
    若2m-2s架构、queueNum=4、rocketmq会如何选择 ?
    消息发送时会多次执行queue选择算法、lastBrokerName是上次send failed的broker、第一次执行Q选择时、last为null、直接在所有Q中选择、
    再次执行时、会先判断、选择brokerName!=lastBrokerName的来进行规避
    
    如果再次选择、得到的依然是不可用的Queue呢 ?会再次重试、造成资源浪费
    broker故障不会立即摘除(nameSrv 10s才检测一次、Producer也是30s才更新路由信息、最快感知也需要30s)、
    
    broker延迟故障机制
    1) 获取一个小小队列
    2) 验证是否可用
    3) 若可用、移除latencyFaultTolerance关于该topic的条目、表明broker故障恢复
    
    消息发送
    
    1. 消息队列如何负载 ?
    2. 如何实现高可用?
    3. 批量消息如何实现一致性?
    

    Consumer消息消费

    集群模式: 同一topic下一条消息只能允许被其中一个消费者消费、消费进度保存在broker端
    广播模式: 同一topic下一条消息可以被同一消费组的所有消费者消费、消息消费进度保存在消费端
    
    
    
    消息队列负载与重新分布
    消息消费模式
    消息拉取方式
    消息进度反馈
    消息过滤
    顺序消息
    

    消费者启动流程

    1. 构建订阅主题信息SubscriptionData并加入到RebalanceImpl的订阅消息中、订阅关系来自:
       1) 调用subscribe方法
       2) 订阅重试主题消息 以消费组为单位 命名: %RETRY%+消费组名
    2. 初始化MQClientInstance、RebalanceImpl等
    3. 初始化消费进度、broadcast保存在broker、cluster保存在consumer
    4. 根据是否顺序消费、创建消费端消费线程服务、ConsumeMessageService主要负责消息消费、内部维护一个线程池
    5. 向MQClientInstance注册消费者、并启动MQInstance、在一个JVM中的所有消费者、生产者持有同一个MQClientInstance、只启动一次
    

    消息可用性保障

    1. Broker 正常关机
    2. Broker 异常Crash
    3. OS Crash
    4. 机器断电、可立即恢复供电
    5. 机器无法开机、可能是CPU、主板、内存等关键设备损坏
    6. 磁盘设备损坏
    
    1-4 在同步刷盘机制下、可以确保不丢失消息、异步刷盘模式下丢失少量消息
    5、6属于单点故障、一旦发生、节点上的消息全部丢失、若开启了异步复制可保证只丢失少量消息、双写机制下 不会丢失消息
    

    消息延迟

    在正常不发生消息堆积的情况下、以长轮询方式实现准实时消息推送
    

    消息堆积

    rocketmq消息存储使用磁盘文件(内存映射)并且在物理布局上为多个大小相等的文件组成逻辑文件组、可无限循环使用、提供消息过期机制、默认保留3天
    
    

    消息过滤:

    1. broker端过滤、
    2. consumer过滤、
    

    msg status 说明:

     SEND_OK, // 发送成功
     FLUSH_DISK_TIMEOUT, // 在规定时间内没有完成刷盘、在 flushDiskType 为 SYNC_FLUSH 时才出现 
     FLUSH_SLAVE_TIMEOUT, // 在主备方式下、broker被设置为 sync_master方式时、未在指定时间内完成主从同步
     SLAVE_NOT_AVAILABLE, // 在主备方式下、broker被设置为 sync_master方式时、未找到slave
    

    疑问

    1. broker收到消息会先写commitlog、为什么producer发消息的时候会选择一个Queue去发 ?而不是broker ?再由broker分发到Queue ?
    
    2. 写commitlog的时候、同步刷盘指针和异步刷盘指针写的是同一个commitlog文件、如何保证消息的有序存储?
    
    3. 文件清除何时进行 ?
    
    4. 消息队列如何进行负载均衡?
    
    5. 消息发送如何保证高可用
    
    6. 批量消息如何保持一致性
    
    7. broker失效后120s才能将该broker从路由表中移除、若生产者获取到的路由信息包含已宕机的broker如何处理?
    
    8. 
    

    相关文章

      网友评论

          本文标题:RocketMQ简介

          本文链接:https://www.haomeiwen.com/subject/zlceictx.html