美文网首页
消息队列实践(三)—— Exchange和Route

消息队列实践(三)—— Exchange和Route

作者: 瑞瑞余之 | 来源:发表于2019-08-08 14:36 被阅读0次

    RabbitMQ官网有一篇文章用来介绍AMQP模型。AMQP是什么呢,它是RabbitMQ所支持的队列协议,即为:Advanced Message Queue Protocol,基于这个协议producer和consumer才可以和broker进行交互。
    在我们之前的文章里介绍了producer - queue - consumer这样的结构,而在实际的AMQP中还有一个实例——Exchange,我们不妨看一下官网给出来的架构图:


    AMQP 模型

    这个是RabbitMQ完全的架构模型,我们看到Publisher将消息发布到Exchange,而非Queue,完全由Exchange决定将message发布到哪一个Queue,分发的依据来自route rules。这样做有什么好处呢?

    1. Publisher单纯的进行发布工作,它不用担心具体推送到哪一个队列;
    2. 在实际项目中,根据接受的数据类型不同可能存在多个队列,比如一个媒体中心的新闻数据有体育新闻、时事新闻、民生新闻等等,这些新闻类型是并行而不交叉的,所以每个类型都存在自己的Queue。Publisher发布的新闻到Exchange中会根据route rule找到对应的队列,而不会污染其它的队列数据。

    从上面的例子我们可以看出Exchange在接受Publisher的message后,会有不同类型的分发,这涉及到Exchange的不同类型:Direct Exchange、Fanout Exchange、Topic Exchange、Headers Exchange、Default Exchange五类

    1. Direct Exchange

    从名字可以看出,它是一种简单粗暴的分派方式,publisher在发布message的时候,会给message一个routing key,比如说字符串“key”,当message到Exchange后,会被转发给routing key也为“key”的队列(可能有多个)。

    2. Fanout Exchange

    这种方式就是消息广播,所有与该类型Exchange绑定的Queue都会收到message。有多少个queue,Exchange就会复制多少分,然后转发。比如我们在腾讯体育看NBA直播,上面的比分数据时时变化,那么腾讯体育就可以将比分message发布给Fanout Exchange,这样让所有终端都可以收到。

    3. Topic Exchange

    它实际上是对Direct Exchange的强化,在Direct X 中routing key是绝对相等式匹配,那么Topic X提供模糊匹配的方式。

    4. Header Exchange

    以上Exchange方式都是通过Routing Key进行匹配转发,Header Exchange放弃使用Routing Key,而是采用Header Properties的方式。在创建Queue时会定义一个map1,这个map1中除了普通的key-value,还会有一个熟悉key:x-match,value:any/all;Producer在创建message的时候也会产生一个map2,Header X会将message中的map2和Queue中的map1进行比对,如果map1定义的x-match:any则代表只要map1与map2中的key-value有一个匹配,该message就会转发到该Queue中;如果map1定义的x-match:all则代表map1与map2中的key-value必须全部匹配,该message才会转发到该Queue中。

    5. Default Exchange

    Default X其实就是Direct X的简易版,如果Queue在定义Routing Key的时候设为空字符串,则默认这个Queue的Routing Key就是它的名字。其它过程和Direct X一致。
    下面我们用Topic Exchange和Header Exchange作为实战样例,来看看Exchange的使用。

    • Topic Exchange

    //Rev.java
    package com.otof.rabbitmq.receive;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Recv {
        private final String QUEUE_TOPIC_1 = "QUEUE_TOPIC1";
        private final String QUEUE_TOPIC_2 = "QUEUE_TOPIC2";
        private final String QUEUE_TOPIC_3 = "QUEUE_TOPIC3";
        private final String QUEUE_HEADER = "QUEUE_HEADER";
        private final String EXCHANGE_TOPIC = "TOPICX";
    
        public void receiveMessage() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            //创建一个Topic Exchange
            channel.exchangeDeclare(EXCHANGE_TOPIC, "topic");
            //声明3个队列
            channel.queueDeclare(QUEUE_TOPIC_1, false, false, false, null);
            channel.queueDeclare(QUEUE_TOPIC_2, false, false, false, null);
            channel.queueDeclare(QUEUE_TOPIC_3, false, false, false, null);
    
            //将三个队列都与Topic X关联,同时定义各自的Routing Key
    
            //支持类似于topic.one  *代表一个单词
            channel.queueBind(QUEUE_TOPIC_1, EXCHANGE_TOPIC, "topic.*");
            //支持类似于topic.one或topic.one.two  #代表零个或多个以.分隔的单词
            channel.queueBind(QUEUE_TOPIC_2, EXCHANGE_TOPIC, "topic.#");
            //仅支持Routing Key=topic_not_pass的message
            channel.queueBind(QUEUE_TOPIC_3, EXCHANGE_TOPIC, "topic_not_pass");
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            //定义成功入队后的回调函数
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("This is Queue 1 Received '" + message + "'");
            };
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("This is Queue 2 Received '" + message + "'");
            };
            DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("This is Queue 3 Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_TOPIC_1, true, deliverCallback1, consumerTag -> {});
            channel.basicConsume(QUEUE_TOPIC_2, true, deliverCallback2, consumerTag -> {});
            channel.basicConsume(QUEUE_TOPIC_3, true, deliverCallback3, consumerTag -> {});
        }
    }
    
    
    //Send.java
    package com.otof.rabbitmq.send;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    
    public class Send {
        private final String EXCHANGE_TOPIC = "TOPICX";
        private final String QUEUE_TOPIC = "QUEUE_TOPIC";
    
        public void sendMessage(String message, String key) {
            ConnectionFactory factory = new ConnectionFactory();
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //声明Exchange Topic,名字与Rev.java中一致
                channel.exchangeDeclare(EXCHANGE_TOPIC, "topic");
    
                //发布带Routing Key的message信息
                channel.basicPublish(EXCHANGE_TOPIC, key, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    //RabbitmqController.java
    package com.otof.rabbitmq.controllers;
    
    import com.otof.rabbitmq.receive.Recv;
    import com.otof.rabbitmq.send.Send;
    import org.springframework.web.bind.annotation.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @RestController
    public class RabbitmqController {
    
        @PostMapping(path = "addToQueue")
        public void addMessageToQueue(@RequestParam String message, @RequestParam String key) {
            new Send().sendMessage(message, key);
        }
    
        @GetMapping(path = "getFromQueue")
        public void getMessageFromQueue() throws IOException, TimeoutException {
            new Recv().receiveMessage();
        }
    }
    

    用postman请求getFromQueue,进行注册Exchange和Queue,这个过程叫做Subscription;然后请求addToQueue,将message插入队列。我们来看一下结果:

    1. 请求:当Key=topic


      Key=topic

      结果:匹配topic.#,#代表零个或多个以.分隔的字符串


      key=topic结果
    2. 请求:Key=topic.one


      key=topic.one结果
    3. 请求:Key=topic.one.two.three


      Key=topic.one.two.three
    4. 请求:Key= topic_not_pass


      Key= topic_not_pass结果

    以上就是Topic X的实践,把握以下几条关键点:

    1. 先注册后发布,Subscription-Public
    2. Public只针对Exchange,与Queue无关
    3. Exchange与Queue的绑定过程中需要定义Routing Key

    • Header Exchange

    //Rev.java
    package com.otof.rabbitmq.receive;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class Recv {
        private final String QUEUE_HEADER_1 = "QUEUE_HEADER1";
        private final String QUEUE_HEADER_2 = "QUEUE_HEADER2";
        private final String EXCHANGE_HEADER = "HEADERX";
    
        public void receiveMessage() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明Header X
            channel.exchangeDeclare(EXCHANGE_HEADER, "headers");
    
            //任意满足key1=>aaa或key2=>bbb都通过
            Map<String, Object> map1 = new HashMap(){{
                put("key1", "aaa");
                put("key2", "bbb");
                put("x-match", "any");
            }};
            channel.queueDeclare(QUEUE_HEADER_1, false, false,false, null);
            channel.queueBind(QUEUE_HEADER_1, EXCHANGE_HEADER, "", map1);
    
            //全部满足key1=>aaa或key2=>bbb则通过
            Map<String, Object> map2 = new HashMap(){{
                put("key1", "aaa");
                put("key2", "ccc");
                put("x-match", "all");
            }};
            channel.queueDeclare(QUEUE_HEADER_2, false, false,false, null);
            channel.queueBind(QUEUE_HEADER_2, EXCHANGE_HEADER, "", map2);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("This is Queue 1 Received '" + message + "'");
            };
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("This is Queue 2 Received '" + message + "'");
            };
    
            channel.basicConsume(QUEUE_HEADER_1, true, deliverCallback1, consumerTag -> {});
            channel.basicConsume(QUEUE_HEADER_2, true, deliverCallback2, consumerTag -> {});
        }
    }
    
    //Send.java
    package com.otof.rabbitmq.send;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    
    public class Send {
        private final String EXCHANGE_TOPIC = "TOPICX";
        private final String EXCHANGE_HEADER = "HEADERX";
    
        public void sendMessage(Map<String, Object> map, String message) {
            ConnectionFactory factory = new ConnectionFactory();
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_HEADER, "headers");
                BasicProperties basicProperties = new BasicProperties();
                basicProperties = basicProperties.builder().headers(map).build();
                channel.basicPublish(EXCHANGE_HEADER, "", basicProperties, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    //RequestData.java
    package com.otof.rabbitmq.entity;
    
    import java.util.Map;
    
    public class RequestData {
        String message;
        Map<String, Object> map;
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    
        public Map<String, Object> getMap() {
            return map;
        }
    
        public void setMap(Map<String, Object> map) {
            this.map = map;
        }
    }
    
    //RabbitmqController.java
    package com.otof.rabbitmq.controllers;
    
    import com.otof.rabbitmq.entity.RequestData;
    import com.otof.rabbitmq.receive.Recv;
    import com.otof.rabbitmq.send.Send;
    import org.springframework.web.bind.annotation.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @RestController
    public class RabbitmqController {
    
        @PostMapping(path = "addToQueue")
        public void addMessageToQueue(@RequestBody RequestData requestData) {
            new Send().sendMessage(requestData.getMap(), requestData.getMessage());
        }
    
        @GetMapping(path = "getFromQueue")
        public void getMessageFromQueue() throws IOException, TimeoutException {
            new Recv().receiveMessage();
        }
    }
    

    我们看一下postman发送message和Key Routing


    Key Routing

    结果:


    结果

    相关文章

      网友评论

          本文标题:消息队列实践(三)—— Exchange和Route

          本文链接:https://www.haomeiwen.com/subject/abbwdctx.html