美文网首页
7.RabbitMQ Topics

7.RabbitMQ Topics

作者: xialedoucaicai | 来源:发表于2018-06-21 11:17 被阅读0次
    之前我们说了RabbitMQ的广播和单播模式,要么发给所有队列,要么发给指定的某个队列,那么我的消息想发给某些队列,类似组播,该怎么实现呢?这就要用到Topics。 Topics

    1.Topic exchange

    为了实现组播,我们将exchange的类型设置为topic,同时指定routingKey,注意其routingKey的匹配规则:

    1. routingKey是一些单词构成,单词将作为最小的匹配单元,单词之间以.分割,上限255字节
    2. 引入* 和#两个通配符,*可以匹配一个单词,#可以匹配0个或多个单词

    对于消费者,在队列绑定到交换器时,指定了每个队列的routingKey如上图所示;对于生产者发送如下routingKey的消息,最终匹配结果为:

    • "quick.orange.rabbit" "lazy.orange.elephant" 将会发给Q1 Q2
    • "quick.orange.fox" 将会发给Q1 * "lazy.brown.fox"*将会发给Q2
    • "lazy.pink.rabbit" 将会发给Q2一次,尽管它匹配了两个绑定规则
    • "quick.brown.fox" Q1,Q2都不匹配,将会被丢弃
    • "orange" "quick.orange.male.rabbit" 将会被丢弃
    • "lazy.orange.male.rabbit" 将会发给Q2

    其实Topic Exchange是很灵活的,它可以"变身"成fanout和direct。

    • 如果消费者绑定队列到交换器时,指定routingKey为#,将会匹配任何routingKey,就变成了fanout
    • 如果不使用# *,直接使用明确的单词,比如"error" "info"就会变成direct

    2.代码示例

    生产者

    //topic模式生产者
    public class Send {
        public static final String EXCHANGE_NAME = "exchange_topic";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机 topic 介于单播/广播之间,通过匹配进行多播
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String msg = "hello topic";
            //向交换机发送消息,由于交换机没有存储消息的能力,所以如果没有队列绑定到交换机,消息将被丢弃
            //这里指定路由键为goods.add,表示发送添加商品的消息
            channel.basicPublish(EXCHANGE_NAME, "goods.add.test", null, msg.getBytes());
            System.out.println("向交换机发送了消息"+msg);
            
            channel.close();
            connection.close();
        }
    }
    

    消费者

    public class Receive1 {
        public static final String QUEUE_NAME = "topic_queue1";
        public static final String EXCHANGE_NAME = "exchange_topic";
        
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //绑定队列到转发器,这里指定路由键,#表示匹配任意个单词(0个 1个 n个),*表示匹配一个单词,单词之间用.分隔
            //这里匹配goods后面带一个单词 匹配不到 goods.add.test 和 goods
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.*");
            
            //一次只发一条消息
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel){
                //收到消息就会触发
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body,"utf-8");
                    System.out.println("消费者1:"+msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //向生产者发送回执消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //自动应答
            boolean autoAck = false;
            //监听队列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    消费者

    public class Receive2 {
        public static final String QUEUE_NAME = "topic_queue2";
        public static final String EXCHANGE_NAME = "exchange_topic";
        
        public static void main(String[] args) throws Exception{
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //绑定队列到转发器,这里指定路由键,#表示匹配任意个单词(0个 1个 n个),*表示匹配一个单词,单词之间用.分隔
            //这里匹配goods后面带任意多个单词 可以匹配goods.add 和 goods.add.test 和 goods
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
            //一次只发一条消息
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel){
                //收到消息就会触发
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body,"utf-8");
                    System.out.println("消费者2:"+msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //向生产者发送回执消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //自动应答
            boolean autoAck = false;
            //监听队列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    相关文章

      网友评论

          本文标题:7.RabbitMQ Topics

          本文链接:https://www.haomeiwen.com/subject/odcqyftx.html