起因:在实际项目开发过程中,需要使用RabbitMQ来实现消息队列的功能,比如说我发布一个动态之后,需要在30分钟使用默认用户给他点几个赞,之前考虑使用redis的zset对他进行操作,之后决定使用RabbitMQ,专业的事情使用专业的工具来操作。
1. 消息接收的应答模式ACK和NACK的使用
-
ACK就是手动签收的标示,如果消息的签收模式设置成为了手工模式,在MQ没有接收到ACK信息时都是Unacked的状态,并且消息还在队列中,这个时候消息不回重试,不会再主动发给消费者
-
如果在进行业务操作的时候,我们系统业务流程中出现了未知业务异常,比如里面某个服务环节出现网络超时的情况,这个时候如果签收,是不是业务根本没有达成,消息还消费掉了,这就无法补偿了,如果我没签收,这个时候消息就停留在这个队列里了,这个时候如果想要重试再次接收消息难道要重启服务吗?
-
NACK:将消息重回队列,如果我们发现异常,就可以调用NACK来将消息重回队列,他会重回到队尾
-
比如说执行的过程中发现异常,我们可以在catch里进行重回队列让消息再次执行
-
一般在业务中,我们重回队列执行的过程中会设置一个最大重回次数(重回计数可以使用redis),如果超过这个次数就执行ACK并进行记录,记录这个消息没有执行成功
-
import com.icoding.basic.po.OrderInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class OrderReceiver {
int flag = 0;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.*"
)
)
@RabbitHandler
public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("Order Name: "+orderInfo.getOrder_name());
Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//ACK进行签收,第一个参数是标识,第二个参数是批量接收为fasle
//channel.basicAck(deliverTag,false);
if(flag>3){
//说明执行了3次都没有成功
//消息确认
channel.basicAck(deliverTag,false);
//记录这个消息的日志或数据到DB/Redis/file
}else {
//可以设置时延迟几秒
flag = flag+1;
//前两个参数和上面ACK一样,第三个参数是否重回队列
channel.basicNack(deliverTag, false, true);
}
}
}
2. Exchange交换机Type详解
-
direct : 点对点直连的概念,比如我在binding里设置了一个routingkey是order.update,这个时候我们发送消息的routingkey就必须是order.update,如果不是order.update,这个消息就接收不到
-
topic : 点对点直连的概念,但是他支持routingkey的模糊匹配,可以在我们的routingke写匹配符
-
*:代表一个单词
-
#:代表没有或多个单词
-
这个匹配符用在我们binding里,既可以写在routingkey的前面也可以写在routingkey的后面(order.*/#.insert)
-
-
fanout:只要exchange进行binding了消息队列,就直接将消息传给消息队列了,因为不绑定任何的routingkey所以是转发消息最快的(广播方式)
-
header:根据消息header头来判断,较少使用
3. 消息队列的TTL设置和使用
什么是TTL:Time To Live,也就是生存时间
-
RabbitMQ是支持消息过期机制的,如果你设置了消息的TTL,这个消息没有及时消费,这个消息就丢了,或者说消失了
-
队列整体的消息过期时间,就是一个Time box,给这个队列设置一个过期时间,那么这个队列里的消息从进入队列开始计算,达到了这个时间如果还没有消费就直接丢掉了
-
x-message-ttl : 消息队列的整体消息的TTL,单位是毫秒
-
x-expires :消息队列空闲的市场,如果空闲超过这个时间就会自动删除,单位毫秒
-
x-max-length :消息队列存放消息的总消息数,如果超过会挤掉最早的那个数据
-
x-max-length-bytes :消息队列的最大容量,新消息过来如果容量不够会删除最早的消息,如果还不够,再删一条次最早的消息
-

-
如果我只想某个消息按时间过期,那么就不能使用消息队列的TTL,就要对消息本身进行TTL
- 在发送端进行消息过期设置
import com.icoding.basic.po.OrderInfo;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderSender {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendOrder(OrderInfo orderInfo) throws Exception{
/**
* exchange: 交换机名字
* routingkey: 队列关联的key
* object: 要传输的消息对象
* correlationData: 消息的唯一id
*/
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderInfo.getMessage_id());
//在这里设置message的TTL
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message;
}
};
//将messagePostProcessor加入到参数中
rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,messagePostProcessor,correlationData);
}
}
4. 死信队列详解及进入死信队列的机制
首先要看一下什么是死信:当一个消息无法被消费时就变成了死信
死信是怎么形成的:消息在没有被消费前就失效的就属于死信
一个系统中是没有无缘无故生成的消息,如果这个消息失效了没有了,是不是可能导致业务损失,如果这种消息我们需要记录或补偿,将这种消息失效的时候放到一个队列中,待我们人工补偿和消费,这个放死信的队列就是死信队列
希望我们的消息在失效的时候进入到死信队列中
我们的死信队列其实也是一个正常队列,只是赋予了他这个概念
死信队列的另一功能就是延迟消息

x-dead-letter-exchange :这个参数就是指定死信队列的Exchange的名字,这个Exchange就是一个普通的Exchange,需要手工创建
x-dead-letter-routing-key :这个参数就是指定死信队列的Routingkey,这个routingkey需要自己创建好队列和Exchange进行binding时填入
redis的keyevent通过pub/sub机制来订阅信息的,如果sub端在pub发布信息之后订阅就会导致信息丢失,而我们的死信因为时队列所以无所谓什么时候消费
不要以为每天把功能完成了就行了,这种思想是要不得的,互勉~!
网友评论