美文网首页RabbitQM
RabbitMQ学习总结 第四篇:发布/订阅 Publish/S

RabbitMQ学习总结 第四篇:发布/订阅 Publish/S

作者: ChinaXieShuai | 来源:发表于2019-03-15 17:35 被阅读0次

    上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者。本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)。

    为了说明这个模式,我们将会构建一个简单的日志系统。这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们。

    在我们的日志系统里,每个运行的消费者程序都能接收到消息。这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者来把日志消息打印到屏幕上。

    从本质上来说,是把日志消息推送到了所有的消费者端。

    1、消息交换机

    上篇中我们往Queue里发送消息,并从Queue里取出消息。现在我们来介绍RabbitMQ的完全消息模型。

    我们来快速回顾一下之前博文中的内容:

    • 一个生产者者应用程序发送消息;
    • 一个消息队列用来存储和缓存消息;
    • 一个消费者程序接收消息

    RabbitMQ的消息发送模型核心思想是生产者不直接把消息发送到消息队列中。事实上,生产者不知道自己的消息将会被缓存到哪个队列中。

    其实生产者者可以把消息发送到exchange(消息交换机)上。exchange是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange必须知道在它接收到一条消息时应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用exchange类型来定义。

    img

    有一些可用的exchange类型:direct, topic, headers和fanout。这里我们主要看最后一个:fanout,这里我们创建一个名字为logs、类型为fanout的exchange:

    channel.exchangeDeclare("logs", "fanout");

    fanout类型的exchange是很简单的。就是它把它能接收到的所有消息广播到它知道的所有队列中。这正是我们的日志系统所需要的。

    列出exchange****:

    可以在服务器上使用rabbitmqctl命令来列出RabbitMQ服务器上的所有消息exchange:

    $ sudo rabbitmqctl list_exchanges
    Listing exchanges ...
            direct
    amq.direct      direct
    amq.fanout      fanout
    amq.headers     headers
    amq.match       headers
    amq.rabbitmq.log        topic
    amq.rabbitmq.trace      topic
    amq.topic       topic
    logs    fanout
    ...done.
    

    在这个列表中有一些形如amp.*的exchange,还有默认(未命名)的交换机。这些都是被默认创建的,但这些已经被默认创建的都不是你现在需要用到的。

    没有名字的exchange:

    在之前的博文里没有使用都exchange的相关知识,但是任然能够发送消息。之所以能发送成功是因为我们使用一个默认exchange,我们使用(””)来标识的。

    channel.basicPublish("", "hello", null, message.getBytes());
    

    第一个参数就是exchange的名字。空字符串的符号指的是默认的或没有命名的exchange:消息会根据routingKey被路由到指定的消息队列中。

    现在我们来吧消息推送到已命名的exchange上:

    channel.basicPublish( "logs", "", null, message.getBytes());
    

    2、临时队列

    如果你看过之前几篇的博文,应该会发现我们都是使用了一个指定名字的消息队列(hello和task_queue)。对应的生产者和消费者之间都要使用相同的消息队列名称,这在很重要的。

    但是在我们的log系统中却不是这样,我们希望能够接收到所有的log消息,不只是其中的一部分。我们只要处理当前的log消息,不用管过去的历史log。为了实现,我们需要做以下两步:

    1. 无论什么时候我们和RabbitMQ建立连接时,我们都要刷新、清空Queue。为了达到这一的目的,我们可以用一个随机的名字(随机性可由自己来定义)来创建Queue,也可以让服务器来自动建立一个随见的Queue。
    2. 当消费者断开连接时,Queue能自动被删除。

    使用Java客户端时,我们使用无参数的queueDeclare方法,就可以创建一个已经生成名字的、排他性的且会自动删除的Queue:

    String queueName = channel.queueDeclare().getQueue();
    

    这是就拿到了一个随机名字的queue,形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

    3、绑定(bindings)

    image.png

    我们已经创建了一个fanout类型的exchange和一个队列。现在我们需要让exchange向我们的queue里发送消息。Exchange和queue之间关系被称为binding(绑定)。

    channel.queueBind(queueName, "logs", "");
    

    现在开始,名字为logs的exchange就会忘我们的queue里退消息了。

    查看binding****列表:

    使用rabbitmqctl list_bindings命令来看已经存在的所有的binding。

    4、最终实现

    image.png

    发送日志消息的生产者程序和之前的程序没有太多的差别。最大的区别就是我们把消息推送到一个命名的exchange上,而不是之前未命名的默认exchange。在我们发送消息时需要提供一个routingKey,但对于fanout类型的exchange可以忽略。下边是生产者的代码EmitLog.java:

    import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv)
                      throws java.io.IOException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明exchange名字以及类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            
            // getMessage的实现请见上篇博文
            String message = getMessage(argv);
    
            //指定exchange的名字
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
        //...
    }
    

    正如你所见,在建立连接后我们声明了exchange。这一步是必须的,因为禁止向一个不存在的exchange推送消息。

    如果没有向exchange负责的queue,那么消息将会被丢失,这是没有问题的;如果没有消费者监听的话,我们会安全的丢掉这些消息。

    ReceiveLogs.java的代码如下:

    import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ReceiveLogs {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明消息路由的名称和类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            //声明一个随机消息队列
            String queueName = channel.queueDeclare().getQueue();
            
            //绑定消息队列和消息路由
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //启动一个消费者
            channel.basicConsume(queueName, true, consumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
    
                System.out.println(" [x] Received '" + message + "'");
            }
        }
    }
    
    • 编译文件:

    javac -cp .:amqp-client-3.3.5.jar ReceiveLogs.java EmitLog.java

    • 把日志存到文件里:

    java -cp .:amqp-client-3.3.5.jar ReceiveLogs > logs_from_rabbit.log

    然后监听该日志文件:

    tail -10f logs_from_rabbit.log

    • 往屏幕上打印日志消息:

    java -cp .:amqp-client-3.3.5.jar ReceiveLogs

    • 启动生产者:

    java -cp .:amqp-client-3.3.5.jar EmitLog

    image.png

    日志输出到文件中:

    image.png

    日志消息打印到了屏幕上:

    image.png

    在运行ReceiveLogs的时候,使用rabbitmqctl list_bindings命令来查看RabbitMQ 中的exchange:

    leo@leocook:~$ sudo rabbitmqctl list_bindings
    Listing bindings ...
            exchange        amq.gen-1Zuyn_44c8IWsdJWrI42Og  queue   amq.gen-1Zuyn_44c8IWsdJWrI42Og  []
            exchange        amq.gen-rSrGSPWLNTuq1dfXipPfAA  queue   amq.gen-rSrGSPWLNTuq1dfXipPfAA  []
            exchange        task_queue      queue   task_queue      []
    logs    exchange        amq.gen-1Zuyn_44c8IWsdJWrI42Og  queue           []
    logs    exchange        amq.gen-rSrGSPWLNTuq1dfXipPfAA  queue           []
    ...done.
    

    总结:

    1、在生产者和消费者的信道中声明exchange名字以及类型

    2、在生产者的信道中指定发送目标的exchange

    3、在消费者端的信道中声明一个随机的消息队列,并拿到这个队列名称;然后在信道上绑定该消息队列和消息路由

    参考链接:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

    相关文章

      网友评论

        本文标题:RabbitMQ学习总结 第四篇:发布/订阅 Publish/S

        本文链接:https://www.haomeiwen.com/subject/hzermqtx.html