美文网首页程序员
activeMq消息转投rabbitMq研究

activeMq消息转投rabbitMq研究

作者: 3c69b7c624d9 | 来源:发表于2017-11-30 02:02 被阅读368次

在研究activemq转投消息到rabbitmq的过程中还是发现了很多有趣的细节。 消息发送端分为PERSISTENT与NON_PERSISTENT,该类型表示是否持久化消息到数据库中。

  1. Activemq默认使用kahaDB。我大Q9使用的也是kahaDB。当然也是支持mysql等数据库的。

具体配置在${activemq.base}/conf/activemq.xml中。

    <persistenceAdapter>
       <kahaDB directory="${activemq.base}/data/kahadb"/>
    </persistenceAdapter>
    
    <!--
       The systemUsage controls the maximum amount of space the broker will
       use before slowing down producers. For more information, see:
    
       http://activemq.apache.org/producer-flow-control.html
       -->
    <systemUsage>
       <systemUsage>
          <memoryUsage>
             <memoryUsage limit="4 gb"/>
          </memoryUsage>
          <storeUsage>
             <storeUsage limit="15 gb"/>
          </storeUsage>
          <tempUsage>
             <tempUsage limit="100 mb"/>
          </tempUsage>
       </systemUsage>
    </systemUsage>

和大家理解不一样的地方是NON_PERSISTENT是会使用文件作为存储介质的。主要是为了防止内存挤爆。当发送者发送过快或者接受者处理过慢都会导致使用大量内存。此时将消息临时存储在临时文件中(swap)。

  1. 对于PERSISTENT与NON_PERSISTENT区别在于是否在mq服务器重启后能够正常发送消息。PERSISTENT的消息在服务器重启后依然能够将message发送出去。

如果服务端的topic没有订阅者该消息将被直接丢弃。

  1.   消费者的持久化则有一定区别。当为queue的时候,若客户端不在线等到某个客户端消费了该消息时则会将该消息删除。当为topic时,若客户端未设置subscriptionDurable,则该客户端必须要在线才能收到订阅。当客户端设置subscriptionDurable为true时,则服务器会保存该消息直到被所有的订阅者均消费一次(消费是指服务器收到ack回复)
    

<table>
<tbody>
<tr>
<td>消息发送端</td>
<td>消息接收端</td>
<td>可靠性及因素</td>
</tr>
<tr>
<td>PERSISTENT</td>
<td>queue receiver/durable subscriber</td>
<td>消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。</td>
</tr>
<tr>
<td>PERSISTENT</td>
<td>non-durable subscriber</td>
<td>最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。</td>
</tr>
<tr>
<td>NON_PERSISTENT</td>
<td>queue receiver/durable subscriber</td>
<td>最多消费一次。这是由于服务器的宕机会造成消息丢失</td>
</tr>
<tr>
<td>NON_PERSISTENT</td>
<td>non-durable subscriber</td>
<td>最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定</td>
</tr>
</tbody>
</table>

服务端可以根据clientId及durableSubscriptionName来辨别指定的订阅者以便将该订阅者尚未消费的消息供消费。

记得在设置subscriptionDurable时候也需要设置durableSubscriptionName如下注释。否则该名称会变成listener的名称。

    /**  * Set whether to make the subscription durable. The durable subscription name  * to be used can be specified through the "durableSubscriptionName" property.  * <p>Default is "false". Set this to "true" to register a durable subscription,  * typically in combination with a "durableSubscriptionName" value (unless  * your message listener class name is good enough as subscription name).  * <p>Only makes sense when listening to a topic (pub-sub domain).  * @see #setDurableSubscriptionName  */
``` plain

如下为一配置示例
```xml
    <bean id="jmsbillChoiceOfAviationContainer"    class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="expressJmsFactory" />    <property name="destination" ref="expressDssScanTopicDestination" />    <property name="messageListener" ref="billChoiceOfAviationTopicListener" />    <property name="durableSubscriptionName" value="billChoiceOfAviation" />    <property name="clientId" value="billChoiceOfAviationClient" />    <property name="subscriptionDurable" value="true" /> </bean>

请注意在配置destination属性是topic。该类还有一个属性为destinationName。配置该属性时会默认为queue。

在设置destination时执行如下,可以看到还有setPubSubDomain(true);才会表示为订阅模式否则为点对点模式

    /**  * Set the destination to receive messages from.  * <p>Alternatively, specify a "destinationName", to be dynamically  * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}.  * <p>Note: The destination may be replaced at runtime, with the listener  * container picking up the new destination immediately (works e.g. with  * DefaultMessageListenerContainer, as long as the cache level is less than  * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!  * @see #setDestinationName(String)  */ public void setDestination(Destination destination) {    Assert.notNull(destination, "'destination' must not be null");    this.destination = destination;    if (destination instanceof Topic && !(destination instanceof Queue)) {       // Clearly a Topic: let's set the "pubSubDomain" flag accordingly.       setPubSubDomain(true);    } }

因此在设置destinationName属性时如果是topic需要增加

    <property name="pubSubDomain" value="true"/>
  1.  rabbitMQ和activeMq很大的不同在于rabbitMq的消息发送完全基于queue。
    

在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

l 消费者是无法订阅或者获取不存在的MessageQueue中信息。

l 消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的

当rabbitmq的生产者发送消息出来后该消息会发送到指定的exchange中。Exchange分为如下几种常用类型:direct, fanout,topic

  1. fanout
    所有bind到此exchange的queue都可以接收消息

通常此处routingkey为””

不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

    Channel channel = connection.createChannel();  
    channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
    channel.queueDeclare("queueName");  
    channel.queueBind("queueName", "exchangeName", "routingKey");  
      
    byte[] messageBodyBytes = "hello world".getBytes();  
    //需要绑定路由键  
    channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
 
10151121_lhsn.png10151121_lhsn.png
  1. direct
    通过routingKey和exchange决定的那个唯一的queue可以接收消息

处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。

    Channel channel = connection.createChannel();  
    channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
    channel.queueDeclare("queueName");  
    channel.queueBind("queueName", "exchangeName", "routingKey");  
      
    byte[] messageBodyBytes = "hello world".getBytes();  
    //需要绑定路由键  
    channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
10151121_WeHT.png10151121_WeHT.png
  1. topic
    所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
    表达式符号说明:#代表一个或多个字符,*代表任何字符
    例:#.a会匹配a.a,aa.a,aaa.a等
    *.a会匹配a.a,b.a,c.a等
    注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
    Channel channel = connection.createChannel();  
    channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic  
    channel.queueDeclare("queueName");  
    channel.queueBind("queueName", "exchangeName", "routingKey.*");  
      
    byte[] messageBodyBytes = "hello world".getBytes();  
    channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
10151121_aaUJ.png10151121_aaUJ.png

Rabbitmq和activemq区别很大的一点是在于当生产者发送消息给topic时,activemq是将该消息广播至该处所有的订阅者(包括离线持久订阅者),而rabbitmq的消息在发送时不一样是还需要配合routingkey。只有符合表达式的订阅者才会被转发。二订阅者依旧是关注被转发的queue,符合该表达式的消息会被转发至对应的queue中,这样客户端消费者才可以消费到。

因此想要持久化订阅topic在rabbitmq中还需要对应在exchange中增加一个名称唯一的queue来进行转发。

如下如果多个listener需要订阅该topic,则需要每个listener对应一个不同的queue,以便转发。

    <rabbit:topic-exchange  id="expressDssScanTopicExchange" name="expressDssScanTopicExchange" durable="true" >       <rabbit:bindings>           <rabbit:binding queue="express.scan" pattern="express.dssScan"/>    <rabbit:binding queue="express.dss" pattern="express.dssScan"/>       </rabbit:bindings>   </rabbit:topic-exchange>

相关文章

网友评论

    本文标题:activeMq消息转投rabbitMq研究

    本文链接:https://www.haomeiwen.com/subject/snymbxtx.html