美文网首页
rocketmq学习笔记

rocketmq学习笔记

作者: 一个坏人_9c31 | 来源:发表于2022-04-28 20:56 被阅读0次

    Producer延迟容错策略:

    • 生产者在发送message时会根据一定的策略选取一个消息通道,然后将消息发送到对应的broker上,默认情况下(RoundRobin),TopicPublishInfo会维护一个计数器(ThreadLocal中),当producer需要发送时,会根据计数器(mod通道数量)选择一个queue,消息发送失败时会以当前计数器的位置开始找到下一个可用的queue,如果当前topic的queue在不同的broker上会优先选择其他broker上的queue。
    • 其次生产者还有一个延迟容错的策略默认时关闭的,需要设置sendLatencyFaultEnable为true手动开启,在每次生产者发送消息时会记录从开始发送到收到broker的响应间的延迟,随后会根据该延迟判断当前broker是否需要隔离及重置下次可用的开始时间,比如延迟600ms则对应的broker在30s内不再能被选择,50ms对应的时间就是0s也就是不影响下次的选择,再次选择通道时,会优先选择可用的没有被隔离的通道,其次是延迟最低的,最后是距离隔离结束时间最短的。

    延迟消息:

    • 目前开源版本的rocketmq只支持按级别延迟消息,共18个级别,每个级别的时间分别是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,延迟消息的大概过程如下:
    1. 生产者在发出消息时设定延迟级别delayTimeLevel 比如级别为3那么就是延迟10s后发出消息
    2. broker在接收到消息写入commitlog前会判断消息的延迟级别是否大于0,大于0就代表是个延迟消息,根据当前的延迟级别获取对应的queueId。
    3. 替换消息的topic为延迟队列的topic(SCHEDULE_TOPIC_XXXX),替换queueId为第二步获得的queueId,同时将原先的topic与queueId记在消息属性中,然后写入commitlog中
    4. broker在启动的时候会为每个延迟队列起一个线程来处理延迟消息,线程会每隔100ms取出对应延迟队列当前处理到的offset的msg描述,描述中包含消息在commitlog中的offset、消息长度等(20k),以此获得commitlog中的msg
    5. 获得到消息后将之前记录在消息中的原始topic、queueId重新设回消息,再重新投递到commitlog中,最后更新对应延迟队列的offset
    • 思考:
      根据rocketmq 的设计 broker在收到生产者的消息时会顺序写入commitlog,在消费者消费时由于不同消费者的进度并不一致,所以从commitlog中取消息时可能会涉及到随机查找,但从整体来看,消息消费的走向总是先到的消息先被消费,所以消费过程整体上也是一个顺序读取的过程,结合零拷贝(mmap+write)以及PageCache的读写分离(需要手动开启),这是rocketmq高性能的原因之一。
      结合broker延时消息的实现方案,实际上在看源码的过程中考虑了下在broker上实现延时消息需要解决的几个问题:
    1. 如何给延时消息排序(因为延迟时间不同,后来的消息也可能被先投递)
    2. 排序过程中如何避免随机读写(如果要进行排序不可避免的要在消息队列中进行插入操作)
      实际上rocketmq采用的多级延迟队列方案,就没有进行排序也就不存在第二个问题,因为是按照延迟级别划分的,分到同一个队列中的消息延迟都是相同的天然有序
      付费版的rocketmq支持任意时间的延迟消息,考虑该如何实现?(多级时间轮)

    顺序消息:
    TODO
    Consumer的负载均衡方案
    TODO

    相关文章

      网友评论

          本文标题:rocketmq学习笔记

          本文链接:https://www.haomeiwen.com/subject/ajidyrtx.html