美文网首页程序员
RabbitMQ 在Spring Boot中的简单应用

RabbitMQ 在Spring Boot中的简单应用

作者: 阿懒土灵 | 来源:发表于2018-09-06 20:30 被阅读46次

RabbitMQ作为消息中间件来使用,主要完成程序间的消息传递。那么至少有2个程序来配合,这里假设一个程序是MessageProvider,一个程序是MessageConsumer,程序都使用spring boot 搭建,集成RabbitMQ。


MessageProvider:

@Component
public class MessageProvider implements RabbitTemplate.ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public Queue infoQueue() {
        return new Queue("info");
    }

    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    @Async
    @Transactional
    public void send(Object obj) {
            try {
                rabbitTemplate.convertAndSend("info",obj, new 
                        CorrelationData(String.valueOf(obj.getId())));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    }

    /**
     * 异步消息确认
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData.getId();
        if (ack) {
            doSomething(id);
        } else {
            doSomething(id);
        }
    }
}

application.yml中rabbitMQ配置:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: 123456
    publisher-confirms: true
  1. 该类需要继承RabbitTemplate.ConfirmCallback接口,并手动实现消息确认功能。
  2. rabbitTemplate.convertAndSend负责发送消息到指定的queue,如果发送的是对象,则该对象实现序列化接口:Serializable,同时程序发送的对象所在的包必须和接收程序该对象的包一致。
  3. 消息相关对象correlationData可以通过构造器传入消息的特征,比如id之类的,便于消息确认发送成功的是哪条数据。
  4. confirm方法中参数ack如果为true,则rabbitMQ成功接收到消息,flase则为未接收到。
  5. new Queue("info"),默认是生成持久化的queue,即未及时消费的消息会存储到磁盘,RabbitMQ重启之后,数据任然存在,保证不丢失。

接下来看一下消费者
MessageConsumer:

@Component
@RabbitListener(queues = "info")
public class MessageConsumer{

    @RabbitHandler
    public void messageReceive(Object obj, Channel channel, Message message) {
        System.out.println(obj.getId())
        try {
            //消息消费确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

application.yml:

spring:
  rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: root
      password: 123456
      publisher-confirms: true
      listener:
          simple:
            prefetch: 5
      #      采用手动应答
            acknowledge-mode: manual
    #        当前监听容器数
            concurrency: 1
    #        最大数
            max-concurrency: 1
    #        是否支持重试
            retry:
              enabled: true
  1. @RabbitListener注解如果放在类上,则类里面的添加了@RabbitHandler的方法都会去监听同一个queue。@RabbitListener也可以放在方法上,让该方法去指定监听哪个queue。
  2. 如果配置文件开启了重试,则消息消费之后如果没有成功返回给RabbitMQ确认消费,则程序会重新尝试获取该条信息,默认尝试次数为3次,如果3次都失败,则消息从unacked状态变为ready状态,会等待发送给其他监听器。
  3. 配置文件中prefetch的意思是,同一时间进入到待发送队列的消息数量,进入该待发送队列的消息不会再发送给其他监听者。这个待发送队列有别于queue,是为了应付多个监听时,防止一个消息同一时间被多个监听器获取到。
  4. acknowledge-mode:manual开启手动消息确认机制,只有RabbitMQ在收到确认后才会将消息从queue中删除。
  5. concurrency设置的是一个监听器开启几个线程去监听消息;max-concurrency设置的是多个监听器最多可以开启的线程数。
  6. 如果获取的消息是java对象,则该对象必须和发送程序的对象一模一样,实现同样的Serializable接口,该类所在的包路径也必须一致。
  7. 如果开启多个程序去监听同一个queue,则RabbitMQ会平均分配消息到各个程序。当然通过RabbitMQ的路由功能可以设置多种消息分发机制。

So,介绍就这么多,欢迎交流!

相关文章

网友评论

    本文标题:RabbitMQ 在Spring Boot中的简单应用

    本文链接:https://www.haomeiwen.com/subject/zylfgftx.html