美文网首页
(7)消息的确认和重发机制

(7)消息的确认和重发机制

作者: Mrsunup | 来源:发表于2018-12-16 14:20 被阅读0次

    1.消费端的 PrefetchSize

    还记得我们在分析消费端的源码的时候,所讲到的 prefetchsize 吗?
    可以参考上一篇的博客:https://www.jianshu.com/p/f5016f845d16

    可以参考官网:http://activemq.apache.org/what-is-the-prefetch-limit-for.html

    activemq 的 consumer 端也有窗口机制,通过 prefetchSize 就可以设置窗口大小。不同的类型的队列,prefetchSize 的默认值也是不一样的

    • 持久化队列和非持久化队列的默认值为 1000
    • 持久化 topic 默认值为 100
    • 非持久化topic的默认值为 Short.MAX_VALUE-1

    通过上面的例子,我们基本上应该知道 prefetchSize 的作用了,消费端会根据prefetchSize 的大小批量获取数据,比如默认值是 1000,那么消费端会预先加载 1000 条数据到本地的内存中。

    • prefetchSize 的设置方法
      在 createQueue 中添加 consumer.prefetchSize,就可以看到效果
    Destination destination=session.createQueue("myQueue?consumer.prefetchSize=10");
    

    既然有批量加载,那么一定有批量确认,这样才算是彻底的优化

    • optimizeAcknowledge

    ActiveMQ 提供了 optimizeAcknowledge 来优化确认,它表示是否开启“优化ACK”,只有在为 true 的情况下,prefetchSize 以及optimizeAcknowledgeTimeout 参数才会有意义,优化确认一方面可以减轻 client 负担(不需要频繁的确认消息)、减少通信开销,另一方面由于延迟了确认(默认 ack 了 0.65*prefetchSize 个消息才确认),broker 再次发送消息时又可以批量发送
    如果只是开启了 prefetchSize,每条消息都去确认的话,broker 在收到确认后也只是发送一条消息,并不是批量发布,当然也可以通过设置 DUPS_OK_ACK来手动延迟确认, 我们需要在 brokerUrl 指定 optimizeACK 选项

    ConnectionFactory connectionFactory= new ActiveMQConnectionFactory
    ("tcp://192.168.11.153:61616?jms.optimizeAcknowledge=true&
    jms.optimizeAcknowledgeTimeOut=10000");
    

    注意,如果 optimizeAcknowledge 为 true,那么 prefetchSize 必须大于 0. 当 prefetchSize=0 的时候,表示 consumer 通过 PULL 方式从 broker 获取消息

    • 总结

    到目前为止,我们知道了 optimizeAcknowledge 和 prefetchSize 的作用,两者协同工作,通过批量获取消息、并延迟批量确认,来达到一个高效的消息消费模型。它比仅减少了客户端在获取消息时的阻塞次数,还能减少每次获取消息时的网络通信开销

    需要注意的是,如果消费端的消费速度比较高,通过这两者组合是能大大提升 consumer 的性能。如果 consumer 的消费性能本身就比较慢,设置比较大的 prefetchSize 反而不能有效的达到提升消费性能的目的。因为过大的prefetchSize 不利于 consumer 端消息的负载均衡。因为通常情况下,我们都会部署多个 consumer 节点来提升消费端的消费性能。这个优化方案还会存在另外一个潜在风险,当消息被消费之后还没有来得及确认时,client 端发生故障,那么这些消息就有可能会被重新发送给其它consumer,那么这种风险就需要 client 端能够容忍“重复”消息

    2.消息的确认过程

    • ACK_MODE

    通过前面的源码分析,基本上已经知道了消息的消费过程,以及消息的批量获取和批量确认,那么接下来再了解下消息的确认过程
    消息确认有四种 ACK_MODE,分别是:
    AUTO_ACKNOWLEDGE = 1 自动确认
    CLIENT_ACKNOWLEDGE = 2 客户端手动确认
    DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
    SESSION_TRANSACTED = 0 事务提交并确认

    虽然 Client 端指定了 ACK 模式,但是在 Client 与 broker 在交换 ACK 指令的时候,还需要告知 ACK_TYPE,ACK_TYPE 表示此确认指令的类型,不同的ACK_TYPE 将传递着消息的状态,broker 可以根据不同的 ACK_TYPE 对消息进行不同的操作

    • ACK_TYPE

    DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
    STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker 端可以删除消息了
    POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者 DLQ(死信队列)
    REDELIVERED_ACK_TYPE = 3 消息需"重发",比如 consumer 处理消息时抛出了异常,broker 稍后会重新发送此消息
    INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何 ACK_MODE 下
    UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一条消息在转发给“订阅者”时,发现此消息不符合 Selector 过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(相当于在 Broker 上确认了消息)。

    Client 端在不同的 ACK 模式时,将意味着在不同的时机发送 ACK 指令,每个 ACK Command 中会包含 ACK_TYPE,那么 broker 端就可以根据 ACK_TYPE 来决定此消息的后续操作

    3.消息的重发机制原理

    • 消息重发的情况

    在正常情况下,有几中情况会导致消息重新发送

    1.在事务性会话中,没有调用 session.commit 确认消息或者调用session.rollback 方法回滚消息
    2.在非事务性会话中,ACK 模式为 CLIENT_ACKNOWLEDGE 的情况下,没有调用 acknowledge 或者调用了 recover 方法;

    一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费端会给 broker 发送一个”poison ack”(ActiveMQMessageConsumer#dispatch:1460 行),表示这个消息有毒,告诉 broker 不要再发了。这个时候 broker 会把这个消息放到 DLQ(死信队列)。

    • 死信队列

    ActiveMQ 中默认的死信队列是 ActiveMQ.DLQ,如果没有特别的配置,有毒的消息都会被发送到这个队列。默认情况下,如果持久消息过期以后,也会被送到 DLQ 中

    死信队列配置策略 :
    缺省所有队列的死信消息都被发送到同一个缺省死信队列,不便于管理,可以通过individualDeadLetterStrategy 或 sharedDeadLetterStrategy 策略来进行修改

    <destinationPolicy>
    <policyMap>
    <policyEntries>
    <policyEntry topic=">" >
    <pendingMessageLimitStrategy>
        <constantPendingMessageLimitStrategy limit="1000"/>
    </pendingMessageLimitStrategy>
    </policyEntry>
    // “>”表示对所有队列生效,如果需要设置指定队列,则直接写队列名称
    <policyEntry queue=">">
    <deadLetterStrategy>
    //queuePrefix:设置死信队列前缀
    //useQueueForQueueMessage 设置队列保存到死信。
        <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
    </deadLetterStrategy>
    </policyEntry>
    </policyEntries>
    </policyMap>
    </destinationPolicy>
    

    自动丢弃过期消息 :

    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="false" />
    </deadLetterStrategy>
    

    死信队列的再次消费:

    当定位到消息不能消费的原因后,就可以在解决掉这个问题之后,再次消费死
    信队列中的消息。因为死信队列仍然是一个队列

    相关文章

      网友评论

          本文标题:(7)消息的确认和重发机制

          本文链接:https://www.haomeiwen.com/subject/eaztkqtx.html