Confirm确认消息
- 消息确认,是指生产者消息投递后,如果Broker收到消息,则会给生产者一个应答
-
生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障
如何实现Confirm确认消息?
- 在channel上开启确认模式:
channel.confirmSelect()
- 在channel上添加监听:
addConfirmListener()
,监听成功和失败的返回结果,根据具体返回结果对消息进行重发或日志记录等
Return 消息机制
- Return Listener用于处理一些不可路由消息
- 生产者指定Exchange和RoutingKey,将消息投递到某个队列,然后消费者监听队列,进行消息处理
- 但在某些情况下,在发送消息时,若当前的exchange不存在或指定的路由key路由失败,这时,如果需要监听这种不可达的消息,则要使用return listener
相关的API配置
在 基础API中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息.
Return消息机制流程
找不到Exchange或者是找到了Exchange未找到RouteKey(简单点理解就是消息没有被发送到queue中[非故障情况下])
消费端的自定义监听
- 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法进行获下一条消息,然后进行消费处理
- 但是我们使用自定义的Consumer更加方便,解耦性更加的强,也是在实际工作当中最常用的使用方式
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
通过继承DefaultConsumer来达到自定义消费操作
然后再Consumer类中使用basicConsume()方法的时候,使用自定义的Consumer方法
//这里就没有使用到了nextDelivery()方法,因为在自定义消费端中一样的具有接收body,在自定义的Consumer中也没有创建nextDelivery()方法
channel.basicConsume(queueName, true, new MyConsumer(channel));
消费端限流
什么是消费端限流
- 假设一个场景,首先,我们的RabbitMQ服务器上有上万条吧未处理的消息,我们随便打开一个消费者客户端,会出现下面情况,巨量的消息瞬间全部传递过来,但是我们单个客户端无法处理这么多的数据.
- 或者是当生产端的生产效率大于了消费端的消费效率
解决方案:RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息,(通过基于Consume或者Chanel设置的Qos的值)未被确认前,不进行消费新的消息) 所以在实际的工作当中没有进行自动签收的操作,也就是autoAck设置为false
void BasicQos(unit prefetchSize,unshort prefetchCount,bool global)
- prefetchSize:0 消费的单挑消息的大小限制,0代表不限制
- prefetchCount:不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
- global:true\false true:上面设置应用于channel级别; false:应用于consumer级别
消费端的ACK与重回队列
消费端可以进行手工ACK和NACK(不确认,表示失败)
- 消费端进行消费时,如果由于业务异常,可以进行日志记录,然后进行补偿
- 如果由于服务器宕机等严重问题,需要手工进行ACK保障消费端消费成功
消费端的重回队列:
- 对没有处理成功的消息,把消息重新传递给Broker
- 一般在实际应用中,会关闭重回队列
在消息处于NAck的情况下,使用requeue为true,即重回队列 但是这个失败的消息,需要等队列中的消息发送完毕之后再进行
TTL消息
- 生存时间(
Time to Live,TTL
),指的是消息的生成时间 - RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
- 还支持队列的过期时间,从消息入队列开始计时,只要超过了队列的超时时间,那么消息会自动的清除
死信队列
- 死信队列(Dead-Letter-Exchange,DLX)
- 利用DLX,当消息在队列中变成死信(dead message:没有消费者去消费)之后,它能被重新publish到另一个Exchange,这个Exchange就是死信队列
- DLX是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定
- 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上,进而被路由到另一个队列
- 可以监听这个队列中的消息并做相应的处理
消息变成死信的情况
- 消息被拒绝
(basic.rejcet/basic.nack)且requeue=false
- 消息TTL过期
- 队列达到最大长度
死信队列的设置
- 首先要设置死信队列的exchange和queue,然后进行绑定:
- Exchange:dlx.exchange
- Queue:dlx.queue
- RoutingKey:#
然后进行正常声明交换机、队列、绑定,只不过需要在队列上加上一个参数:arguments.put("x-dead-letter-exchange","dlx.exchange);
这样消息在过期、requeue、队列达到最大长度时,消息就可以直接路由到死信队列
当这个队列的消息成为死信之后,就会把消息转发待死信队列当中 消息过期前,即未进入死信队列前
消息过期后,进入死信队列
网友评论