这节我们将实现分发一个消息到多个consumer,这就是“publish/subscribe”模式。
为了实现这个模式,这里构建一个简单的日志系统,将会写两个小程序,一个负责log message的发送,另一个则负责接收和打印它们。在这个系统中,每个运行着的receiver都会收到message,在consumer端,我们还会运行两个receiver程序,这样我们就可以用一个来实现日志的存储,另一个实现日志的打印。本质上,每个日志消息都会被广播到所有的receiver上。
Exchanges
回顾一下之前所讲到的一些知识
producer:发送消息的应用
queue:存储消息的缓冲区
consumer:接收消息的应用
在RabbitMQ的消息模式中producer并不会直接将消息发送至queue。实际上,producer根本不知道该将一条消息分发到哪一个queue上。
所以producer只能发送消息至exchanges,exchange的作用是一端从producer接收消息,另一端是将消息推送到queue。exchange还应该知道怎样处理这些消息,如将一条消息发送到特定的queue;发送到多个queue;又或者这条消息应该被丢弃等。其规则由exchange type来定义。
有4个exchange type可选:direct,topic,headers和fanout。现在我们创建一个fanout类型的exchange,命名为logs。
注:查看exchanges列表的命令:sudo rabbitmqctl list_exchanges
名为amq.*的exchange和default (unnamed) exchange是默认创建的。在前面的使用中我们并不知道exchange,但是仍然发送了message到queue,那是因为我们使用了默认的exchange(""),消息是通过routing key参数指定的名称到达队列的,这个routing key为空时则默认使用queue的名字。
Temporary queues
之前我们用到的queue都是有名字的(如hello和work),因为我们需要将worker指向一个queue。当你想要在生产者和消费者之间共享queue时,给queue一个名字很重要。但这并不适合我们现在的场景,我们希望接收所有的log message,而不只是其中的一部分log message。并且我们只关心当前的message,并不希望接收到旧的log message,为了满足这个需求我们需要做两件事:
(1)当我们连接至Rabbit时需要一个新的,空的queue,我们可以创建一个随机名字的queue,一个更好的选择是,让RabbitMQ Server为我们选择一个随机的名字。
(2)当我们断开连接时,这个queue应该被自动删除。
在receive.go中,用空字符串作为queue的name,将durable置为false。
当这个方法返回时,RabbitMQ就会为这个queue实例生成一个随机的name,当连接断开时,这个queue将会被删除,因为exclusive=true。
Bindings
我们已经创建了一个fanout类型的exchange和一个随机名的queue。现在我们要告诉这个exchange需要发送消息至这个queue。exchange和queue之间的关系被称为binding。
现在,log exchange将会发送消息至刚才声明的那个queue中。
注:列出现有的binding:rabbitmqctl list_bindings
看下运行结果:
producer.exe
consumer.go
在rabbitMQ的监控页面:
这样,我们就完成了Publish/Subscribe模式。
网友评论