SpringBoot RabbitMQ

作者: 万物归于简 | 来源:发表于2019-02-20 12:13 被阅读0次

RabbitMQ 介绍

RabbitMQ的流程是:生产者将消息发送到对应交换机上,交换机再将消息转发到绑定的队列上,消费者从绑定的队列获取消息进行消费。


image.png

交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

SpringBoot整合RabbitMQ

添加依赖:

<!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在application.yml文件中添加配置

spring:
  application:
    name: async-task
  rabbitmq:
    host: 192.168.255.255
    port: 5672
    username: xxx
    password: 123456

Direct模式

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个路由键(RoutingKey).当发送者发送消息的时候,指定对应的Key.当Key和消息队列的RoutingKey一致的时候,消息将会被发送到该消息队列中.

@Configuration
public class RabbitMQConfig{
    // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("queueName");
    }

    // 创建一个交换机
    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("exchangeName");
    }

    // 把队列和交换机绑定在一起
    @Bean
    public Binding testBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routingKey");
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("exchangeName", "routingKey", "hello,rabbit~");
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

AmqpTemplate 发送的消息数据还可以是对象,但对象必须序列化

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    User user = new User("name","password");
    template.convertAndSend("exchangeName", "routingKey",user);
    }
}


// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(User user){
        log.info("------------user: {}",user);
    }

@RabbitListener 可以作用在类上,需要和 @RabbitHandler 配合使用

@Component
@RabbitListener(queues = "queueName")
public class TestConsumer {

    @RabbitHandler
    public void process(String data){
        log.info("------------data: {}",data);
    }

    @RabbitHandler
    public void process(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
    
    public void test(){
        log.info("------------");
    }
}

上面的TestConsumer 消费者接收所有路由键为 routingKey 的消息,队列中的消息会转发到被@RabbitHandler修饰的方法然后被消费,不同的消息类型被转发到对应的方法中。test()方法不会消费消息
RabbitMq 服务启动后会创建一个默认的DirectExchange,这个交换机只接收 路由键routingKey 和 队列名称相同的消息,所以direct模式可以简化:

@Configuration
public class RabbitMQConfig{
    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("routingKey");  // 队列名和routingKey相同
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("routingKey", "hello,rabbit~"); // 没有交换机名称,消息会被发送到默认交换机,然后被转发到 名称和routingKey相同的队列上
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "routingKey")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

Topic模式

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
通配符:* 表示一个词,# 表示零个或多个词
注意: 通配符是针对交换机的!!!也就是说消息进入交换机时才进行通配符匹配,匹配完了以后才进入固定的队列

@Configuration
public class RabbitMQConfig{

   // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

   // topit 模式

   @Bean(name="queueName1")
   public Queue queueMessage1() {
       return new Queue("queueName1");      // 定义第一个队列,名称为 queueName1
   }
   @Bean(name="queueName2")
   public Queue queueMessage2() {
       return new Queue("queueName2");     // 定义第二个队列,名称为 queueName2
   }
   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("exchangeName");  // 定义交换机
   }
   // 定义绑定关系,通过交换机 将名称为queueName1 的队列绑定到交换机上, routingKey 为 topic.key1
   @Bean
   public Binding bindingExchangeMessage(@Qualifier("queueName1") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.key1");
   }
   // 定义另一个绑定关系,通过交换机 将名称为queueName2 的队列绑定到交换机上 ,routingKey 是符合 通配符topic.#  的路由键
   // 如:topic.xx、topic.yy 等
   @Bean
   public Binding bindingExchangeMessages(@Qualifier("queueName2") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
   }

消费者

    @RabbitListener(queues="queueName1")    //监听器监听指定的Queue
    public void process1(String str) {    
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="queueName2")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

然后发送消息

// 2个消费者都会收到消息
template.convertAndSend("exchangeName", "topic.key1", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key2", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key3", "data info");

Fanout模式

fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
因此我们发送到交换机的消息会使得绑定到该交换机的每一个Queue接收到消息,这个时候就算指定了路由键(routingKey),或者规则(即上文中convertAndSend方法的参数2),也会被忽略!

// fanout模式
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置广播路由器
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

然后发送消息

template.convertAndSend("fanoutExchange", "", "data info"); // 第二个参数会被忽略

消费者

    @RabbitListener(queues="fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA:"+str1);
    }
    @RabbitListener(queues="fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB:"+str);
    }
    @RabbitListener(queues="fanout.C")
    public void processC(String str) {
        System.out.println("ReceiveC:"+str);
    }

结果三个都收到消息


image.png

RabbitMQ 实现延迟队列

rabbitMQ可以通过死信机制来实现延迟队列的功能,一些概念:
1、TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
2、Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
3、Dead Letter Exchanges(DLX),即死信交换机
4、Dead Letter Routing Key (DLK),死信路由键
直接上代码:

@Configuration
public class RabbitMQConfig { 

    // 创建一个立即消费队列
    @Bean(QueueName.ImmediateQueue)
    public Queue immediateQueue() {
        return new Queue(QueueName.ImmediateQueue);
    }

    @Bean(ExchangeName.IMMEDIATE)
    public DirectExchange immediateExchange() {
        return new DirectExchange(ExchangeName.IMMEDIATE);
    }

    // 把 立即消费的队列 和 立即消费的exchange 绑定在一起
    @Bean
    public Binding immediateBinding(@Qualifier(QueueName.ImmediateQueue) Queue queue, @Qualifier(ExchangeName.IMMEDIATE) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.IMMEDIATE_ROUTING_KEY);
    }


    // 创建一个延时队列
    @Bean(QueueName.DelayQueue)
    public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();

        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", ExchangeName.IMMEDIATE);

        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key
        params.put("x-dead-letter-routing-key", RoutingKey.IMMEDIATE_ROUTING_KEY);

        // 设置队列中消息的过期时间,单位 毫秒
        params.put("x-message-ttl", 5 * 1000);

        return new Queue(QueueName.DelayQueue, true, false, false, params);
    }

    @Bean(ExchangeName.DELAY)
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(ExchangeName.DEAD_LETTER);
    }

    // 把 延迟消费的队列 和 延迟消费的exchange 绑定在一起
    @Bean
    public Binding delayBinding(@Qualifier(QueueName.DelayQueue) Queue queue, @Qualifier(ExchangeName.DELAY) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.DELAY_KEY);
    }
}

@Component
public class TestConsumer {
    @RabbitListener(queues = QueueName.ImmediateQueue)
    public void process2(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
}

// 常量
public interface ExchangeName {
    String IMMEDIATE = "immediate";
    String DELAY = "delay";
}
public interface QueueName {
    String ImmediateQueue = "ImmediateQueue";
    String DelayQueue = "DelayQueue";
}
public interface RoutingKey {
    String IMMEDIATE_ROUTING_KEY = "immediate.key";
    String DELAY_KEY = "delay.key";
}
image.png

过程:
1、先创建一个普通队列,即上面的 ImmediateQueue,创建一个普通交换机 immediateExchange,绑定两者。
2、创建一个延迟队列,即创建时设置了参数:x-dead-letter-exchange,x-dead-letter-routing-key,x-message-ttl,该队列就相当于是一个延迟队列了
3、创建延迟交换机(其实也是普通交换机),和延迟队列绑定
4、给ImmediateQueue创建监听消费者,注意,延迟队列不要设置监听消费者,不然延迟队列就变成普通队列了,不起作用
到此延迟队列已完成,直接发送消息到延迟交换机即可

        UserInfo userInfo = new UserInfo();
        userInfo.setPhone("15800000000");
        userInfo.setUserName("aaaaaaa");

        log.info("开始发送消息");
        template.convertAndSend(ExchangeName.DELAY, RoutingKey.DELAY_KEY, userInfo);

原理:发送消息到延迟交换机,延迟交换机将消息转发到延迟队列,因为延迟队列没有监听消费者,所以消息不会被消费,直到消息超过存活时间(即 延迟)变成死信,这时延迟队列会将死信转发到死信交换机,即上面的immediateExchange(因为延迟队列绑定的死信交换机x-dead-letter-exchange指向了immediateExchange),immediateExchange将消息转发给ImmediateQueue,然后被监听消费者消费

测试结果:


image.png

可以看出过了5秒才消费消息

注意: 一个延迟队列只能设置一个存活时间,即该延迟队列里面的所有消息的存活时间都必须一致,如果需要设置不一样的存活时间,只能再创建一个延迟队列。原因是延迟队列并不会去扫描队列里面所有消息的存活时间,只会判断队列头的第一个消息是否过期,若过期了就转发消息,否则一直等待,即使队列后面已经有消息先过期,也只能等前面的消息被转发后,该消息才被转发。

消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。
生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。
消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。
在配置文件中添加:

spring:
 application:
   name: async-task
 rabbitmq:
   host: 192.168.0.0
   port: 5672
   username: username
   password: password
   publisher-confirms: true  # 开启发送确认
   publisher-returns: true   # 开启发送失败退回
   template:
     mandatory: true
   listener:
     type: simple
     simple:
       acknowledge-mode: manual # 开启消息消费手动确认

在RabbitMQConfig 中添加如下配置

    @Bean
    public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // 消息发送到交换器Exchange后触发回调
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //  可以进行消息入库操作
                log.info("消息唯一标识 correlationData = {}", correlationData);
                log.info("确认结果 ack = {}", ack);
                log.info("失败原因 cause = {}", cause);
            }
        });

        // 配置这个,下面的ReturnCallback 才会起作用
        template.setMandatory(true);
        // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  可以进行消息入库操作
                log.info("消息主体 message = {}", message);
                log.info("回复码 replyCode = {}", replyCode);
                log.info("回复描述 replyText = {}", replyText);
                log.info("交换机名字 exchange = {}", exchange);
                log.info("路由键 routingKey = {}", routingKey);
            }
        });

        return template;
    }

成功确认:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。
消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

失败确认:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:该消息的index。
multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
requeue:是否重新入队列。

拒绝
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index。
requeue:被拒绝的是否重新入队列。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息

消费者消息确认代码:

@Component
public class OrderConsumer {

    @Autowired
    private TaskOrderFeign orderFeign;

    @RabbitListener(queues = QueueName.OrderCancelQueue)
    public void process(OrderDTO orderDTO, Message message, Channel channel){

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("---消费消息---------deliveryTag = {} ,  orderDTO: {}",deliveryTag ,orderDTO);

            // 取消订单
            orderDTO.setState(OrderStateEnum.DELAY_CANCEL.code);
            Response response = orderFeign.cancelOrder(orderDTO);

            //TODO 判断结果,是否需要重试

            // 成功确认消息
            channel.basicAck(deliveryTag, true);

        } catch (IOException e) {
            log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
            // 重新确认
            // 成功确认消息
            try {
                Thread.sleep(50);
                channel.basicAck(deliveryTag, true);
            } catch (IOException | InterruptedException e1) {
                log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
                // 可以考虑入库
            }

        } catch (Exception e) {

            log.error("取消订单失败 , e = {}", PrintUtil.print(e));

            try {
                // 失败确认
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e1) {
                log.error("消息失败确认失败 , e1 = {}", PrintUtil.print(e1));
            }
        }
    }
}

相关文章

网友评论

    本文标题:SpringBoot RabbitMQ

    本文链接:https://www.haomeiwen.com/subject/csbcyqtx.html