美文网首页
RabbitMQ ACK、NACK、Type、TTL、死信

RabbitMQ ACK、NACK、Type、TTL、死信

作者: dylan丶QAQ | 来源:发表于2020-09-28 20:54 被阅读0次

起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,比如说我发布一个动态之后,需要在30分钟使用默认用户给他点几个赞,之前考虑使用redis的zset对他进行操作,之后决定使用RabbitMQ,专业的事情使用专业的工具来操作。


1. 消息接收的应答模式ACK和NACK的使用

  • ACK就是手动签收的标示,如果消息的签收模式设置成为了手工模式,在MQ没有接收到ACK信息时都是Unacked的状态,并且消息还在队列中,这个时候消息不回重试,不会再主动发给消费者

  • 如果在进行业务操作的时候,我们系统业务流程中出现了未知业务异常,比如里面某个服务环节出现网络超时的情况,这个时候如果签收,是不是业务根本没有达成,消息还消费掉了,这就无法补偿了,如果我没签收,这个时候消息就停留在这个队列里了,这个时候如果想要重试再次接收消息难道要重启服务吗?

  • NACK:将消息重回队列,如果我们发现异常,就可以调用NACK来将消息重回队列,他会重回到队尾

    • 比如说执行的过程中发现异常,我们可以在catch里进行重回队列让消息再次执行

    • 一般在业务中,我们重回队列执行的过程中会设置一个最大重回次数(重回计数可以使用redis),如果超过这个次数就执行ACK并进行记录,记录这个消息没有执行成功

import com.icoding.basic.po.OrderInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
​
@Component
public class OrderReceiver {
​
 int flag = 0;
​
 @RabbitListener(bindings = @QueueBinding(
 value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
 exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
 key = "order.*"
 )
 )
 @RabbitHandler
 public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
 System.out.println("************消息接收开始***********");
 System.out.println("Order Name: "+orderInfo.getOrder_name());
 Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
 //ACK进行签收,第一个参数是标识,第二个参数是批量接收为fasle
 //channel.basicAck(deliverTag,false);
 if(flag>3){
 //说明执行了3次都没有成功
 //消息确认
 channel.basicAck(deliverTag,false);
 //记录这个消息的日志或数据到DB/Redis/file
 }else {
 //可以设置时延迟几秒
 flag = flag+1;
 //前两个参数和上面ACK一样,第三个参数是否重回队列
 channel.basicNack(deliverTag, false, true);
 }
 }
}

2. Exchange交换机Type详解

  • direct : 点对点直连的概念,比如我在binding里设置了一个routingkey是order.update,这个时候我们发送消息的routingkey就必须是order.update,如果不是order.update,这个消息就接收不到

  • topic : 点对点直连的概念,但是他支持routingkey的模糊匹配,可以在我们的routingke写匹配符

    • *:代表一个单词

    • #:代表没有或多个单词

    • 这个匹配符用在我们binding里,既可以写在routingkey的前面也可以写在routingkey的后面(order.*/#.insert)

  • fanout:只要exchange进行binding了消息队列,就直接将消息传给消息队列了,因为不绑定任何的routingkey所以是转发消息最快的(广播方式)

  • header:根据消息header头来判断,较少使用

3. 消息队列的TTL设置和使用

什么是TTL:Time To Live,也就是生存时间

  • RabbitMQ是支持消息过期机制的,如果你设置了消息的TTL,这个消息没有及时消费,这个消息就丢了,或者说消失了

  • 队列整体的消息过期时间,就是一个Time box,给这个队列设置一个过期时间,那么这个队列里的消息从进入队列开始计算,达到了这个时间如果还没有消费就直接丢掉了

    • x-message-ttl : 消息队列的整体消息的TTL,单位是毫秒

    • x-expires :消息队列空闲的市场,如果空闲超过这个时间就会自动删除,单位毫秒

    • x-max-length :消息队列存放消息的总消息数,如果超过会挤掉最早的那个数据

    • x-max-length-bytes :消息队列的最大容量,新消息过来如果容量不够会删除最早的消息,如果还不够,再删一条次最早的消息

  • 如果我只想某个消息按时间过期,那么就不能使用消息队列的TTL,就要对消息本身进行TTL

    • 在发送端进行消息过期设置
import com.icoding.basic.po.OrderInfo;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
public class OrderSender {
​
 @Autowired
 RabbitTemplate rabbitTemplate;
​
 public void sendOrder(OrderInfo orderInfo) throws Exception{
 /**
 * exchange: 交换机名字
 * routingkey: 队列关联的key
 * object: 要传输的消息对象
 * correlationData: 消息的唯一id
 */
 CorrelationData correlationData = new CorrelationData();
 correlationData.setId(orderInfo.getMessage_id());
 //在这里设置message的TTL
 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
 @Override
 public Message postProcessMessage(Message message) throws AmqpException {
 message.getMessageProperties().setExpiration("5000");
 return message;
 }
 };
 //将messagePostProcessor加入到参数中
 rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,messagePostProcessor,correlationData);
 }
}

4. 死信队列详解及进入死信队列的机制

首先要看一下什么是死信:当一个消息无法被消费时就变成了死信

死信是怎么形成的:消息在没有被消费前就失效的就属于死信

一个系统中是没有无缘无故生成的消息,如果这个消息失效了没有了,是不是可能导致业务损失,如果这种消息我们需要记录或补偿,将这种消息失效的时候放到一个队列中,待我们人工补偿和消费,这个放死信的队列就是死信队列

希望我们的消息在失效的时候进入到死信队列中

我们的死信队列其实也是一个正常队列,只是赋予了他这个概念

死信队列的另一功能就是延迟消息

x-dead-letter-exchange :这个参数就是指定死信队列的Exchange的名字,这个Exchange就是一个普通的Exchange,需要手工创建

x-dead-letter-routing-key :这个参数就是指定死信队列的Routingkey,这个routingkey需要自己创建好队列和Exchange进行binding时填入

redis的keyevent通过pub/sub机制来订阅信息的,如果sub端在pub发布信息之后订阅就会导致信息丢失,而我们的死信因为时队列所以无所谓什么时候消费


不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!

相关文章

网友评论

      本文标题:RabbitMQ ACK、NACK、Type、TTL、死信

      本文链接:https://www.haomeiwen.com/subject/tbgfuktx.html