RabbitMQ消息 确认分为两种:一是生产确认,二是消费确认
RabbitMQ本身支持两种 确认方式:一是事务确认,二是ACK确认
这里直接介绍Spring Boot+RabbitMQ 的消息确认(ACK)
一:生产确认
生产者确认需要在生产的地方实现 RabbitTemplate.ConfirmCallback
@Service
public class PersonalService implements RabbitTemplate.ConfirmCallback{
@Autowired
public UserInfoDao userInfoDao;
@Autowired
public MailTools mailTools;
@Autowired
public LoginService loginService;
public RabbitTemplate rabbitTemplate;
static Logger log = Logger.getLogger(PersonalService.class);
/**
* 需要通过生产者的构造器去注入RabbitTemplate,并设置他 回调确认对象为 当前对象。
*/
public PersonalService(RabbitTemplate rabbitTemplate){
this.rabbitTemplate=rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
}
实现了 ConfirmCallback 之后需要实现 confirm 方法
/**
* 消息发送到队列回调该方法
* correlationData : 发送消息时给的id
* cause : 会返回错误信息,正确为null
* ack:是否正确发送
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// TODO Auto-generated method stub
System.out.println("confirm --> "+correlationData.getId()+" ->"+ack+" ->"+cause);
}
二:消费确认
在Rabbitmq+Springboot中,消费者的实现方式为注解方式:
@Component
public class MessageReceiver {
@RabbitListener(queues = AmqpConfiguration.QUEUE)
//@RabbitHandler
public void receive(String hello, Channel channel, Message message){
// 限流处理:消息体大小不限制,每次限制消费一条,只作用于该Consumer层,不作用于Channel
channel.basicQos(0, 1, false);//限制于消费级别
String messsageText = new String(message.getBody());
System.out.println("[receiver] receive message : "+ messsageText);
try {
if(validate(messsageText)){
System.out.println("[receiver] confirm");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息接收
}else{
System.out.println("[receiver] reject");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//拒绝消息接收
}
} catch (IOException e) {
e.printStackTrace();
}
}
private boolean validate(String messsageText) {
return !(messsageText!=null && messsageText.indexOf("fuck")>-1);
}
}
-> @RabbitListener:监听的队列
-> @RabbitHandler:但@RabbitListener注解在类上时,需要使用@RabbitHandler来指明调用的方法。
-> void basicAck(long deliveryTag, boolean multiple) throws IOException; 确认消息接收 deliveryTag:该消息的index,multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
-> channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 拒绝消息接收 deliveryTag:该消息的index,multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息,requeue:被拒绝的是否重新入队列。
-> channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); 拒绝消息接收 deliveryTag:该消息的index,requeue:被拒绝的是否重新入队列,channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息。
-> chanel.basicQos(int prefetchSize, int prefetchCount, boolean global) 消息限流的功能,防止生产过多,导致消费者消费吃力的情况;
prefetchSize: 0表示对消息的大小无限制,单位为(B-字节)
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将 阻塞 掉,直到有消息ack。0为无上限
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别。
该图片来自RabbitMq实战.pdf
网友评论