美文网首页MQ专题
【RabbitMQ】Spring-Boot 整合 使用教程 可靠

【RabbitMQ】Spring-Boot 整合 使用教程 可靠

作者: 植富宝 | 来源:发表于2019-02-27 19:38 被阅读0次

    ==1. 简介==

    1.1 RabbitMQ定义

    RabbitMQ是一个消息代理和队列服务器,用来在不同应用之间共享数据,是Erlang语言开发的,基于AMQP协议。
    

    1.2 AMQP定义

    是一个二进制协议。
    

    1.3 AMQP协议模型

    AMQP协议模型

    1.4 核心概念

    1. Server:Broker,接受客户端连接
    2. Connection:连接,应用程序与Broker的网络连接
    3. Channel:网络信道,Channel是消息读写的通道
    4. Message:消息,传递的数据,有properties何body组成,properties是消息的属性(可以设置顺序ID),body是消息内容
    5. Virtual-Host:虚拟地址,用于"逻辑隔离",最上层的"消息路由",一个Virtual-Host中有多个Exchange和Queue,但是不能有同名的
    6. Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列
    7. Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing-key
    8. Routing-key:一个路由规则,虚拟机可用他来确定如何路由一个特定消息
    9. Queue:消息队列,保存消息并将它们转发给消费者
    

    1.5 整体架构图

    整体架构图

    1.6 消息流转图

    消息流转图

    1.7 交换机图

    交换机图

    ==2. 安装与配置==

    2.1 准备

    1. rabbitMQ版本要与erlang版本对应起来
    2. rabbitMQ-rpm和erlang-rpm可以去官网下载,tcp_wrappers、socat可以去https://pkgs.org下载
    

    2.2 安装+启动

    1. rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
    2. rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
    3. rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm    如果上一步提示缺少socat
    4. rpm -ivh tcp_wrappers-7.6-77.el7.x86_64.rpm  如果上一步提示缺少tcp_wrappers
    5. rabbitmq-server start &
    6. rabbitmqctl stop_app
    7. rabbitmq-plugins enable rabbitmq_managment
    

    2.3 常用命令

    1. rabbitmqctl stop_app
    2. rabbitmqctl start_app
    3. rabbitmqctl status 节点状态
    4. rabbitmqctl list_users 列出所有用户
    5. rabbitmqctl list_user_permissions username 列出用户权限
    6. rabbitmqctl change_password username newpwd 修改用户密码
    7. rabbitmqctl list_vhosts 列出所有虚拟主机
    8. rabbitmqctl list_permissions -p vhostpath 列出该虚拟主机的所有权限
    9. rabbitmqctl list_queues 列出所有队列
    10. rabbitmqctl reset 移除所有数据
    11. rabbitmqctl join_cluster <cluster-node> [--ram] 组成集群命令
    12. rabbitmqctl cluster_status 查看集群状态
    

    ==3. Exchange==

    3.1 交换机属性

    1. name:交换机名称
    2. type:交换机类型(direct、topic、fanout、headers)
    3. durability:是否需要持久化,true为持久化
    4. auto-delete:Exchange上的最后一个Queue被删除后,自动删除该Exchange
    5. arguments:自定义参数
    

    3.2 DirectExchange-直连

    发送到DirectExchange的消息,会被转发到RouteKey中指定的Queue。
    Direct可以使用Default-Exchange,不需要进行任何的binding操作,消息传递时,RouteKey必须完全匹配。
    
    DirectExchange

    3.3 TopicExchange-匹配

    发送到TopicExchange的消息,会被转发到,匹配RouteKey中指定的Queue。
    #:匹配多个词
    *:匹配一个词
    
    TopicExchange

    3.4 FanoutExchange

    不处理路由键,只要将队列绑定到交换机上;
    发送到交换机上的消息,都会被转发到,与该交换机绑定的所有队列上;
    FanoutExchange转发消息是最快的;
    
    FanoutExchange

    ==4. Binding+Queue+Message+Virtual==

    4.1 Binding-绑定

    Exchange<-->Exchange,Exchange<-->Queue,他们之间的绑定关系
    Binding中可以包含RouteKey或者参数
    

    4.2 Queue-消息队列

    实际存储消息数据
    Durability:是否持久化,Durable:是,Transient:否
    Auto-Delete:如果yes,则最后一个监听被移除后,该Queue也会自动被删除
    

    4.3 Message-消息

    应该程序和服务器之间传递的数据,由Properties(可以设置顺序ID)和Body组成
    常用属性:delivery_mode、headers(自定义属性)、correlation_id:唯一id、expiration:过期时间
    

    4.4 Virtual-Host-虚拟主机

    虚拟地址,用于逻辑隔离,最上层的消息路由
    一个Virtual-Host可以有若干个Exchange和Queue,但是同一个Virtual-Host中不能有同名的Exchange和Queue
    

    ==5. 高级特性==

    5.1 消息如何保证100%的投递成功

    1. 消息落库,对消息状态进行打标

    image

    2. 消息延迟投递,做二次确认,回调检查

    image

    5.2 幂等性

    1. 定义

    幂等性 就是防止高并发的情况下,执行结果都是唯一的。
    消费端实现幂等性,就是消息永远被消费一次。
    

    2. 解决方案

    1. 唯一ID+指纹码,利用数据库主键去重
        SELECT COUNT(1) FROM T_ORDER WHERE 唯一ID + 指纹码
        COUNT(1) == 0,则INSERT;
        好处:简单
        坏处:高并发下有数据库写入的性能瓶颈
        解决:根据ID进行分库分表,进行算法路由
        
    2. 利用redis的原子性实现
        setnx key value、exists key、redis的自增
        问题:
            数据是否需要落库,落库的话,缓存和数据库如何保证原子性?
            数据不落库,如何设置定时同步策略?
    

    5.3 confirm确认消息

    1. 在channel中开启确认模式:channel.confirmSelect();
    2. 在channel中添加监听:addConfirmListener();
    3. 发生Nack的情况:磁盘写满、Queue达到上线、MQ其他异常
    4. ack和Nack都收不到的情况:就要定时任务去处理
    

    5.4 return消息机制

    如果发送的消息,Exchange不存在或者RouteKey路由不到,这时就需要returnListener。
    Mandatory:true-监听器接受到这些不可达的消息,false-broker会自动删除这些消息。
    消费端自定义监听:继承DefaultConsumer
    
    image

    5.5 消费端限流

    生产端不会限流,只有消费端限流;当机器突然有上万条消息,不做限流,可能会导致消费端服务器崩溃。
    RabbitMQ提供了qos功能:非自动签收消息的情况下,一定数量消息未被确认前(通过consumer或channel设置qos值),不进行消费新的消息
    void BasicQos(uint prefetchSize = 0 不限制消息大小, 
                  ushort prefetchCount = 1 一次处理1条,手动ack后,在处理另一条, 
                  bool global = false 这个限制是channel级别还是consumer级别);
    consumer-->handleDelivery-->channel.basicAck(envelope.getDeliveryTag(), false);
    consumer-->handleDelivery-->channel.basicNack(envelope.getDeliveryTag(), false, true-->重发);
    

    5.6 TTL队列/消息

    Time To Live 生存时间
    支持消息的过期时间和队列的过期时间
    

    5.7 DLX-死信队列

    当消息变成死信(没有被消费者消费掉)的时候,他将被重新发送到另一个Exchange,这个Exchange就是死信队列
    消息变成死信的情况:
        1. 消息被拒绝(basic.reject/basic.nack)并且requeue=false
        2. TTL过期
        3. 队列打到最大长度
    在队列上添加:arguments.put("s-dead-letter-exchange", "dlx.exchange");
    

    ==6. Spring-Boot-Demo==

    6.1 pom依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    6.2 application.yml

    1. 公共配置
    spring:
      rabbitmq:
        addresses: 192.168.11.76:5672
        username: guest
        password: guest
        virtual-host: /
        connection-timeout: 15000
    
    2. 生产端配置
        publisher-confirms: true
        publisher-returns: true
        template:
          mandatory: true # 保证监听有效
    
    3. 消费端配置
        listener:
          simple:
            acknowledge-mode: manual
            concurrency: 5
            max-concurrency: 10
          order:
            key: springboot.*
            queue:
              name: queue-1
              durable: true
            exchange:
              name: exchange-1
              durable: true
              type: topic
              ignoreDeclarationExceptions: true    
    

    6.3 生产端

    @Component
    public class RabbitSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;  
        
        // 回调函数: confirm确认
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if(!ack){
                    System.err.println("异常处理...");
                }
            }
        };
        
        // 回调函数: return返回
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                    String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
        
        // 发送消息方法调用: 构建Message消息
        public void send(Object message, Map<String, Object> properties) throws Exception {
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            CorrelationData correlationData = new CorrelationData("1234567890"); // id + 时间戳 全局唯一 
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
        }
        
        // 发送消息方法调用: 构建自定义对象消息
        public void sendOrder(Order order) throws Exception {
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            CorrelationData correlationData = new CorrelationData("0987654321"); //id + 时间戳 全局唯一 
            rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
        }
    }
    

    6.4 消费端

    @Component
    public class RabbitReceiver {
    
        @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1", durable="true"),
                exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*"))
        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception {
            System.err.println("消费端Payload: " + message.getPayload());
            Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            // 手动ACK
            channel.basicAck(deliveryTag, false);
        }
        
        @Value("spring.rabbitmq.listener.order.key")
        private String orderKey;
        
        @Value("spring.rabbitmq.listener.order.queue.name")
        private String orderQueueName;
        
        @Value("spring.rabbitmq.listener.order.queue.durable")
        private String orderQueueDurable;
        
        @Value("spring.rabbitmq.listener.order.exchange.name")
        private String orderExchangeName;
        
        @Value("spring.rabbitmq.listener.order.exchange.durable")
        private String orderExchangeDurable;
        
        @Value("spring.rabbitmq.listener.order.exchange.type")
        private String orderExchangeType;
        
        @Value("spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions")
        private String orderExchangeIgnoreDeclarationExceptions;
        
        @RabbitListener(bindings = @QueueBinding(value = @Queue(value = orderQueueName, durable = orderQueueDurable),
                exchange = @Exchange(value = orderExchangeName, durable = orderExchangeDurable, type = orderExchangeType, ignoreDeclarationExceptions = orderExchangeIgnoreDeclarationExceptions),
                key = orderKey))
        @RabbitHandler
        public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, 
                Channel channel, 
                @Headers Map<String, Object> headers) throws Exception {
            System.err.println("消费端order: " + order.getId());
            Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            // 手动ACK
            channel.basicAck(deliveryTag, false);
        }
    }
    

    ==7. Spring-Cloud-Stream==

    7.1 架构图

    image
    image
    image

    7.2 概念

    Barista接口:用来定义通道的类型和名称,通道名称作为配置用,通道类型作为该通道是发送消息还是接受消息
    @output:输出注解
    @input:输入注解
    @StreamListener:监听消息注解
    

    7.3 Demo

    7.3.1 pom依赖

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>1.3.4.RELEASE</version>
    </dependency>
    

    7.3.2 producer-application.yml

    spring:
      cloud:
        stream:
          bindings:
            output_channel:
              destination: exchange-3
              group: queue-3
              binder: rabbit-cluster
          binders: 
            rabbit-cluster: 
              type: rabbit
              environment: 
                spring:
                  rabbitmq:
                    addresses: 192.168.11.76:5672
                    username: guest
                    password: guest
                    virtual-host: /
    

    7.3.3 定义通道

    public interface Barista {
    
        String OUTPUT_CHANNEL = "output_channel";  
       
        // @Output声明了它是一个输出类型的通道,名字是output_channel。 
        @Output(Barista.OUTPUT_CHANNEL)
        MessageChannel logoutput();  
    }  
    

    7.3.4 发送消息

    @EnableBinding(Barista.class)
    @Service  
    public class RabbitmqSender {  
      
        @Autowired  
        private Barista barista;  
        
        // 发送消息
        public String sendMessage(Object message, Map<String, Object> properties) throws Exception {  
            try{
                MessageHeaders mhs = new MessageHeaders(properties);
                Message msg = MessageBuilder.createMessage(message, mhs);
                boolean sendStatus = barista.logoutput().send(msg);
                System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
            }catch (Exception e){  
                e.printStackTrace();
            }  
            return null;
        }  
    } 
    

    7.3.5 consumer-application.yml

    spring:
      cloud:
        stream:
          bindings:
            input_channel:
              destination: exchange-3
              group: queue-3
              binder: rabbit-cluster
              consumer:
                concurrency: 1
    
          rabbit:
            bindings:
              input_channel:
                consumer:
                  requeue-rejected: false # 是否支持重发
                  acknowledge-mode: MANUAL # 手动签收
                  recovery-interval: 3000 # 3s重连
                  durable-subscription: true # 是否启用持久化订阅
                  max-concurrency: 5 # 最大监听数
                  
          binders:
            rabbit-cluster:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    addresses: 192.168.11.76:5672
                    username: guest
                    password: guest
                    virtual-host: /
    

    7.3.6 定义通道

    public interface Barista {
          
        String INPUT_CHANNEL = "input_channel";  
    
        // @Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL。
        @Input(Barista.INPUT_CHANNEL)  
        SubscribableChannel loginput();
    }  
    

    7.3.7 消费消息

    @EnableBinding(Barista.class)
    @Service
    public class RabbitmqReceiver {  
    
        @StreamListener(Barista.INPUT_CHANNEL)  
        public void receiver(Message message) throws Exception {  
            Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            System.out.println("Input Stream 1 接受数据:" + message);
            channel.basicAck(deliveryTag, false);
        }  
    }  
    

    ==8. RabbitMQ集群架构模式==

    8.1 简介

    1. 主备模式:实现高可用集群,一般在并发和数据量不高的情况下使用,也称Warren模式。
    2. 远程模式:实现双活的模式,也称Shovel模式,消息进行不同数据中心的复制工作,可以跨地域的两个MQ集群互联。
    3. 镜像模式:也称Mirror模式,保证100%数据不丢失,简单、用的多。
                镜像队列:保证数据高可靠性方案,主要是实现数据同步
    

    8.2 架构模式图

    8.2.1 镜像模式

    image

    8.2.2 多活模式

    image
    image

    ==9. 架构设计==

    9.1 SET化架构

    业务:解决业务遇到的扩展性和容灾等需求,支撑业务的高速发展
    通用性:架构形成统一解决方案,岸边各业务线接入使用
    
    image
    image
    image

    9.2 集群架构图

    集群架构图

    9.3 RabbitMQ-架构设计方案

    RabbitMQ-架构设计方案

    9.4 批量消息发送

    image

    9.5 顺序消息

    image

    9.6 事务消息发送

    image

    9.7 消息幂等性设计

    image

    相关文章

      网友评论

        本文标题:【RabbitMQ】Spring-Boot 整合 使用教程 可靠

        本文链接:https://www.haomeiwen.com/subject/zqhquqtx.html