1.生产者失败通知
- 在发送消息时设置 mandatory 标志,告诉 RabbitMQ,如果消息不可路由,应该将消息返回给发送者,并通知失败。
- 它只会让 RabbitMQ 向你通知失败,而不会通知成功。
如果消息正确路由到队列,则发布者不会受到任何通知。 -
带来的问题是无法确保发布消 息一定是成功的,因为通知失败的消息可能会丢失。
image.png
生产者中:
- 设置监听回调
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replycode, String replyText, String exchange, String routeKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
String message = new String(bytes);
System.out.println("返回的replycode:"+replycode);
System.out.println("返回的replyText:"+replyText);
System.out.println("返回的exchange:"+exchange);
System.out.println("返回的routeKey:"+routeKey);
}
});
- 发送时将mandatory设置为true
channel.basicPublish(EXCHANGE_NAME,routekey,true,null,message.getBytes());
2.事务
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
- channel.txSelect()声明启动事务模式;
- channel.txComment()提交事务;
- channel.txRollback()回滚事务;
既然已经有事务了,为何还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。根据相关资料,事务会降低 2~10 倍的性能。
3.发送方确认模式
image.png比事务更轻量,性能影响几乎可以忽略不计
原理:
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),由这
个 id 在生产者和 RabbitMQ 之间进行消息的确认。
confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最 终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产 者应用程序同样可以在回调方法中处理该 nack 消息决定下一步的处理。
Confirm 的三种实现方式:
- 方式一:channel.waitForConfirms()普通发送方确认模式;
消息到达交换器,就会返回 true - 方式二:channel.waitForConfirmsOrDie()批量确认模式;
使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未到达交换器就会 抛出 IOException 异常。 - 方式三:channel.addConfirmListener()异步监听发送方确认模式;
4.备用交换器
如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器。
如果生产者设置了 mandatory 会发生什么?如果主交换器无法路由消息,RabbitMQ 并不会通知发布者,因为,向备用交换器发送消息。
建议备用交换器设置为 faout 类型,Queue 绑定时的路由键设置为“#”
5.消息获取方式
- 拉取
属于一种轮询模型,发送一次 get 请求,获得一个消息。如果此时 RabbitMQ 中没有消息,会获得一个表示空的回复。
总的来说,这种方式性能比较差,很明显,每获得一条消息,都要和 RabbitMQ 进行网络通信发出请求。而且对 RabbitMQ 来说,RabbitMQ 无法进行任何优化,因为它永远不知道应用程序何时会发出请求。对我们实现者来说,要在一个循环里,不断去服 务器 get 消息。
- 推送 Consume
属于一种推送模型。注册一个消费者后,RabbitMQ 会在消息可用时,自动将消息进行推送给消费者。
6.消息的应答
- 自动确认
消费者在声明队列时,可以指定 autoAck 参数,当 autoAck=true 时,一旦消费者接收到了消息,就视为自动确认了消息。如果消费者在处理消息的过程中,出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后,再确认消息,这就需要手动确认。 - 手动确认
当 autoAck=false 时,RabbitMQ 会等待消费者显式发回 ack 信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。
只要令 autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题, 因为 RabbitMQ 会一直持有消息直到消费者显式调用 basicAck 为止。
/*声明了一个消费者*/
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()
+"]"+message);
//TODO 这里进行确认
System.out.println("手动确认的tag:"+envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/*消费者正式开始在指定队列上消费消息*/
//TODO 这里第二个参数是自动确认参数,如果是false则是手动确认
channel.basicConsume(queueName,false,consumer);
- 我们一般使用手动确认的方法是,将消息的处理放在 try/catch 语句块中,成功处理了,就给 RabbitMQ 一个确认应 答,如果处理异常了,就在 catch 中,进行消息的拒绝:
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()
+"]"+message);
//TODO
//确认
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
//拒绝
}
}
};
/*消费者正式开始在指定队列上消费消息*/
//TODO 这里第二个参数是自动确认参数,如果是false则是手动确认
channel.basicConsume(queueName,false,consumer);
7.消息的拒绝
消息确认可以让 RabbitMQ 知道消费者已经接受并处理完消息。但是如果消息本身或者消息的处理过程出现问题怎么办?需要一种机制,通知RabbitMQ,无法正常消费,拒绝,请让别的消费者处理。这里就有两种机制,Reject 和 Nack。
- Reject
Reject 在拒绝消息时,可以使用 requeue 标识,告诉 RabbitMQ 是否需要重新发送给别的消费者。如果是 false 则不重新发送,一般这个消息就会被
RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递 - Nack
跟 Reject 类似,只是它可以一次性拒绝多个消息。也可以使用 requeue 标识,这是 RabbitMQ 对 AMQP 规范的一个扩展。 -
过程:
image.png
image.png
/*声明了一个消费者*/
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
try{
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()+"]"+message);
//模拟异常
throw new RuntimeException("处理异常"+message);
}catch (Exception e){
e.printStackTrace();
//TODO Reject方式拒绝(这里第2个参数决定是否重新投递requeue)
// channel.basicReject(envelope.getDeliveryTag(),true);
//TODO Nack方式的拒绝(第2个参数决定是否批量,第三个参数requeue)
channel.basicNack(envelope.getDeliveryTag(), false, false);
}
}
};
/*消费者正式开始在指定队列上消费消息*/
channel.basicConsume(queueName,false,consumer);
8.QoS 预取模式
在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息
之前崩溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
这种机制一方面可以实现限速(将消息暂存到 RabbitMQ 内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)。
注意:消费确认模式必须是非自动 ACK 机制(这个是使用 baseQos 的前提条件,否则会 Qos 不生效),然后设置 basicQos 的值;另外,还可以基于 consume 和 channel 的粒度进行设置(global)。
9.消费者中的事务
分两种情况:
- autoAck=false 手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但 RabbitMQ 对消息的确认会等事务的 返回结果,再做最终决定是确认消息还是重新放回队列,如果你手动确认之后,又回滚了事务,那么以事务回滚为准,此条消息会重新放回队列;
- autoAck=true 如果自动确认为 true 的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把 消息移除了。
10.死信交换器 DLX
死信条件:
- 消息被拒绝,并且设置 requeue 参数为 false
- 消息过期
- 队列达到最大长度(一般当设置了最大队列长度或大小并达到最大值时)
死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的 参数是 x-dead-letter-exchange。
//TODO 绑定死信交换器
String queueName = "dlx_make";
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-dead-letter-exchange", DlxProcessConsumer.DLX_EXCHANGE_NAME);
//TODO 死信路由键,会替换消息原来的路由键
channel.queueDeclare(queueName,false,true,
false,
args);
/*绑定,将队列和交换器通过路由键进行绑定*/
channel.queueBind(queueName,
DlxProducer.EXCHANGE_NAME,"#");
网友评论