美文网首页
ActiveMQ的高级特性

ActiveMQ的高级特性

作者: 爱健身的兔子 | 来源:发表于2020-12-31 17:08 被阅读0次

    1 异步投递与确认签收回调

    同步发送:会阻塞producer的send方法。

    异步发送:不会阻塞producer的send方法。

    消息的发送默认情况下是采用异步发送,除下面两种情况:

    1. 设置消息发送成同步发送。

    2. 发送消息是持久化消息。

    消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。但是当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。

    设置异步发送的方式:

    1. 在brokerURI添加jms.alwaysSyncSend=false&jms.useAsyncSend=true

    2. 通过ConnectionFactory接口的setUseAsyncSend方法设置。

    3. 通过Connection接口的setUseAsyncSend方法设置。

    注意:

    • 如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步。

    • 当alwaysSyncSend=false,useAsyncSend=false时;NON_PERSISTENT(非持久化)消息和事务中的消息将使用异步发送

    • 当alwaysSyncSend=false时,如果指定了useAsyncSend=true;PERSISTENT(持久化)类型的消息使用异步发送。如果useAsyncSend=false,PERSISTENT类型的消息使用同步发送。

    总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。

    jms.sendTimeout:发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送。

    producerWindowSize:窗口尺寸,用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的尺寸,且只对异步发送有意义。每次发送消息之后,都将会导致memoryUsage尺寸增加(+message.size),当broker返回producerAck时,如果达到了producerWindowSize上限,即使是异步调用也会被阻塞,防止不停向broker发送消息。

    通过ActiveMQConnectionFactory或者ActiveMQConnection的setProducerWindowSize来设置。

    2 延迟投递与定时投递

    ​ActiveMQ对消息延时和定时投递做了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息

    先在 activemq.xml 中配置 schedulerSupport 属性为 true

     <broker xmlns="http://activemq.apache.org/schema/core"  brokerName="localhost" dataDirectory="${ activemq. data}" schedulerSupport="true" /> 
    

    通过在Message设置如下属性可以进行延时投递和定时投递

    实例如下:

     message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
     message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, periodTime);
     message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeatTimes);
     message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cron);
    

    3 消费重试机制

    消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。消息重发的情况有以下几种:

    1. 事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发。

    2. 使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发。

    3. 所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发。

    4. 消息被消费者拉取之后,超时没有响应ack,消息会被broke重发。

    在默认情况下,当消息签收失败时ActiveMQ消息服务器会继续每隔1秒钟向消费者端发送一次这个签收失败的消息,默认会尝试6次(加上正常的1次共7次),如果这7次消费者端全部签收失败,则会给ActiveMQ服务器发送一个“poison ack”,表示这个消息不正常(“有毒”),这时消息服务器不会继续传送这个消息给这个消费者,而是将这个消息放入死信队列(DLQ,即Dead Letter Queue)。

    3.1 消息重试属性

    消费重试机制的默认相关配置如下:
    可用的Redelivery属性如下:

    property     default value     description
    collisionAvoidanceFactor 0.15 设置防止冲突范围的正负百分比,只有启用了useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
    maximumRedeliveries 6 最大重传次数,达到最大重传次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
    maximumRedeliveryDelay -1 传送延迟,旨在useExpoentialBackOff为true时有效(5.5之后),假设首次重间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
    initialRedeliveryDelay 1000L 初始重发延迟时间
    redeliveryDelay 1000L 重发延迟时间,当initialRedeliveryDelay=0时生效。
    useCollisionAvoidance false 启用防止冲突功能。
    useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间。
    backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。
    3.2 消息重试配置

    消息重试有三种配置:url,xml,connection连接属性。

    URL

    tcp://192.168.0.15:61616?jms.redeliveryPolicy.initialRedeliveryDelay=0&jms.redeliveryPolicy.redeliveryDelay=1000
    

    XML

    <broker schedulerSupport="true">
    
            <plugins>
                <redeliveryPlugin fallbackToDeadLetter="true" 
                                  sendToDlqIfMaxRetriesExceeded="true">
                    <redeliveryPolicyMap>
                        <redeliveryPolicyMap>
                            <redeliveryPolicyEntries>
                                <!-- a destination specific policy -->
                                <redeliveryPolicy queue="SpecialQueue" 
                                                  maximumRedeliveries="4" 
                                                  redeliveryDelay="10000"/>
                            </redeliveryPolicyEntries>
    
                            <defaultEntry>
                                <!-- the fallback policy for all other destinations -->
                                <redeliveryPolicy maximumRedeliveries="4" 
                                                  initialRedeliveryDelay="5000"
                                                  redeliveryDelay="10000"/>
                            </defaultEntry>
                        </redeliveryPolicyMap>
                    </redeliveryPolicyMap>
                </redeliveryPlugin>
            </plugins>
    
    </broker> 
    

    必须开启 schedulerSupport 属性。

    connection

            //重发策略
            RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
            queuePolicy.setInitialRedeliveryDelay(0);
            queuePolicy.setRedeliveryDelay(1000);
            queuePolicy.setUseExponentialBackOff(false);
            queuePolicy.setMaximumRedeliveries(2);
    
            //创建队列(有则不创建)
            Destination destination = session.createQueue("garine-queue");
    
            RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
            map.put((ActiveMQDestination) destination, queuePolicy);
    

    4 死信队列

    ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念,在一条消息被重复发送给消息消费者端多次(默认为6次)后,若一直签收不成功,则ActiveMQ会将这条消息移入到“死信队列”。开发时可以开启一个后台线程监听这个队列(默认死信队列的名称为ActiveMQ.DLQ)中的消息,进行人工干预,也就是说死信队列的作用主要是处理签收失败的消息。

    死信队列的配置主要有两种:SharedDeadLetterStrategy和IndividualDeadLetterStrategy

    1. SharedDeadLetterStrategy:共享的死信队列配置策略,将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ Broker端的默认策略。共享队列的名称默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定:在activemq.xml中的<policyentries>节点中配置</policyentries>
    <deadLetterStrategy>
        <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
    </deadLetterStrategy>
    
    1. IndividualDeadLetterStrategy:单独的死信队列配置策略,把DeadLetter放入各自的死信通道中。对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue”;对于Topic而言,死信通道的前缀默认为“ActiveMQ.DLQ.Topic”。比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。我们可以使用queuePrefix和topicPrefix来指定上述前缀:
    <!-- 仅对与order队列起作用 -->
    <policyEntry queue="order">
        <deadLetterStrategy>        <!-- useQueueForQueueMessage属性的作用:是否将名为order的Topic中的DeadLetter也保存在该队列中,默认为true -->
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessage="false"/>
        </deadLetterStrategy>
    </policyEntry>
    

    注意:

    默认情况下,无论是Topic还是Queue,Broker都使用Queue来保存DeadLetter,即死信通道通常为Queue,不过开发时也可以指定为Topic

    4.1 自动删除过期消息

    自动删除过期消息,此时对于过期的消息将不会被放入到死信队列,而是自动删除,>表示对所有队列起作用,processExpired表示是否将过期消息放入死信队列,默认为true

    <!-- >表示对所有队列起作用 -->
    <policyEntry queue=">">
        <deadLetterStrategy>
            <sharedDeadLetterStrategy processExpired="false"/>
        </deadLetterStrategy>
    </policyEntry>
    
    4.2 存放非持久消息到死队列中

    将签收失败的非持久消息也放入到死信队列,默认情况下,ActiveMQ不会把非持久化的死消息放入死信队列,processNonPersistent表示是否将非持久化消息放入死信队列,默认为false

    <!-- >表示对所有队列起作用 -->
    <policyEntry queue=">">
        <deadLetterStrategy>
            <sharedDeadLetterStrategy processNonPersistent="true"/>
        </deadLetterStrategy>
    </policyEntry>
    

    对于过期的,可以通过processExpired属性来控制,对于redelivered的失败的消息,需要通过插件来实现如下:丢弃所有死信 。

    <broker>
        <plugins>
            <discardingDLQBrokerPlugin dropAll="true"
                    dropTemporaryTopics="true" dropTemporaryQueues="true"/>
        </plugins>
    </broker> 
    

    5 防止消息重复消费,幂等性

    ActiveMQ中的消息有时是会被重复消费的,而我们消费消息时大都会在拿到消息后去调用其他的方法,比如说将消息的内容解析为一个对象保存到数据库中。一旦发生消息的重复消费时就会重复保存,这是有问题的,因此我们需要考虑如何防止重复调用。其实我们是没有办法防止重复调用的,只能在重复调用时进行消息是否重复消费的校验,当然对于幂等性接口也可以不进行校验。 那如何进行校验呢?有很多种方式,比如说我们将消费过的消息的messageId保存到数据库,每次消费消息前先到数据库中查一下该消息是否已被消费。在分布式系统中,也可以将消费过的消息放入redis中,以messageId作为key,message对象作为value(其实value不重要,当然也要看需求本身),在消费消息时先从redis中查找该消息是否已被消费。

    相关文章

      网友评论

          本文标题:ActiveMQ的高级特性

          本文链接:https://www.haomeiwen.com/subject/jclroktx.html