文章参考:Rabbit实战指南
消息何去何从
mandatory和immediate是channel.basicPublish
方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。
-
mandatory参数
当mandatory参数设置为true时,交换器无法根据自身类型和路由键找到一个符号条件的队列,那么RabbitMQ会调用
Basic.Return
命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,消息直接丢弃。生产者如何获取到没有被正确路由到合适队列的消息呢?
可以通过调用
channel.addReturnListener
来添加ReturnListener监听器来实现。使用mandatory参数的关键代码如下
channel.basicPublish(EXCHANGE_NAME,"",true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes()); channel.addReturnListener(new ReturnListener(){ public void handleReturn(int replyCode,String replyText, String exchange,String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{ String message = new String(body); System.out.println("Basic.Return返回的结果是:"+message); } });
上面代码中生产者没有成功将消息路由到队列,此时RabbitMQ会通过
Basic.Return
返回“mandatory test”这条消息,之后生产者客户端通过ReturnListener监听到这个事件,上面代码最后输出的应该是“Basic.Return返回的结果是:madatory test”。 -
immediate参数
当immediate参数设为true时,如果交换器在将消息路由到队列时,发现队列上并不存在任何消费者,那么这条消息不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过
Basic.Return
返回给生产者。概况来说,mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。
-
备份交换器
备份交换器,英文名为Alternate Exchange。生产者在发送消息的时候,如果没有设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置mandatory参数,那么添加ReturnListener的编程逻辑,生产者的代码将会变得复杂。如果既不想复杂,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,在需要的时候去处理这些消息。
可以通过在声明交换器(调用
channel.exchangeDeclare
方法)的时候添加alternate-exchange参数来实现,也可以通过策略(Policy)的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉Policy的设置。使用参数设置关键代码如下:
Map<String,Object> args = new HashMap<String,Object>(); args.put("alternate-exchange","myAe"); channel.exchangeDeclare("normalExchange","direct",true,false,args); channel.exchangeDeclare("myAe","fanout",true,false,null); channel.queueDeclare("normalQueue",true,false,false,null); channel.queueBind("normalQueue","normalExchange","normalKey"); channel.queueDeclare("unroutedQueue",true,false,false,null); channel.queueBind("unroutedQueue","myAe","");
上面代码声明了两个交换器normalExchange和myAe,分别绑定了normalQueue和unroutedQueue这两个队列,同时将myAe设置为normalExchange的备份交换器。注意myAe的交换器类型为fanout。
如果此时发送一条信息到normalExchange上,当路由键等于“normalKey”的时候,消息能正确路由到normalQueue这个队列上。如果路由键设置为其他值,比如“errorKey”,即消息不能正确的路由到与normalExchange绑定的任何队列上,此时,就会发送给myAe,进而发送个unroutedQueue这个队列。
同样,如果采取Policy的方式来设置备份交换器,可参考如下:
rabbitmqctl set_policy AE "^noramlExchange$" '{"alternate-exchange":"myAE"}'
备份交换器和普通交换器没有啥太大区别,为了方便使用,建议设置为fanout类型。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。
对于备份交换器,总结了以下几个情况:
- 如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出血,此时,消息丢失。
- 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出血,此时,消息丢失。
- 如果备份交换器没有任何匹配的队列,户端和RabbitMQ服务端都不会有异常出血,此时,消息丢失。
- 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
网友评论