美文网首页
RabbtiMQ高级特性--消息投递机制

RabbtiMQ高级特性--消息投递机制

作者: 叫我胖虎大人 | 来源:发表于2019-08-18 15:54 被阅读0次

Confirm确认消息

  • 消息确认,是指生产者消息投递后,如果Broker收到消息,则会给生产者一个应答
  • 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障


如何实现Confirm确认消息?

  • 在channel上开启确认模式:channel.confirmSelect()
  • 在channel上添加监听:addConfirmListener(),监听成功和失败的返回结果,根据具体返回结果对消息进行重发或日志记录等

Return 消息机制

  • Return Listener用于处理一些不可路由消息
  • 生产者指定Exchange和RoutingKey,将消息投递到某个队列,然后消费者监听队列,进行消息处理
  • 但在某些情况下,在发送消息时,若当前的exchange不存在或指定的路由key路由失败,这时,如果需要监听这种不可达的消息,则要使用return listener

相关的API配置

在 基础API中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息.

Return消息机制流程

找不到Exchange或者是找到了Exchange未找到RouteKey(简单点理解就是消息没有被发送到queue中[非故障情况下])


消费端的自定义监听

  • 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法进行获下一条消息,然后进行消费处理
  • 但是我们使用自定义的Consumer更加方便,解耦性更加的强,也是在实际工作当中最常用的使用方式
public class MyConsumer extends DefaultConsumer {


   public MyConsumer(Channel channel) {
      super(channel);
   }

   @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      System.err.println("-----------consume message----------");
      System.err.println("consumerTag: " + consumerTag);
      System.err.println("envelope: " + envelope);
      System.err.println("properties: " + properties);
      System.err.println("body: " + new String(body));
   }
}

通过继承DefaultConsumer来达到自定义消费操作
然后再Consumer类中使用basicConsume()方法的时候,使用自定义的Consumer方法

//这里就没有使用到了nextDelivery()方法,因为在自定义消费端中一样的具有接收body,在自定义的Consumer中也没有创建nextDelivery()方法
channel.basicConsume(queueName, true, new MyConsumer(channel));

消费端限流

什么是消费端限流

  • 假设一个场景,首先,我们的RabbitMQ服务器上有上万条吧未处理的消息,我们随便打开一个消费者客户端,会出现下面情况,巨量的消息瞬间全部传递过来,但是我们单个客户端无法处理这么多的数据.
  • 或者是当生产端的生产效率大于了消费端的消费效率

解决方案:RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息,(通过基于Consume或者Chanel设置的Qos的值)未被确认前,不进行消费新的消息) 所以在实际的工作当中没有进行自动签收的操作,也就是autoAck设置为false
void BasicQos(unit prefetchSize,unshort prefetchCount,bool global)

  • prefetchSize:0 消费的单挑消息的大小限制,0代表不限制
  • prefetchCount:不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
  • global:true\false true:上面设置应用于channel级别; false:应用于consumer级别

消费端的ACK与重回队列

消费端可以进行手工ACK和NACK(不确认,表示失败)

  • 消费端进行消费时,如果由于业务异常,可以进行日志记录,然后进行补偿
  • 如果由于服务器宕机等严重问题,需要手工进行ACK保障消费端消费成功

消费端的重回队列:

  • 对没有处理成功的消息,把消息重新传递给Broker
  • 一般在实际应用中,会关闭重回队列

在消息处于NAck的情况下,使用requeue为true,即重回队列 但是这个失败的消息,需要等队列中的消息发送完毕之后再进行

TTL消息

  • 生存时间(Time to Live,TTL),指的是消息的生成时间
  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
  • 还支持队列的过期时间,从消息入队列开始计时,只要超过了队列的超时时间,那么消息会自动的清除

死信队列

  • 死信队列(Dead-Letter-Exchange,DLX)
  • 利用DLX,当消息在队列中变成死信(dead message:没有消费者去消费)之后,它能被重新publish到另一个Exchange,这个Exchange就是死信队列
  • DLX是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上,进而被路由到另一个队列
  • 可以监听这个队列中的消息并做相应的处理

消息变成死信的情况

  • 消息被拒绝(basic.rejcet/basic.nack)且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

死信队列的设置

  • 首先要设置死信队列的exchange和queue,然后进行绑定:
    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey:#

然后进行正常声明交换机、队列、绑定,只不过需要在队列上加上一个参数:arguments.put("x-dead-letter-exchange","dlx.exchange);
这样消息在过期、requeue、队列达到最大长度时,消息就可以直接路由到死信队列


当这个队列的消息成为死信之后,就会把消息转发待死信队列当中 消息过期前,即未进入死信队列前
消息过期后,进入死信队列

参考课程:https://coding.imooc.com/class/262.html

相关文章

网友评论

      本文标题:RabbtiMQ高级特性--消息投递机制

      本文链接:https://www.haomeiwen.com/subject/dcirsctx.html