美文网首页
7.RabbitMQ Topics

7.RabbitMQ Topics

作者: xialedoucaicai | 来源:发表于2018-06-21 11:17 被阅读0次
之前我们说了RabbitMQ的广播和单播模式,要么发给所有队列,要么发给指定的某个队列,那么我的消息想发给某些队列,类似组播,该怎么实现呢?这就要用到Topics。 Topics

1.Topic exchange

为了实现组播,我们将exchange的类型设置为topic,同时指定routingKey,注意其routingKey的匹配规则:

  1. routingKey是一些单词构成,单词将作为最小的匹配单元,单词之间以.分割,上限255字节
  2. 引入* 和#两个通配符,*可以匹配一个单词,#可以匹配0个或多个单词

对于消费者,在队列绑定到交换器时,指定了每个队列的routingKey如上图所示;对于生产者发送如下routingKey的消息,最终匹配结果为:

  • "quick.orange.rabbit" "lazy.orange.elephant" 将会发给Q1 Q2
  • "quick.orange.fox" 将会发给Q1 * "lazy.brown.fox"*将会发给Q2
  • "lazy.pink.rabbit" 将会发给Q2一次,尽管它匹配了两个绑定规则
  • "quick.brown.fox" Q1,Q2都不匹配,将会被丢弃
  • "orange" "quick.orange.male.rabbit" 将会被丢弃
  • "lazy.orange.male.rabbit" 将会发给Q2

其实Topic Exchange是很灵活的,它可以"变身"成fanout和direct。

  • 如果消费者绑定队列到交换器时,指定routingKey为#,将会匹配任何routingKey,就变成了fanout
  • 如果不使用# *,直接使用明确的单词,比如"error" "info"就会变成direct

2.代码示例

生产者

//topic模式生产者
public class Send {
    public static final String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机 topic 介于单播/广播之间,通过匹配进行多播
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String msg = "hello topic";
        //向交换机发送消息,由于交换机没有存储消息的能力,所以如果没有队列绑定到交换机,消息将被丢弃
        //这里指定路由键为goods.add,表示发送添加商品的消息
        channel.basicPublish(EXCHANGE_NAME, "goods.add.test", null, msg.getBytes());
        System.out.println("向交换机发送了消息"+msg);
        
        channel.close();
        connection.close();
    }
}

消费者

public class Receive1 {
    public static final String QUEUE_NAME = "topic_queue1";
    public static final String EXCHANGE_NAME = "exchange_topic";
    
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到转发器,这里指定路由键,#表示匹配任意个单词(0个 1个 n个),*表示匹配一个单词,单词之间用.分隔
        //这里匹配goods后面带一个单词 匹配不到 goods.add.test 和 goods
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.*");
        
        //一次只发一条消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //收到消息就会触发
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("消费者1:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //向生产者发送回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //自动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者

public class Receive2 {
    public static final String QUEUE_NAME = "topic_queue2";
    public static final String EXCHANGE_NAME = "exchange_topic";
    
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到转发器,这里指定路由键,#表示匹配任意个单词(0个 1个 n个),*表示匹配一个单词,单词之间用.分隔
        //这里匹配goods后面带任意多个单词 可以匹配goods.add 和 goods.add.test 和 goods
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
        //一次只发一条消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //收到消息就会触发
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("消费者2:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //向生产者发送回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //自动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

相关文章

  • 7.RabbitMQ Topics

    1.Topic exchange 为了实现组播,我们将exchange的类型设置为topic,同时指定routin...

  • Topics

    索拉里斯星 我会看一些讲书的节目,但通常不会真的去买去看。索拉里斯星是一个科幻小说,他说的是人们探索一个奇怪星球的...

  • Topics

    As long as I am remember I enjoy my cozy home I crazy abo...

  • How we are going to address Busi

    --> From managers' perspective Topics General topics: 1. ...

  • kafka数据迁移

    迁移前: 编辑json文件:vi topics-to-move.json{"topics": [{"topic":...

  • LC刷题记录To Be Continued

    1. Two Sum - Easy topics: array, hash table topics:704....

  • 0 Topics

    1. Precalculus 1.1 algebra 1.2 trigonometry 2. Calcul...

  • Topics Exchange

    尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.bas...

  • 一个好的唐词网站

    https://topics.gmw.cn/node_121344.htm[https://topics.gmw....

  • Additional Filtering Topics

    本笔记只做复习以及巩固知识点使用,初次学习请下载练习工作簿,跟随网络课程的讲解同步操作,结果参考解法工作簿。 Fi...

网友评论

      本文标题:7.RabbitMQ Topics

      本文链接:https://www.haomeiwen.com/subject/odcqyftx.html