RabbitMq

作者: 笔记本一号 | 来源:发表于2020-06-06 03:15 被阅读0次

    中间件大比拼

    主流的中间有ActiveMq、kafka、rocketMq、rabbitMq

    • ActiveMq是jms规范的中间,是老牌的中间件了,其优点就是API丰富、支持多种集群模式、并且与其他组件有非常完善和稳定的集成体系,但是在性能方面比上述的几个要低很多,适用于低并发场景
    • kafka是中间件业界的性能之王,追求高吞吐量,是开源的分布式发布订阅消息系统,其优点就是高性能,高可用,支持持久化,在廉价的机器中也能达到每秒处理100kb以上数据的能力,但是对消息的重复、丢失、错误没有严格的要求,也就是说kafka的特点就是对消息精准度要求低,对性能吞吐量要求高,低并发、高并发场景都可以适用,适用于对数据精度度要求不高的业务场景,常用于互联网的数据收集业务
    • rocketMq阿里巴巴开源的中间件,经过了数次淘宝双十一的洗礼后依然屹立不倒而声名鹊起,是纯java语言编写,rocketMq是基于kafka的基础上做的优化,其对kafka的消息传输可靠性上做了提升,并支持事务,所以rocketMq同样具备了高性能,高可用性的特点,适合大规模高并发的分布式系统,广泛应用于充值交易、消息推送等业务场景
    • rabbitMq是Erlang语言编写,它是开源的基于AMQP协议的消息代理和消息队列服务器,由于内部结构是基于AMQP规范,其消息通过内部路由,路由键,消息队列等组件的传送,对消息传递的稳定性,可靠性,安全性的要求比较高,甚至可以做到消息百分之百的精准投递,但是对性能的要求排在其次,适用于对数据的传输精准度、数据的一致性、安全性、可靠性很高的场景。

    总结
    在性能要求上kafka是最好的,在数据的传输的可靠性上rabbitMq是最好的,在性能和数据传输可靠性上择中的话就是rocketMq比较适合

    AMQP简述

    AMQP用于提供统一消息模式服务,它是应用层的高级消息队列模式,也是消息中间件设计规范(jms也是消息中间件设计规范)的一种,rabbitma就是遵循AMQP的规范

    AMQP模型结构
    image.png
    AMQP核心概念
    • server:又称broker,是AMQP的服务端,接受客户端的连接,发送消息到给客户端,实现了AMQP的实体服务
    • connection:连接,客户端与服务端的网络连接
    • channel:网络信道,客户端与服务端进行消息读写的通道。客户端可以建立多个channel,每个channel都代表着一个会话任务
    • message:是客户端与服务之间传送的消息,message由properties和body组成,body主要由消息的实际内容组成,而properties是由消息的高级特性组成,properties可以设用于置消息的优先级、延迟等属性
    • vitaulhost:虚拟地址,主要是进行逻辑隔离,rabbitMq中可以包含多个VitaulHost,每个vitaulhost中可以包含多个exchange和queue,但是同一个VitaulHost中不能含有相同名称的exchange和queue,客户端可以选择投递到哪个VitaulHost中,vitaulhost默认是"/"
    • exchange:交互机,绑定queue,并接收客户端消息,通过routingKey将消息投递到绑定的queue中
    • binding:exchange和queue的虚拟连接,bingding中包含了routingKey,每个binding都绑定着一个队列,当binding的routingKey和exchange的routingKey有对应的关系时(这个对应的关系还得看exchange的类型,例如Direct exchange就要求routingKey要完全相等才对应的上)exchange才会把相应的信息投递到bingding绑定的队列中
    • routingKey:路由规则,exchange通过routingKey知道如何进行路由
    • queue:消息队列,是存放消息的物理结构的定义,客户端通过监听queue即可获取消息

    rabbitmq

    概述

    rabbitmq是Erlang语言编写,它是开源的基于AMQP协议的消息代理和消息队列服务器,rabbitmq具有高性能、高可用、高可靠的特性,具有消息投递模式丰富的特点,rabbitmq用于使完全不同的应用之间能够共享数据互相通信。由于rabbitmq是使用Erlang编写,Erlang广泛应用于交换机领域,Erlang语言在数据交互和数据同步方面性能优秀与原生Socket一样有着出色的延迟,使得rabbitmq在数据交互上具备了天然的优势

    rabbitMq消息流转过程

    客户端(生产者)将预先指定消息投递到哪个VitaulHost中的路由(exchange)上,同时客户端也会指定routingKey,exchange通过消息的routingKey决定将消息投递到哪个binding的queue中,监听了相应的queue的客户端(消费者)可以从queue中获取相应的消息

    image.png

    rabbitMq收发消息的简单代码演示

    //properties文件配置
    rabbitmq.id=127.0.0.1
    rabbitmq.port=5672
    rabbitmq.username=guest
    rabbitmq.password=guest
    rabbitmq.virtualHost=testhost
    

    将rabbitMq注册的spring容器

    @Configuration
    @PropertySource("classpath:/myApplication.properties")
    @ComponentScan(value = "com.example.rabbitmq.demo")
    public class RabbitMqConfig {
        @Value("${rabbitmq.id}")
        private String ip;
        @Value("${rabbitmq.port}")
        private int port;
        @Value("${rabbitmq.username}")
        private String username;
        @Value("${rabbitmq.password}")
        private String password;
      /*  @Value("${rabbitmq.virtualHost}")
        private String virtualHost;*/
        @Bean
        public Connection connectionFactory(){
            try {
                ConnectionFactory connectionFactory=new ConnectionFactory();
                connectionFactory.setHost(ip);
                connectionFactory.setPort(port);
                connectionFactory.setVirtualHost("/");
                connectionFactory.setUsername(username);
                connectionFactory.setPassword(password);
                return connectionFactory.newConnection();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    

    消息调用

    @RestController
    public class Test {
        @Autowired
        private Connection connection;
        private Channel channel=null;
        @PostConstruct
        public void newChannel() throws Exception{
             this.channel = this.connection.createChannel();
        }
        @PreDestroy
        public void destroyChannel() throws Exception{
             this.channel.close();
             this.connection.close();
        }
        @GetMapping("/consummer")
        public String consummer(){
            if (this.connection!=null){
                try {
                    Channel channel = this.channel
                    DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                            long deliveryTag = envelope.getDeliveryTag();
                            //消息id
                            System.out.println("消息id:  " + deliveryTag);
                            //交换机
                            System.out.println("交换机:  " + envelope.getExchange());
                            //路由key
                            System.out.println("路由key: " + envelope.getRoutingKey());
                            //接受到的消息
                            System.out.println("收到的消息:  " + new String(body, "utf-8"));
                            System.out.println("---------------------------------");
                            /*channel.basicAck(deliveryTag, false);*/
                        }
                    };
                    String exchangeName="myexchange";
                    String exchangeType="direct";
                    String queueName="rutingkey";
                    String routingKey="rutingkey";
                    channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
           //创建队列("rutingkey","持久化","独占队列","脱离exchange是否被自动删除","拓展参数")
                    channel.queueDeclare(queueName,true,false,false,null);
     //设置exchange和queue的绑定关系,并且声明队列的routingKey
                    channel.queueBind(queueName, exchangeName, routingKey);
                    channel.basicConsume(queueName, true, defaultConsumer);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return "不是空的";
            }else {
                return "是空的";
            }
        }
    
        @GetMapping("/procuder")
        public String procuder(){
            if (this.connection!=null){
                try {
                    Channel channel = this.channel;
                    String exchangeName="myexchange";
                    String routingKey="rutingkey";
                    String s="helloword"+UUID.randomUUID().toString();
                       //发布消息时使用rutingkey规则上
                    channel.basicPublish(exchangeName,routingKey,null,s.getBytes());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return "不是空的";
            }else {
                return "是空的";
            }
        }
    
    }
    
    

    confirm模式

    当生产者发送消息的同时监听一个事件,如果消费者消费成功则监听到相应的ack

    生产者
    public class Producer {
        public static void main(String[] args) throws Exception {   
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setPassword("guest");
            connectionFactory.setUsername("guest");
            
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            //3 通过Connection创建一个新的Channel
            Channel channel = connection.createChannel();
    
            //4 指定我们的消息投递模式: 消息的确认模式 
            channel.confirmSelect();
            
            String exchangeName = "test_confirm_exchange";
            String routingKey = "confirm.save";
            //5 发送一条消息
            String msg = "Hello RabbitMQ Send confirm message!";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            //6 添加一个确认监听
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.err.println("-------no ack!-----------");
                }
                
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.err.println("-------ack!-----------");
                }
            });
    
        }
    }
    
    消费者
    public class Consumer {
        public static void main(String[] args) throws Exception {
            //1创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setPassword("guest");
            connectionFactory.setUsername("guest");
            
            //2 获取C onnection
            Connection connection = connectionFactory.newConnection();
            
            //3 通过Connection创建一个新的Channel
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_confirm_exchange";
            String routingKey = "confirm.#";
            String queueName = "test_confirm_queue";
            
            //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
            channel.exchangeDeclare(exchangeName, "topic", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //5 创建消费者 
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
            
            while(true){
                Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                
                System.err.println("消费端: " + msg);
            }
            
            
        }
    }
    
    

    限流

    如果消费端的消费速度跟不上生产端的消息生产速度,那么很有可能会造成,消息大量堆积,每次都会有大量的消息一次打到消费端,给消费端造成很大的压力

    设置手工签收,并且消费成功后向broker发送ack,才会让消费者继续消费消息,不然消息就会排队等待消费
    basicQos的参数:

    • prefetchSize:消费消息的大小,0表示不做限制;
    • prefetchCount:同时消费消息的数量;
    • global:限流策略应用的级别,true:Channel级别;false:consumer级别;
    生产者
    public class Producer {
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_qos_exchange";
            String routingKey = "qos.save";
            
            String msg = "Hello RabbitMQ QOS Message";
            
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
            
        }
    }
    
    
    消费者
    public class Consumer {
        public static void main(String[] args) throws Exception {
    
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            
            String exchangeName = "test_qos_exchange";
            String queueName = "test_qos_queue";
            String routingKey = "qos.#";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //参数1:消费消息的大小,0表示不做限制
            //参数2:消费者一次最多消费消息的数量
                    //参数3:限流策略应用的级别,true:Channel级别;false:consumer级别
            channel.basicQos(0, 1, false);
    
            //参数2 限流方式  第一件事就是 autoAck设置为 false,也就是手工签收消息
                    //参数3:消费端自定义监听
            channel.basicConsume(queueName, false, new MyConsumer(channel));        
        }
    }
    
    消费端自定义监听
    public class MyConsumer extends DefaultConsumer {
        private Channel channel ;
        
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
            //消费成功则发送ACK到broker中
            channel.basicAck(envelope.getDeliveryTag(), false); 
        }
    }
    

    不可路由消息的处理

    image.png
    生产者
    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_return_exchange";
            String routingKey = "return.save";
            String routingKeyError = "abc.save";
            
            String msg = "Hello RabbitMQ Return Message";
            
            
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                        String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    
                    System.err.println("---------handle  return----------");
                    System.err.println("replyCode: " + replyCode);
                    System.err.println("replyText: " + replyText);
                    System.err.println("exchange: " + exchange);
                    System.err.println("routingKey: " + routingKey);
                    System.err.println("properties: " + properties);
                    System.err.println("body: " + new String(body));
                }
            });
            //参数3,true:监听器会接收到不可路由消息,然后我们可以对消息进行后续处理:存库、重发或者记录日志等。false:broker端会自动删除不可路由消息,监听器是监听不到的
            channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
            //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        }
    }
    
    消费者
    public class Consumer {
    
        public static void main(String[] args) throws Exception {
    
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_return_exchange";
            String routingKey = "return.#";
            String queueName = "test_return_queue";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            channel.basicConsume(queueName, true, new MyConsumer(channel));
            
        }
    }
    
    
    
    public class MyConsumer extends DefaultConsumer {
    
    
        public MyConsumer(Channel channel) {
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
        }
    
    
    }
    
    

    ACK与重回队列

    Nack:失败应答,重回队列的尾端
    Ack:成功应答

    生产者
    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_ack_exchange";
            String routingKey = "ack.save";
    
            for(int i =0; i<5; i ++){
                //自定义头信息
                Map<String, Object> headers = new HashMap<String, Object>();
                headers.put("num", i);
    //RabbitMQ发送消息附带BasicProperties属性详解
    //BasicPropertie属性字段详解
    // contentType:消息的内容类型,如:text/plain
    // contentEncoding:消息内容编码
    // headers:设置消息的header,类型为Map<String,Object>
    // deliveryMode:1(nopersistent)非持久化,2(persistent)持久化
    //  priority:消息的优先级
    // correlationId:关联ID
    //replyTo:用于指定回复的队列的名称
    //  expiration:消息的失效时间
    //  messageId:消息ID
    //  timestamp:消息的时间戳
    //  type:类型
    // userId:用户ID
    //  appId:应用程序ID
    // custerId:集群ID
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF-8")
                        .headers(headers)
                        .build();
                String msg = "Hello RabbitMQ ACK Message " + i;
                channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
            }
            
        }
    }
    
    消费者
    public class Consumer {
        public static void main(String[] args) throws Exception {
                
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_ack_exchange";
            String queueName = "test_ack_queue";
            String routingKey = "ack.#";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            // 手工签收 必须要关闭 autoAck = false
            channel.basicConsume(queueName, false, new MyConsumer(channel));    
        }
    }
    
    public class MyConsumer extends DefaultConsumer {
        private Channel channel ;
        
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("body: " + new String(body));
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if((Integer)properties.getHeaders().get("num") == 0) {
     // 参数2:设置为true 批量消息处理,设置为false单条消息处理
     // 参数3:设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队
                channel.basicNack(envelope.getDeliveryTag(), false, true);
            } else {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }   
        }
    }
    

    DLX:死信队列

    • 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
    • DLX是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列
    image.png

    消息变成死信有以下几种情况 :

    • 队列消息长度到达限制;
    • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
    • 原队列存在消息过期设置,消息到达超时时间未被消费
    死信队列的设置 :
    • 首先要设置死信队列的exchange和queue, 然后进行绑定
      Exchange : dlx.exchange
      Queue : dlx.queue
      RoutingKey : #
    • 然后正常声明交换机, 队列, 绑定, 只不过需要在队列加上一个扩展参数即可 : arguments.put(“x-dead-letter-exchange”, “dlx.exchange”);
    • 这样消息在过期, reject或nack(requeue要设置成false), 队列在达到最大长度时, 消息就可以直接路由到死信队列
    生产者
    
    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_dlx_exchange";
            String routingKey = "dlx.save";
            
            String msg = "Hello RabbitMQ DLX Message";
            
            for(int i =0; i<1; i ++){
                
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF-8")
                        .expiration("10000")
                        .build();
                channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
            }
            
        }
    }
    
    
    消费者
    public class Consumer {
    
        
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            // 这就是一个普通的交换机 和 队列 以及路由
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.#";
            String queueName = "test_dlx_queue";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            
            Map<String, Object> agruments = new HashMap<String, Object>();
            agruments.put("x-dead-letter-exchange", "dlx.exchange");
            //这个agruments属性,要设置到声明队列上
            channel.queueDeclare(queueName, true, false, false, agruments);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //要进行死信队列的声明:
            channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
            channel.queueDeclare("dlx.queue", true, false, false, null);
            channel.queueBind("dlx.queue", "dlx.exchange", "#");
            
            channel.basicConsume(queueName, true, new MyConsumer(channel));
            
            
        }
    }
    
    public class MyConsumer extends DefaultConsumer {
    
    
        public MyConsumer(Channel channel) {
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
        }
    
    
    }
    
    image.png
    exchange:接收消息,并根据路由键转发消息所绑定的队列

    exchange的属性

    • name:交换机名称
    • type:交换机类型,direct、topic、fanout、headers
    • durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
    • autoDelete:是否自动删除exchange,如果没有与之绑定的Queue,直接删除该exchange
    • internal:Exchange是否RabbitMQ内部使用,如果为true,只能通过Exchange到Exchange,默认为false
    • arguments:扩展参数,用于Exchange自定化属性

    exchange的类型:

    • Direct exchange(直连交换机): 所有发送到Direct exchange中的消息都会被转发到RoutingKey指定的队列中,该模式下exchange binding的队列和消息的routingKey必须匹配才能进行消息的流转,否则该消息将会被丢弃。如果消息没有被指定使用的是哪种exchange,那么消息将会默认的使用rabbitMq中的default exchange,default exchange 默认使用的模式是Direct

      image.png
    • Topic exchange(主题交换机): exchange将消息的routingKey和队列的routingKey进行模糊匹配(使用通配符)

      image.png
    • Fanout exchange(广播交换机): 不处理routingKey,将消息简单的投递到exchange上binding的所有queue,由于不处理routingKey,是性能最好的模式

      image.png

    100%投递方案:

    step1:将数据落地,先将业务数据存库,同时将消息记录也存库
    step2:将消息投递到Mq服务器
    step3:设置消息的响应状态,例如0:发送中,1:成功,2:失败,消费者等待服务端回应。
    step4:消费者获取回应状态,并且将状态存库,更新库中消息的状态
    step5:定时任务将超过固定时间内状态为未发送的消息抽取
    step6:重新发送消息
    step7:重发超过一定次数的消息,对其进行失败的标记,然后人工发送或者其他的方式进行消息的补偿


    image.png

    相关文章

      网友评论

          本文标题:RabbitMq

          本文链接:https://www.haomeiwen.com/subject/xecyzhtx.html