美文网首页
RabbitMQ系列(四):Publish/Subscribe

RabbitMQ系列(四):Publish/Subscribe

作者: 初级赛亚人 | 来源:发表于2017-10-06 00:55 被阅读0次

    这节我们将实现分发一个消息到多个consumer,这就是“publish/subscribe”模式。

    为了实现这个模式,这里构建一个简单的日志系统,将会写两个小程序,一个负责log message的发送,另一个则负责接收和打印它们。在这个系统中,每个运行着的receiver都会收到message,在consumer端,我们还会运行两个receiver程序,这样我们就可以用一个来实现日志的存储,另一个实现日志的打印。本质上,每个日志消息都会被广播到所有的receiver上。

    Exchanges

    回顾一下之前所讲到的一些知识

    producer:发送消息的应用

    queue:存储消息的缓冲区

    consumer:接收消息的应用

    在RabbitMQ的消息模式中producer并不会直接将消息发送至queue。实际上,producer根本不知道该将一条消息分发到哪一个queue上。

    所以producer只能发送消息至exchanges,exchange的作用是一端从producer接收消息,另一端是将消息推送到queue。exchange还应该知道怎样处理这些消息,如将一条消息发送到特定的queue;发送到多个queue;又或者这条消息应该被丢弃等。其规则由exchange type来定义。

    有4个exchange type可选:direct,topic,headers和fanout。现在我们创建一个fanout类型的exchange,命名为logs。

    注:查看exchanges列表的命令:sudo rabbitmqctl list_exchanges

    名为amq.*的exchange和default (unnamed) exchange是默认创建的。在前面的使用中我们并不知道exchange,但是仍然发送了message到queue,那是因为我们使用了默认的exchange(""),消息是通过routing key参数指定的名称到达队列的,这个routing key为空时则默认使用queue的名字。

    Temporary queues

    之前我们用到的queue都是有名字的(如hello和work),因为我们需要将worker指向一个queue。当你想要在生产者和消费者之间共享queue时,给queue一个名字很重要。但这并不适合我们现在的场景,我们希望接收所有的log message,而不只是其中的一部分log message。并且我们只关心当前的message,并不希望接收到旧的log message,为了满足这个需求我们需要做两件事:

    (1)当我们连接至Rabbit时需要一个新的,空的queue,我们可以创建一个随机名字的queue,一个更好的选择是,让RabbitMQ Server为我们选择一个随机的名字。

    (2)当我们断开连接时,这个queue应该被自动删除。

    在receive.go中,用空字符串作为queue的name,将durable置为false。

    当这个方法返回时,RabbitMQ就会为这个queue实例生成一个随机的name,当连接断开时,这个queue将会被删除,因为exclusive=true。

    Bindings

    我们已经创建了一个fanout类型的exchange和一个随机名的queue。现在我们要告诉这个exchange需要发送消息至这个queue。exchange和queue之间的关系被称为binding。

    现在,log exchange将会发送消息至刚才声明的那个queue中。

    注:列出现有的binding:rabbitmqctl list_bindings

    看下运行结果:

    producer.exe

    consumer.go

    在rabbitMQ的监控页面:

    这样,我们就完成了Publish/Subscribe模式。

    相关文章

      网友评论

          本文标题:RabbitMQ系列(四):Publish/Subscribe

          本文链接:https://www.haomeiwen.com/subject/zbmhyxtx.html