美文网首页
ActiveMQ之Dispatch Message

ActiveMQ之Dispatch Message

作者: 爱健身的兔子 | 来源:发表于2021-01-08 15:35 被阅读0次

    1 Message Cursors(消息指针)

    在使用非持久消息传递时,ActiveMQ的早期版本中的一个常见问题是RAM缓存区不足。

    ActiveMQ 5.0之前的版本在内存中保留了对所有可能分发给活动的“持久主题使用者”或“队列”的消息的引用。尽管引用本身并不大,但确实对可以挂起的最大消息数量施加了限制。

    消息系统分发持久消息的一种典型方法是,当客户端准备使用它们时,使用游标保持下一个分配位置,从长期存储中批量提取它们。这是一种健壮且可扩展性强的方法,但是对于消费者(一个或多个)可以跟上消息的产生者(一个或多个)的情况,并不是最有效的方法。

    ActiveMQ 5.0采用了一种混合方法,允许消息直接从生产者传递到使用者(在消息保留之后),但是如果消费者慢下来,则切换回使用游标。

    当消息的消费者处于活跃状态并且处理能力较强时,被持久化存储的消息直接发送到与消费者相关联的发送队列,如下:

    当消息已经出现积压,消费者再开始活跃,或者是消费者的消费速度比消息的发送速度慢时,消息将从Pending Cursor中提取,并发送与消费者关联的发送队列。见下图:

    1.1 游标类型

    ActiveMQ 5.0中的默认消息光标类型是基于存储的。它的行为如上所述。可以使用两种其他类型的游标:VM游标基于文件的游标。同时支持非持久消息的处理,Store-based内嵌了File-based的模式,非持久消息直接被Non-persistent pending Cursor所处理。工作模式见下图:

    1.2 VM游标

    VM Cursor是ActiveMQ 4.x的工作方式:对消息的引用保存在内存中,并在需要时传递给调度队列。这可能非常快,但也有一个缺点,就是无法处理非常慢的消费者或长时间不活动的消费者。

    1.3 基于文件的游标

    基于文件的游标源自VM游标。当代理中的内存达到其限制时,它可以将消息分页到磁盘上的临时文件:

    1.4 配置使用

    默认情况下,使用基于存储的游标,但是可以根据目标配置不同的游标。

    1. Topic subscribers
    <destinationPolicy>
          <policyMap>
            <policyEntries>
              <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
                <dispatchPolicy>
                  <strictOrderDispatchPolicy />
                </dispatchPolicy>
                <deadLetterStrategy>
                  <individualDeadLetterStrategy  topicPrefix="Test.DLQ." />
                </deadLetterStrategy>
                <pendingSubscriberPolicy>
                    <vmCursor />
                </pendingSubscriberPolicy>
                <pendingDurableSubscriberPolicy>
                    <vmDurableCursor/>
                </pendingDurableSubscriberPolicy>
              </policyEntry>
            </policyEntries>
          </policyMap>
    </destinationPolicy>
    

    说明

    有效的Subscriber类型是vmCursorfileCursor,缺省是store based cursor。有效的持久化subscribe的cursor types是storeDurableSubscriberCursorvmDurableCursorfileDurableSubscriberCursor,缺省是store based cursor。

    1. Queue
    <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry queue="org.apache.>" >
                        <deadLetterStrategy>
                            <individualDealLetterStrategy  topicPrefix="Test.DLQ"/>
                        </deadLetterStrategy>
                        <pendingQueuePolicy>
                            <vmQueueCursor/>
                        </pendingQueuePolicy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
    </destinationPolicy>
    

    2 Async Send(异步发送)

    ActiveMQ支持异步和同步发送消息,是可以配置,通常对于快的消费者,是直接把消息同步发送过去,但是对于一个慢消费者,你使用同步发送消息可能出现producer堵塞等现象,慢消费者适合异步发送。

    配置使用:

    1. ActiveMQ默认设置dispatchAsync=true是最好的性能设置。如果你处理Fast Consumer则使用dispatchAsync=false

    2. 在Connection URI级别来配置使用Async Send

    factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.useAsyncSend=true");
    
    1. 在ConnectionFactory级别来配置使用Async Send
    factory.setUseAsyncSend(true);(此方法是存在于ActiveMQConnectionFactory中的)
    
    1. Connection级别来配置使用Async Send
    connection.setUseAsyncSend(true); (此方法存在于ActiveMQConnection中)
    

    3 Optimized Acknowledgement(消息确认机制)

    ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。若希望禁止使用经过优化的确认方式,有以下几种方式:

    1. 在Connection URI 上禁止启用Optimized Acknowledgements。
    cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
    
    1. 在ConnectionFactory 上禁止启用Optimized Acknowledgements。
    ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
    
    1. 在Connection上禁止启用Optimized Acknowledgements。
    ((ActiveMQConnection)connection).setOptimizeAcknowledge(true);
    

    4 Dispatch Policies(分发策略)

    ActiveMQ的prefetch缺省参数,是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大,而且缺省的dispatche policies会尝试尽可能快的填满缓冲。

    然而有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这些少量的消息总是倾向于被分发到个边的consumer上,这样就会因为负载的不均衡而导致处理时间的增加。

    对于队列,您可以定义是否以round-robin循环方式(默认行为)进行调度,还是在调度过程选择下一个消费者(strictOrderDispatch)之前是否耗尽一个消费者的预取缓冲区。

    4.1 Queue配置

    queue配置成 strictOrderDispatch 模式。

    <policyEntry queue=">" strictOrderDispatch="false" />
    

    注意

    从5.14.0版开始-当有一个使用者时,strictOrderDispatch = true(round-robin) 选项将确保重新分发的消息的严格顺序。

    4.2 Topic配置

    Round Robin dispatch policy会尝试平均分发消息:

    <policyEntry topic="FOO.>">
        <dispatchPolicy>
            <roundRobinDispatchPolicy/>
        </dispatchPolicy>
    </policyEntry>    
    

    5 Producer Flow Control(生产者流量控制)

    流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞知道资源可以继续使用,或者抛出一个JMSException,可以通过来配置。 值得注意的是,默认<systemUsage>设置会在达到或限制时导致生产者阻塞

    • 同步发送消息的producer会自动使用producer flow control。

    • 对于异步发送消息的producer,要使用producer flow control,先要为connection配置一个ProducerWindowSize参数,如下:

    ActiveMQConnectionFactory connctionFactory = connctionFactory.setProducerWindowSize(1024000);
    

    ProducerWindowSize是producer在发送消息的过程中,收到broker对于之前发送消息的确认之前,能够发送消息的最大字节数。

    5.1 禁用 producer flow control

    可以通过producerFlowControl在Broker配置中将适当的目标策略上的标志设置为false来对代理上的特定JMS队列和主题禁用流控制。

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="FOO.>" producerFlowControl="false"/>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
    

    注意

    自从ActiveMQ 5.x中引入新的消息游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。结果就是一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。

    如果想把所有的非持久化消息存放到内存中,并在达到内存限制的时候停掉生产者,你需要配置,示例如下:

    <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">    
      <pendingQueuePolicy>
        <vmQueueCursor/>
      </pendingQueuePolicy>
    </policyEntry>
    

    上面的片段将确保所有非持久队列消息都保留在内存中,每个队列的限制为1Mb。

    5.2 配置客户端异常

    为了应对broker空间不足,而导致的不确定的阻塞send()方法的一种替代方案,就是将其配置成客户端抛出的一个异常,通过将sendFailIfNoSpace属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端,配置示例:

    <systemUsage>
     <systemUsage sendFailIfNoSpace="true">
       <memoryUsage>
         <memoryUsage limit="20 mb"/>
       </memoryUsage>
     </systemUsage>
    </systemUsage>
    

    此属性的优点是客户端可以捕获javax.jms.ResourceAllocationException,稍等片刻然后重试该send()操作,而不仅仅是无限期地挂起。

    从版本5.3.1开始,sendFailIfNoSpaceAfterTimeout已添加该属性。此属性会导致send()操作失败,并在客户端发生异常,但仅在等待给定的时间后才会失败。如果在配置的时间后仍未释放代理上的空间,则send()客户端操作才会失败。下面是一个示例:

    <systemUsage>
     <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
       <memoryUsage>
         <memoryUsage limit="20 mb"/>
       </memoryUsage>
     </systemUsage>
    </systemUsage>
    

    超时以毫秒为单位定义,因此以上示例在send()操作失败之前要等待三秒钟。此属性的优点是,它将在配置的时间量内阻塞,而不是立即失败或无限期阻塞。此属性不仅在代理方面提供了改进,而且还为客户端提供了改进,因此它可以捕获异常,稍等片刻然后重试该send()操作。

    ActiveMQ

    相关文章

      网友评论

          本文标题:ActiveMQ之Dispatch Message

          本文链接:https://www.haomeiwen.com/subject/krdfaktx.html