美文网首页
RabbitMQ详解以及和SpringBoot整合

RabbitMQ详解以及和SpringBoot整合

作者: habit_learning | 来源:发表于2018-09-25 14:24 被阅读317次

    各消息组件的适用场景

    • ActiveMQ
      ActiveMQ 是一款比较早期的消息组件,由Apache开源出来的,它能满足吞吐量一般的业务场景,但是对于高并发场景,性能较差。
    • Kafka
      Kafka追求高吞吐量的特性,它一开始使用于日志的收集。缺点是消息可靠性支持较少,适合产生大量数据的互联网服务的数据收集业务。
    • RocketMQ
      RocketMQ 早期由阿里团队开发的,现在升级为Apache的顶级项目。纯 Java 开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。缺点是,有些功能不是开源的,如消息事务。
    • RabbitMQ
      RabbitMQ 是由 Erlang 语言编写的,适合对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

    AMQP协议

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

    1、AMQP模型

    AMQP协议模型

    消息是直接由生产者发送到Exchange中的,然后Exchange和Message Queue之间通过某种规则关联起来(后面会讲);消费者是直接订阅Message Queue的,只要Queue里面有消息,就会被消费者消费。这样就实现了生产者和消费者之间的低耦合性。

    2、AMQP核心概念

    • Channel:网络通信,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。
    • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。
    • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列。
    • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
    • Queue:也称 Message Queue,消息队列,保存消息,并将它们转发给消费者。
    • Binding:Exchange和Queue之间的虚拟连接,binding 中可以指定 routing key。

    RabbitMQ

    RabbitMQ 是基于AMQP协议开发的消息组件.。

    1、RabbitMQ的整体架构

    RabbitMQ的整体架构

    生产者发送消息到Exchange中的,然后Exchange和Message Queue之间通过Routing key建立路由规则,把消息发送给特定的Queue,然后消息推送给订阅了该Queue的消费者。


    RabbitMQ消息流转图

    生产者发送消息时,需要指定两个参数,Exchange和Routing key,如果Exchange不指定(即为空),则会采用默认的AMQP default的Exchange,其会根据Routing key的值,路由到对应的 Queue。

    2、Exchange

    Exchange有四种类型,分别是Direct,Topic,Fanout ,Header。

    • Direct Exchange
      所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
      注意:Direct模式可以使用RabbitMQ自带的Exchange:AMQP default,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
      exchange在和queue进行binding时会设置routingkey:
    channel.QueueBind(queue: "create_pdf_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_create",
                        arguments: null);
    

    然后我们在将消息发送到exchange时会设置对应的routingkey:

    channel.BasicPublish(exchange: "pdf_events",
                            routingKey: "pdf_create",
                            basicProperties: properties,
                            body: body);
    

    在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。
    具体流程如下:


    Direct 流程
    • Topic Exchange
      此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:“*”,“#”,其中 “*” 表示匹配一个单词, “#”则表示匹配没有或者多个单词。


      Topic 流程

      举个栗子:


      Topic Exchange
    • Fanout Exchange
      此exchange的路由规则很简单,直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。


      Fanout 流程
    • Header Exchange
      此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数:
    Dictionary<string, object> aHeader = new Dictionary<string, object>();
    aHeader.Add("format", "pdf");
    aHeader.Add("type", "report");
    aHeader.Add("x-match", "all");
    channel.QueueBind(queue: "queue.A",
                        exchange: "agreements",
                        routingKey: string.Empty,
                        arguments: aHeader);
    

    其中的x-match为特殊的header,可以为all则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可。
    在发布消息的时候就需要传入header值:

    Properties properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
    mHeader1.Add("format", "pdf");
    mHeader1.Add("type", "report");
    properties.Headers = mHeader1;
    

    以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。
    header类型用的比较少,但还是知道一点好。

    RabbitMQ与SpringBoot整合

    • 添加 amqp 依赖
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
    
    • 配置文件 application.properties
    # rabbitmq
    spring.rabbitmq.host=node2
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    # 对 rabbitmqTemplate 进行监听,当消息由于server的原因无法到达queue时,就会被监听到,以便执行ReturnCallback方法
    # 默认为false,Server端会自动删除不可达消息
    spring.rabbitmq.template.mandatory=true
    
    # 消费端手动确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # 并发消费
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    
    • 生产者
    
    /**
     * @author K. L. Mao
     * @create 2018/9/20
     */
    @Service
    public class RabbitSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 用于监听Server端给我们返回的确认请求,消息到了exchange,ack 就返回true
         */
        private final RabbitTemplate.ConfirmCallback  confirmCallback = (correlationData, ack, cause) -> {
            System.out.println("correlationData:" + correlationData);
            System.out.println("ack:" + ack);
            if (ack){
                System.out.println("将msg-db数据更新为处理成功");
            } else {
                System.out.println("记录异常日志...,后续会有补偿机制(定时器)");
            }
        };
    
        /**
         * 监听对不可达的消息进行后续处理;
         * 不可达消息:指定的路由key路由不到。
         */
        private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText,
        exchange, routingKey) -> System.out.println("return exchange:" + exchange + ", routingKey:" + routingKey +
                ", replyText:" + replyText);
    
        /**
         * 发送消息
         * @param order
         */
        public void sendOrder(Order order) {
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            CorrelationData cd = new CorrelationData();
            // 消息唯一标识
            cd.setId(UUID.randomUUID().toString().replace("-","") + DateUtils.formatDate(new Date(), "yyyyMMdd"));
            rabbitTemplate.convertAndSend("exchange-2", "springboot.abc", order, cd);
        }
    }
    
    • 消费者
    /**
     * @author K. L. Mao
     * @create 2018/9/20
     */
    @Service
    public class RabbitConsumer {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue-2", durable = "true"),
                exchange = @Exchange(value = "exchange-2",
                        durable = "true", type = "topic",
                        ignoreDeclarationExceptions = "true"),
                key = "springboot.#")
        )
        @RabbitHandler
        public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> properties, Channel channel) throws IOException {
            System.out.println("消费端 order:" + order);
            // deliveryTag: 对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序(index)来表示,一条消息就为1
            Long deliveryTag = (Long) properties.get(AmqpHeaders.DELIVERY_TAG);
            System.out.println("deliveryTag:" + deliveryTag);
            // 限流处理:消息体大小不限制,每次限制消费一条,只作用于该Consumer层,不作用于Channel
            channel.basicQos(0, 1, false);
            // 手工ACK,不批量ack,multiple:当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
            channel.basicAck(deliveryTag, false);
        }
    

    可以直接使用@RabbitListener注解,声明Queue和Exchange以及Binding关系。
    消费端接收的消息是Message对象,结构为:

    package org.springframework.messaging;
    
    public interface Message<T> {
        T getPayload();
    
        MessageHeaders getHeaders();
    }
    

    我们可以直接通过注解@Payload 获取我们传输的数据,通过注解@Headers 获取消息请求头。
    这里我们增加了消息限流的功能,防止生产过多,导致消费者消费吃力的情况:
    channel.basicQos(0, 1, false):0表示对消息的大小无限制,1表示每次只允许消费一条,false表示该限制不作用于channel。
    同时,我们这里采用手工ACK的方式,因为我们配置文件配置了spring.rabbitmq.listener.simple.acknowledge-mode=manual:
    channel.basicAck(deliveryTag, false):deliveryTag表示处理的消息条数(一般为1),从heaers中取,false表示不批量ack。

    DLX(死信队列)

    • DLX定义
      DLX为Dead Letter Exchange,死信队列。当一个消息在一个队列中变成死信(dead message)之后,它能重新publish到另一个Exchange,那么这个让消息变为死信的Exchange就是DLX(死信队列)。
    • 消息变成死信的几种情况
      1、消息被拒绝,ack为false,并且 requeue=false;
      2、消息TTL(Time To Live)过期,指消息达到了过期时间;
      3、队列达到最大长度。
    • 死信队列代码演示:
      1、声明一个死信Exchange、Queue以及Binding
       /**
         * 声明一个死信交换机,不一定为Topic Exchange, 和交换机类型无关
         * @return
         */
        @Bean
        public Exchange deadLetterExchange(){
            return ExchangeBuilder.topicExchange("DL_EXCHANGE").durable(true).build();
        }
        /**
         * 声明一个死信队列,并且配置转发交换机
         * @return
         */
        @Bean
        public Queue deadLetterQueue(){
            Map<String, Object> args = new HashMap<>(2);
            // x-dead-letter-exchange 声明 死信需要转发的交换机
            args.put("x-dead-letter-exchange", "FORWARD_EXCHANGE");
            // x-dead-letter-routing-key 如果没有指定,那么消息本身使用的 routing key 将被使用。即DL.xxx
            return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
        }
        /**
         * 死信队列进行绑定,
         * @return
         */
        @Bean
        public Binding deadLetterBinding(){
            return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("DL.#").noargs();
        }
    

    2、声明一个转发队列,来接收死信消息

       /**
         * 声明一个转发交换机
         * @return
         */
        @Bean
        public Exchange forwardExchange(){
            return ExchangeBuilder.topicExchange("FORWARD_EXCHANGE").durable(true).build();
        }
        /**
         * 转发队列,即requeue、过期(TTL)、超过队列容量,就会被转发到此队列
         * @return
         */
        @Bean
        public Queue forwardQueue(){
            return QueueBuilder.durable("FORWARD_QUEUE").build();
        }
        /**
         * 绑定转发交换机和队列,这里的routing一般为"#",因为我们上面死信队列没有设置x-dead-letter-routing-key,
         * 故被转发的消息会携带原消息的routingKey,即DL.xxx,为了能够被路由到FORWARD_QUEUE,故最好"#"
         * @return
         */
        @Bean
        public Binding forwardBinding(){
            return BindingBuilder.bind(forwardQueue()).to(forwardExchange()).with("#").noargs();
        }
    

    3、对发送的消息设置TTL,模拟DLX场景

        /**
         * 发送死信队列
         * @param msg
         */
        public void sendDlx(String msg) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            MessagePostProcessor messagePostProcessor = message -> {
                MessageProperties messageProperties = message.getMessageProperties();
                // 设置编码
                messageProperties.setContentEncoding("UTF-8");
                // 设置过期时间 10s,到了过期时间还没被消费,则会进入死信队列
                messageProperties.setExpiration("10000");
                return message;
            };
            rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL.AAA", msg,
                    messagePostProcessor, correlationData);
        }
    

    这里设置了10s的过期时间,消息一开始是在DL_QUEUE队列上,如果10s之内还没被消费,则会进入FORWARD_QUEUE(转发)队列。
    以上的代码示例,通过DLX实现了延迟队列的功能,即将消息发送至DL_QUEUE队列上,10s之后才会被FORWARD_QUEUE消费(消费者监听该队列即可),从而起到了延迟消费的功能。

    可靠性投递解决方案

    可靠性投递,即保证消息的100%被消费。目前,互联网大厂主要的解决方案,有两种:
    1、消息落库,对消息状态进行打标
    2、消息的延迟投递,做二次确认,回调检查

    • 消息落库


      消息落库方案
    • 延迟投递


      延迟投递方案

      延迟投递方案相比消息落库方案,优势是在于把msg db剥离了核心业务,在大业务量的场景中,会减少核心业务的数据库压力(少了一次msg db的数据插入)。

    消息幂等性解决方案

    幂等性指的是,使用相同参数对同一资源重复调用某个接口的结果与调用一次的结果相同。

    • 可能导致消息出现非幂等的原因:


      消息的幂等性

      主要的原因是第三个,当消费端抛出异常,并且requeue=true(默认为true),消息会一直重新入队进行消费,这样就导致了重复消费。

    • 幂等性解决方案


      幂等性解决方案

      首先,给每条消息生成一个全局唯一的ID标识(messageId),然后在到达消费端时,先将消息(messageId作为主键)插入MSG DB数据库,然后再执行业务。这样如果消费端中途抛出了异常,重新入队消费时,由于还是同一个消息,则meesageId还是不变的,当插入MSG DB数据库时,会因为主键冲突而抛异常,此时我们捕捉该异常,并且拒绝该消息(channel.basicReject)即可。

    相关文章

      网友评论

          本文标题:RabbitMQ详解以及和SpringBoot整合

          本文链接:https://www.haomeiwen.com/subject/xtwinftx.html