Springboot 版本为2.3.10.RELEASE
消费端
在整合springboot的时候,个人认为有两种方式来消息确认,一种是完全使用配置的方式,一种是部分使用配置的方式
- 完全使用springboot配置的方式
application.yml
spring:
rabbitmq:
host: 192.168.137.141
port: 5672
username: duoduo
password: duoduo
virtual-host: /duoduo
#三种方式 SIMPLE() CORRELATED(执行ConfirmCallback) NONE(发送失败直接丢弃)
publisher-confirm-type: correlated
publisher-returns: true
listener:
type: direct #direct和simple
direct:
acknowledgeMode: manual #auto代表自动接收消息,manual手动确认消息
prefetch: 1 #这个就是basicQos,同时处理多少条消息
defaultRequeueRejected: true #消息拒绝是否重新入队
retry:
enabled: true
maxAttempts: 3
监听
@Configuration
public class RabbitMqConsumer {
public static AtomicInteger count = new AtomicInteger (0);
public static AtomicInteger count1 = new AtomicInteger (0);
/**
* @RabbitListener(queues = {"spirngboot_queue"})可以直接监听队列,前提是服务端已经创建了队列,交换机也绑定了队列
* 也可以创建交换机 队列 比如注释掉的代码
*/
//@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spirngboot_queue", durable = "true"),
//exchange = @Exchange(name = "spirngboot_queue_topic_exchange", durable = "true", type = "topic"),
//key = "springboot.#"))
@RabbitListener(queues = {"spirngboot_queue"})
@RabbitHandler
public void message(@Payload User user) throws IOException {
//这里是完全使用springboot的注解的方式,并且 acknowledgeMode: auto 要配置成auto的方式,配置成manual消息不会确定接收,
//但是这里有一个问题,如果使用了自定义序列化之后,配置成manual也可以正常接收
//配置文件中的retry,如果在接收消息的时候发生了异常那么会重试3次,3次之后消息就会丢弃掉,虽然配置了defaultRequeueRejected为true
//但是如果使用了自定义序列化之后,不管defaultRequeueRejected是否为true,消息拒绝之后会重新的放到队列中
//本人对这一地方不太了解
//实际生产中本人也不会经常使用纯配置的方式
try {
System.out.println (user.getName ());
} catch (Exception e) {
//todo 代码的回退,如果try里面操作了数据库,可以通过事务自动的回退,但是如果操作了redis,那需要对redis 的回退等操作
throw new IOException (e.getMessage ());
}
}
//配合@RabbitHandler注解实现了消息的序列化格式,这样可以直接传对象,而不用吧对象转为json字符串,而且Jackson2JsonMessageConverter序列化体积更小传输更快
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory ();
simpleRabbitListenerContainerFactory.setConnectionFactory (connectionFactory);
simpleRabbitListenerContainerFactory.setMessageConverter (new Jackson2JsonMessageConverter ());
return simpleRabbitListenerContainerFactory;
}
}
纯配置的方式,在监听的代码中只是接收消息就可以,不用手动的basicAck来确认消息,或者basicNack拒绝消息,纯配置的方式,对异常的捕获,之后是要继续手动抛出异常,默认重试3次,之后肯定不会重新放到队列中,本人没看过源码,看表象是这样的.而且acknowledgeMode:
Auto 才会接收消息
如果配置了@RabbitHandler 修改了序列化机制,上面的配置还是需要改变
本人对纯配置的方式理解不太透彻,希望大家给我意见.
- 使用手动的确认信息
@Configuration
public class RabbitMqConsumer {
public static Map<String, AtomicInteger> map = new HashMap<> ();
/**
* @RabbitListener(queues = {"spirngboot_queue"})可以直接监听队列,前提是服务端已经创建了队列,交换机也绑定了队列
* 也可以创建交换机 队列 比如注释掉的代码
*/
//@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spirngboot_queue", durable = "true"),
//exchange = @Exchange(name = "spirngboot_queue_topic_exchange", durable = "true", type = "topic"),
//key = "springboot.#"))
@RabbitListener(queues = {"spirngboot_queue"})
@RabbitHandler
public void message( Channel channel,Message message) throws Exception {
MessageProperties messageProperties = message.getMessageProperties ();
if(null==map.get("spirngboot_queue"+messageProperties.getDeliveryTag ())){
map.put ("spirngboot_queue" + messageProperties.getDeliveryTag (), new AtomicInteger ());
}
try {
//todo 逻辑代码,这里要注意的是,执行这段代码是多线程的,要注意多线程安全
System.out.println ("消息的消费"+new String(message.getBody ()));
//模拟异常,如果是操作了sevice层的数据库,发生异常可以事务回滚,如果包含其他操作,需要在catch中将操作回退,因为是多线程,要考虑线程安全问题
int i = 1 / 0;
//如果消费成功,则确认消息
channel.basicAck (messageProperties.getDeliveryTag (), false);
} catch (Exception e) {
//利用messageProperties.getDeliveryTag ()得到消息唯一的id,判断重复消息了几次,如果超过3次把最后的true改为false,将消息不重新放到队列中
//因为可能是分布式系统,可以使用reids来判断消息消费了几次,
//如果操过了3次,从redis中将messageProperties.getDeliveryTag ()删除即可
if(map.get("spirngboot_queue"+messageProperties.getDeliveryTag ()).addAndGet (1)<3){//判断消息失败消费了几次,如果操过能容忍的最大次数后将消息丢弃,这里可以使用死心队列接收失败的消息
System.out.println ("消息消费失败,重新放到队列中,失败消费次数"+map.get("spirngboot_queue"+messageProperties.getDeliveryTag ()).get ());
channel.basicNack (messageProperties.getDeliveryTag (), false, true);
}else{
System.out.println ("消息消费失败,超过3次,丢弃消息,可以放到死心队列中");
channel.basicNack (messageProperties.getDeliveryTag (), false, false);
}
}
}
//配合@RabbitHandler注解实现了消息的序列化格式,这样可以直接传对象,而不用吧对象转为json字符串,而且Jackson2JsonMessageConverter序列化体积更小传输更快
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory ();
simpleRabbitListenerContainerFactory.setConnectionFactory (connectionFactory);
simpleRabbitListenerContainerFactory.setMessageConverter (new Jackson2JsonMessageConverter ());
return simpleRabbitListenerContainerFactory;
}
}
生产者 这里使用的springboot 2.5.0版本
配置类
/**
* 保证消息的确认发送有三种方式,事务,Confirms和异步监听的方式
* 1.使用监听就是下面rabbitTemplateListener方式
* 2.Confirms方式为Web类的rabbitMq方法,Confirms可以统一确认和单调确认,实例中为统一确认
* 3.还有一种方式就是事务的方式,由于效率很低,一般很少使用,这里没有做介绍
* 效率上监听的效率要高于Confirms,实际生产上也是建议使用监听的方式
*/
@Configuration
public class RabbitMqConfiguration {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 下面是项目启动的时候创建了队列 交换机 并将队列和交换机绑定在一起
* 定义交换机的名字
*/
public static final String EXCHANGE_NAME = "spirngboot_queue_topic_exchange";
/**
* 定义队列的名字
*/
public static final String QUEUE_NAME = "spirngboot_queue";
/**
* 声明交换机
*/
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange (EXCHANGE_NAME).durable (true).build ();
}
/**
* 声明队列
*/
@Bean("bootQueue")
public Queue bootQueue() {
//指定队列的同时也指定了队列的最大优先级,发送消息的时候也要指定消息的优先级,rabbitmq 的管理页面的queue会有一个pri的标识
return QueueBuilder.durable (QUEUE_NAME).maxPriority (5).build ();
}
/**
* 队列与交换机进行绑定
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind (queue).to (exchange).with ("springboot.#").noargs ();
}
@PostConstruct
public void rabbitTemplateListener() {
//设置confirmeCallback 需要在配置文件中加publisher-confirm-type: correlated
rabbitTemplate.setConfirmCallback (new RabbitTemplate.ConfirmCallback () {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//ack 为 true表示 消息已经到达交换机,此时消息并没有到达队列
System.out.println (correlationData);
if (ack) {
//交换价接收消息成功 cause为null
System.out.println ("交换机接收成功消息");
} else {
//接收失败
System.out.println ("交换机接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//启动return机制的两种方式
/**1.配置文件中 publisher-returns: true
* 2.rabbitTemplate.setMandatory (true);
*/
//定义回调,交换机是否到达队列,发生在ack成功之后
rabbitTemplate.setReturnsCallback (new RabbitTemplate.ReturnsCallback () {
@Override
public void returnedMessage(ReturnedMessage returned) {
//获取交换机
System.out.println ("获取交换机" + returned.getExchange ());
//获取消息对象
System.out.println ("获取消息对象" + returned.getMessage ());
//获取错误码
System.out.println ("获取错误码" + returned.getReplyCode ());
//获取错误信息
System.out.println ("获取错误信息" + returned.getReplyText ());
//获取路由key
System.out.println ("获取路由key" + returned.getRoutingKey ());
}
});
}
}
发送消息
@Controller
@ResponseBody
public class Web {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("rabbitMq")
public void rabbitMq() {
//直接发送消息
// rabbitTemplate.convertAndSend ("spirngboot_queue_topic_exchange", "springboot1.HelloRabbitMq", "HelloRabbitMq" );
//使用了Jackson2JsonMessageConverter序列化,发送消息可以直接发送对象,而不是字符串
// rabbitTemplate.setMessageConverter (new Jackson2JsonMessageConverter ());
//将消息设置优先级
Boolean invoke = rabbitTemplate.invoke ((ops) -> {
for (int i = 0; i < 2; i++) {
User user = new User ("zhangsan"+i);
//偶数为优先级高的消息,接收消息的时候会发现偶数的消息比奇数的消息更早的被消费
if (i % 2 == 0) {
ops.convertAndSend ("spirngboot_queue_topic_exchange", "springboot.HelloRabbitMq", user.toString (),
message -> {
//设置优先级为5的消息,0最小 255最大
message.getMessageProperties ().setPriority (5);
return message;
});
} else {
ops.convertAndSend ("spirngboot_queue_topic_exchange", "springboot.HelloRabbitMq", user.toString (),
message -> {
message.getMessageProperties ().setPriority (0);
return message;
});
}
}
//如果2000毫秒之内没有收到服务端的确认消息,下面的invoke为false,此方法为阻塞方法
return rabbitTemplate.waitForConfirms (2000);
});
if (invoke) {
System.out.println ("消息发送成功-----");
} else {
System.out.println ("消息发送失败-----");
}
}
}
网友评论