美文网首页
RocketMQ-Producer

RocketMQ-Producer

作者: 丑人林宗己 | 来源:发表于2020-04-03 14:41 被阅读0次

    rocketmq 发送端的设计
    1、需要考虑什么?

    容错设计

    • 如果投递到某个服务端失败了,在多主的情况下可以选择可用的服务端
    • 服务端不可用后要进入熔断状态,服务不可用还继续提供服务会带来一系列的问题
    • 服务端需要可以自动在熔断状态下恢复,服务可用但是长时间不能自动恢复

    重试

    • 可配置的重试策略(重试策略,异步消息不存在重试的语义)
    • 重试支持选择新队列(在一个不可服务的队列上重试是一种资源的浪费,同时也是无效的重试)

    2、消息投递需要的参数

    • topic

    原则是:根据topic选择相应的逻辑队列,逻辑队列中相应的指定了broker, topic

    3、rocketmq究竟怎么做?

    3.1、TopicPublishInfo
    这个数据结构保存的是该topic下的所有的逻辑队列List<MessageQueue>,以及本次发起请求时选择队列的下标值ThreadLocalIndex(index & messagequeues.size 才是真实的下标值)。
    PS:这里有点绕,为何是保存所有的topic下的逻辑队列?而不是按照broker先分组,每个broker下再分逻辑队列呢?

    3.2、FaultItem
    这个数据结构保存着broker name, 以及对应的隔离时间,具体要看isAvailable()。该数据接口维系在faultItemTable这个Map中(ConncurrentHashMap,保证并发安全)

    3.3、失败之后做了什么?

    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); 
    

    参数1是名称,参数2一次调用耗费的时间,参数2是是否隔离(隔离意味着熔断)

    如果开启了容错保护机制,那么会根据是否隔离来计算broker的隔离时间。currentLatency + 30000 -> 并在notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}这个数组中找到第一个大于该结果的值作为隔离的持续时间。

    比如:currentLatency 0:30000,那么结果无疑就是60000,如果是30000:90000,那么结果就是120000,以此类推。所以隔离的持续时间其实是数组中的值。

    PS: faultItem中记录的是隔离时间是:System.currentTimeMillis() + notAvailableDuration。而判断broker有效是:(System.currentTimeMillis() - startTimestamp) >= 0

    3.4、选择队列的依据是什么?

    随机下标(ThreadLocalIndex + 1), 以messagequeues.size作为循环次数,找到可用的broker(判断可用的依据是如上),如果找不到,则依据某个算法(随机打乱排序取二分之一位置的节点)在整个队列中选择一个队列(队列的id为随机值%broker可写队列长度)。

    容错设计

    1、第一次有地失败,并且该服务端被标记为隔离状态,时间参考notAvailableDuration值。
    2、在这个持续时间内,该服务不会提供服务(运维这个时候需要去排查原因并重新启动)
    3、超过该持续时间后,该节点会重新变为可用(但是不一定真实可用,如果不可用将会重新回到第1步)

    重试

    • 配置项:retryTimesWhenSendFailed(默认2次),超过该次数将不再重试
    • 重试时将会按照上文说的策略进行筛选(ThreadLocalIndex对于当前线程有效地重试递增)

    核心的代码包括:

    DefaultMQProducerImpl#sendDefaultImpl()
    DefaultMQProducerImpl#tryToFindTopicPublishInfo()
    DefaultMQProducerImpl#selectOneMessageQueue()
    DefaultMQProducerImpl#updateFaultItem()
    

    相关文章

      网友评论

          本文标题:RocketMQ-Producer

          本文链接:https://www.haomeiwen.com/subject/cxbhphtx.html