美文网首页
ActiveMQ之Consumer

ActiveMQ之Consumer

作者: 爱健身的兔子 | 来源:发表于2021-01-05 15:27 被阅读0次

    1 Exclusive Consume(独占消费)

    默认Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。 ActiveMQ从4.x版本开始支持Exclusive Consumer。Broker会从多个Consumers中挑选一个consumer来处理queue中所有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker会自动切换到其他的consumer。可以通过destination options来创建一个Exclusive Consumer,如下:

    private static final String queueName = "myQueue?consumer.exclusive=true";
    

    说明

    这个会独占这个队列频道,所有的消息都将发发到这个连接上。在多线程环境下仍然是就这一个连接可以获取到消息。

    2 Consumer Dispatche Async(消息异步分发)

    在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者。可以设置dispatchAsync属性,默认是true,通常情况下这是最佳的。 你也可以通过如下几种方式修改:

    1. 在ConnectionFactory层设置
    ActiveMQConnectionFactory.setDispatchAsync(false);
    
    1. 在Connection上设置,这个设置将会覆盖ConnectionFactory上的设置
    ActiveMQConnetion.setDispatchAsync(false);
    
    1. 在Consumer上设置
      queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
      consumer = session.createConsumer(queue);
    

    关闭异步分发

    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" disableAsyncDispatch="true"/>
    

    3 Consumer Priority(消息优先级)

    当多个Consumer消费同一个队列的时候,可以为Consumer设置优先级来消费。

    Consumer的Priority的划分为”0~127”个级别,127是最高的级别,0是最低的也是ActiveMQ默认的。这种配置可以让Broker根据consumer的优先级来发送消息到较高的优先级的Consumer上,如果某个较高的Consumer的消息消费慢,则Broker会把消息发送到仅次于它优先级的Consumer上。

    queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
    consumer = session.createConsumer(queue);
    

    4 Manage Durable Subscribers(管理持久订阅)

    持久订阅,保证了消费者离线之后,再次进入系统,不会错过消息,但是这也会造成消息的堆积。可以通过设置消息的过期时间来控制,然后定期检查过期消息并删除:

    <policyEntry topic=">" expireMessagesPeriod="300000"/>
    

    从5.6开始,可以对不活跃的持久化订阅进行清除。如下:

    <broker name="localhost" 
            offlineDurableSubscriberTimeout="86400000" 
                offlineDurableSubscriberTaskSchedule="3600000">
    

    说明

    offlineDurableSubscriberTimeout:离线多长时间就过期删除,缺省是-1,就是不删除。 offlineDurableSubscriberTaskSchedule: 多长时间检查一次,缺省300000,单位毫秒。

    5 Message Groups(消息分组)

    Message Goups就是对消息分组,它是Exclusive Consumer功能的增强。逻辑上Message Groups可以看成是一种并发的 Exclusive Consumer。跟所有的消息都由唯一的consumer处理不同,JMS消息属性的JMSXGroupID用来区分message group。Message Group特性保证所有具有相同 JMSXGroupID 的消息都会被分发到相同的consumer(只要这个consumer保持active)
    另一方面,Message Groups 特性也是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息 JMSXGroupID 属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group.如果没有,那么broker会选择一个consumer,并将它关联到这个message group.此后,这个consumer会接收到这个message group的所有消息,直到:

    • consumer被关闭。

    • Message group被关闭(通过发送一个消息,并设置这个消息的 JMSXGroupSeq 为-1)。

    创建一个Message Groups,只需要在message对象上设置属性即可,如下:

    message.setStringProperty("JMSXGroupID","GroupA");
    

    关闭一个Message Groups,只需要在message对象上设置属性即可,如下:

     message.setStringProperty("JMSXGroupID","GroupA");
     message.setIntProperty("JMSXGroupSeq",-1);
    

    6 Message Selectors(消息选择器)

    JMS Selectors 用在获取消息的时候,可以基于消息属性和 Xpath 语法对消息进行过滤。JMS Selectors有SQL92语义定义。以下是个Selectors的例子:

    //创建消费者的时候指定 Selector
    MessageConsumer consumer = session.createConsumer(destination, "JMSXGroupID='GroupA'");
    

    注意

    1. JMS Selectors表达式中,可以使用IN, NOT IN, LIKE等。
    2. 需要注意的是,JMS Selectors 表达式中的日期和时间需要使用标准的Long型毫秒值。
    3. 表达式中的属性不会自动进行类型转换。
    4. Message Groups虽然可以保证具有相同的message group的消息会被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。

    7 Redelivery Policy(重传策略)

    当Consumer在接收到消息时,由于以下原因没有返回Ack信息,就会导致消息的重新投递。

    1. Client用了transactions,且在Session中调用了rollback()。
    2. Client用了transactions,且在调用commit()之前关闭。
    3. Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()。
    4. Client在超时回复Ack。

    可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的重传策略,可用的Redelivery属性如下:

    property default value description
    collisionAvoidanceFactor 0.15 设置防止冲突范围的正负百分比,只有启用了useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
    maximumRedeliveries 6 最大重传次数,达到最大重传次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
    maximumRedeliveryDelay -1 传送延迟,旨在useExpoentialBackOff为true时有效(5.5之后),假设首次重间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
    initialRedeliveryDelay 1000L 初始重发延迟时间。
    redeliveryDelay 1000L 重发延迟时间,当initialRedeliveryDelay=0时生效。
    useCollisionAvoidance false 启用防止冲突功能。
    useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间。
    backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。

    示例

    // 1创建ConnectionFactory
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setMaximumRedeliveries(3);// 重传次数
    policy.setInitialRedeliveryDelay(2 * 1000);
    connectionFactory.setRedeliveryPolicy(policy);
    

    8 Slow Consumer Handling(慢消费处理)

    8.1 Prefetch机制

    ActiveMQ通过Prefetch机制来提供性能,在客户端得内存里缓存一定数量得消息。缓存消息得数量由prefetch limit来控制。当某个 consumer 的 prefetch buffer 已经达到上限或者消息缓冲区满了,那么broker不会再向consumer分发消息,直到consumer像broker发送消息的确认,确认后的消息将会从缓存中去掉。ActiveMQ默认预取数量如下:

    转发模式 队列类型 prefetchSize
    PERSISTENT Queue 1000
    NON_PERSISTENT Queue 1000
    PERSISTENT Topic 100
    NON_PERSISTENT Topic 32766

    注意

    如果预取数量设置为1,将会导致消息一条一条的推送。如果预取数量被设置为0,将会导致关闭Broker的推送功能,需要消费者主动拉取数据。

    prefetchk可以通过如下方式 设置:

    1. 通过URL
    tcp://localhost:61616?jms.prefetchPolicy.all=50
    tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
    或
    queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
    
    1. 通过ActiveMQPrefetchPolicy策略对象修改
    ActiveMQPrefetchPolicy prefetchPolicy = connectionFactory.getPrefetchPolicy();
    prefetchPolicy.setQueuePrefetch(200);
    
    1. 通过Properties属性设置
    Properties properties = connectionFactory.getProperties();
    properties.setProperty("prefetchPolicy.queuePrefetch","1000");
    
    8.2 慢消费处理

    生产者在发送消息,Broker在将消息转发给消费者时,如果发现其内部有大量消息没有消费完成,那么Broker就会认为该消费者是慢消费者。在队列模式下,如果已发送,但没有确认的消息数量大于 prefetchSize,则消费者会被标记为 Slow。在主题模式下,如果cacheLimit已满,但是向主题的订阅者要发送的消息大于 prefetchSize,那么订阅者将被标记为 Slow。

    由于慢消费者会导致消息堆积,消耗内存,从而导致内存数据和磁盘文件不停交换,消耗磁盘IO。同时还会影响生产者生成消息的速率。ActiveMQ使用等待消息限制策略(Pending Message Limit Strategy)来解决这个问题,当超过这个上限后有新消息到来时将根据不同的策略抛弃。

    1. 等待消息限制

    目前等待消息限制策略有以下两种:

    • Constant Pending Message Limit Strategy

    limit 可以设置0, > 0, -1三种方式:0表示:不额外的增加其预存大小,> 0表示:在额外的增加其预存大小,-1表示:不增加预存也不丢弃旧的消息,这个策略使用常量限制,配置如下:

    <constantPendingMessageLimitStrategy limit="50"/>
    
    • Prefetch Rate Pending Message LimitStrategy

    这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。比如:

     <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
    

    注意

    在以上两种方式中,如果设置了0,意味着除了prefetch之外不再缓存消息,如果设置了-1意味着禁止丢弃消息。

    1. 消息丢弃

    目前消息丢弃策略有三种:

    • oldestMessageEvictionStrategy:这个策略丢弃最旧的消息。

    • oldestMessageWithLowestPriorityEvictionStrategy: 这个策略丢弃最旧的,而且具有最低优先级的消息。

    • uniquePropertyMessageEvictionStrategy:从5.6开始,可以根据自定义的属性来进行抛弃。<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>表示要抛弃属性名称为Stock的消息。

    示例:

    <policyEntry topic="PRICES.>"> 
                <!-- lets force old messages to be discarded for slow consumers -->  
                <pendingMessageLimitStrategy> 
                  <constantPendingMessageLimitStrategy limit="10"/> 
                </pendingMessageLimitStrategy>  
                <!-- 10 seconds worth -->  
                <messageEvictionStrategy> 
                  <uniquePropertyMessageEvictionStrategy propertyName="STOCK"/> 
                </messageEvictionStrategy> 
    </policyEntry>
    

    9 Subscription Recovery Policy(订阅恢复策略)

    生产者在某个topic发送了多条消息后,这个时候非持久订阅者才订阅,那么它是不能获取之前生产者发送的信息的。或者,由于网络问题,非持久类型的消费者处于非活跃状态,无法接收到生产者发送的消息。使用消息恢复策略,可以解决上面的问题。ActiveMQ目前支持一个定时或固定大小的恢复缓冲区,在你连接到broker后,在一段时间内的消息会重新发送给订阅者。

    Policy Name Sample Configuration Description
    FixedSizedSubscriptionRecoveryPolicy <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/> 保留固定字节的消息。
    FixedCountSubscriptionRecoveryPolicy <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> 保留固定数量的消息。
    LastImageSubscriptionRecoveryPolicy <lastImageSubscriptionRecoveryPolicy/> 保留最后一条记录。
    NoSubscriptionRecoveryPolicy <noSubscriptionRecoveryPolicy/> 禁用回溯,这是默认配置。
    QueryBasedSubscriptionRecoveryPolicy <queryBasedSubscriptionRecoveryPolicy query="JMSType = 'car' AND color = 'blue'"/> 根据查询机制使用回溯。
    TimedSubscriptionRecoveryPolicy <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> 保留指定时间内的消息。
    RetainedMessageSubscriptionRecoveryPolicy <retainedMessageSubscriptionRecoveryPolicy/> 保留ActiveMQ.Retain属性值为true的最后1条消息。

    示例:

    <policyEntry topic=">">
        <subscriptionRecoveryPolicy>
            <timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
        </subscriptionRecoveryPolicy>
    </policyEntry>
    

    注意

    需要设置retroactive属性为true。如下:

    Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
    MessageConsumer consumer = session.createConsumer(topic);
    

    JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息 - hapjin - 博客园
    ActiveMQ

    相关文章

      网友评论

          本文标题:ActiveMQ之Consumer

          本文链接:https://www.haomeiwen.com/subject/ezyvoktx.html