美文网首页
springboot+mq

springboot+mq

作者: 啊了个支 | 来源:发表于2020-08-07 20:51 被阅读0次

springboot整合rabbitmq,支持消息确认机制

安装

推荐一篇博客 https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

项目结构

image

POM.XML

image

POM.XML

application.yml

需要将publisher-confrems设为true,启动确认回调, 将 publisher-returns设为true 确认返回回调

image

rabbitmq配置类--RabbitConfig

第一部分, 定义队列

image

第二部分,设置一些消息处理策略

image

/**

  • rabbitMq 配置类
  • @author milicool
  • Created on 2018/9/14 /
    @Configuration
    public class RabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /
    *
  • 定义一个hello的队列
  • Queue 可以有4个参数
  •  1.队列名 
    
  •  2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true 
    
  •  3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false 
    
  •  4.exclusive     表示该消息队列是否只在当前connection生效,默认是false 
    */
    

@Bean
public Queue helloQueue() {
return new Queue("queue-test");
}
/** ======================== 定制一些处理策=============================*/

  • ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
  • ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack */
    @Bean public RabbitTemplate rabbitTemplate() {
    Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
    // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
    rabbitTemplate.setMandatory(true);
    // 消息返回, yml需要配置 publisher-returns: true
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); });
    // 消息确认, yml需要配置 publisher-confirms: true
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
    } else { log.debug("消息发送到exchange失败,原因: {}", cause); } });
    return rabbitTemplate; } }

生产者

/**

  • 生产者
  • @author milicool
  • Created on 2018/9/14
    /
    @Component
    public class Producer {
    @Autowired private RabbitTemplate rabbitTemplate;
    /
    *
  • 给hello队列发送消息 */
    public void send() { for (int i =0; i< 100; i++) { String msg = "hello, 序号: " + i; System.out.println("Producer, " + msg); rabbitTemplate.convertAndSend("queue-test", msg); } }
    }

消费者

/**

  • 消费者

  • @author milicool

  • Created on 2018/9/14
    */
    @Component
    public class Comsumer { private Logger log = LoggerFactory.getLogger(Comsumer.class);

    @RabbitListener(queues = "queue-test")
    public void process(Message message, Channel channel) throws IOException {
    // 采用手动应答模式, 手动确认应答更为安全稳定
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.info("receive: " + new String(message.getBody())); } }

测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
private Producer producer;
@Test
public void contextLoads() { producer.send(); }
}</pre>

[ 复制代码

](javascript:void(0); "复制代码")

测试结果

测试结果太长,没有截取全部,可以查看到消费者接收到了全部消息,如果有的消息在没有接收完,消息将被持久化,下次启动时消费

image

web端查看

image

感谢阅读 o(∩_∩)o

相关文章

  • springboot+mq

    springboot整合rabbitmq,支持消息确认机制 安装 推荐一篇博客 https://blog.csdn...

网友评论

      本文标题:springboot+mq

      本文链接:https://www.haomeiwen.com/subject/dduqdktx.html