RabbitMQ Spring-AMQP Official Tutorial (3)--Publish/

    In the first tutorial we showed how to use Spring Initializr at start.spring.io to create a project with the RabbitMQ starter dependency for building spring-amqp applications.

    In the previous tutorial we created a new package (tut2) to place our config, sender and receiver and created a work queue with two consumers. The assumption behind a work queue is that each task is delivered to exactly one worker.


    In this part we'll implement the fanout pattern to deliver a message to multiple consumers. This pattern is known as "publish/subscribe" and is implemented by configuring a number of beans in our Tut3Config file.

    Essentially, published messages are going to be broadcast to all the receivers.



    In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.


    Let's quickly go over what we covered in the previous tutorials:


    A producer is a user application that sends messages.


    A queue is a buffer that stores messages.


    A consumer is a user application that receives messages.


    The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.


    Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.


    There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's configure a bean to describe an exchange of this type, and call it tut.fanout:


    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;

    @Profile({"tut3", "pub-sub", "publish-subscribe"})
    @Configuration
    public class Tut3Config {
        @Bean
        public FanoutExchange fanout() {
            return new FanoutExchange("tut.fanout");
        }
        // Queues, bindings and the receiver exist only under the "receiver" profile.
        @Profile("receiver")
        private static class ReceiverConfig {
            @Bean
            public Queue autoDeleteQueue1() {
                return new AnonymousQueue();
            }
            @Bean
            public Queue autoDeleteQueue2() {
                return new AnonymousQueue();
            }
            @Bean
            public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
                return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
            }
            @Bean
            public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) {
                return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
            }
            @Bean
            public Tut3Receiver receiver() {
                return new Tut3Receiver();
            }
        }
        @Profile("sender")
        @Bean
        public Tut3Sender sender() {
            return new Tut3Sender();
        }
    }

    We follow the same approach as in the previous two tutorials. The configuration class is activated by any of three profiles ("tut3", "pub-sub", or "publish-subscribe"), which are synonyms for running the fanout tutorial. Next we configure the FanoutExchange as a bean. Within the ReceiverConfig class, which is used when running with the "receiver" profile, we define the receiver itself plus four beans: two AnonymousQueues (autoDeleteQueue1 and autoDeleteQueue2) and two bindings that bind those queues to the exchange.


    The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for fanning out our messages.
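
The broadcast behaviour can be illustrated without a broker. The following is a minimal in-memory sketch, not part of spring-amqp or RabbitMQ: the class `FanoutSketch` and its methods `bind` and `publish` are hypothetical names chosen for illustration. It assumes an exchange is just a list of bound queues and that a message published while nothing is bound is simply dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a fanout exchange; not a spring-amqp class.
class FanoutSketch {
    // Queues currently bound to this exchange.
    private final List<Deque<String>> boundQueues = new ArrayList<>();

    // Bind a queue so that it receives every message published from now on.
    void bind(Deque<String> queue) {
        boundQueues.add(queue);
    }

    // Copy the message to every bound queue; the routing key is ignored.
    // Returns the number of queues that received the message.
    int publish(String routingKey, String message) {
        for (Deque<String> queue : boundQueues) {
            queue.addLast(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        Deque<String> q1 = new ArrayDeque<>();
        Deque<String> q2 = new ArrayDeque<>();

        // No queue is bound yet, so this message reaches nobody.
        System.out.println(exchange.publish("", "early"));

        exchange.bind(q1);
        exchange.bind(q2);

        // Both queues get their own copy, whatever the routing key is.
        System.out.println(exchange.publish("ignored", "Hello.1"));
        System.out.println(q1 + " " + q2);
    }
}
```

In this model every bound queue gets its own copy of each message, which is the property the two receivers in this tutorial rely on.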


    Listing exchanges

    To list the exchanges on the server you can run the ever useful rabbitmqctl:


    sudo rabbitmqctl list_exchanges

    In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.


    Nameless exchange

    In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").


    Recall how we published a message before:


    template.convertAndSend(queue.getName(), message);

    This two-argument form takes only a routing key and a message, so the template publishes to its default exchange, which is the empty string unless configured otherwise. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.
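
The routing rule of the default exchange can be sketched in plain Java. This is only an in-memory model under the assumption that the broker keeps a map from queue name to queue; `DefaultExchangeSketch`, `declareQueue`, `publish` and `depth` are hypothetical names, not RabbitMQ or spring-amqp APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the default (nameless) exchange.
class DefaultExchangeSketch {
    // Every declared queue is reachable under its own name.
    private final Map<String, Deque<String>> queuesByName = new HashMap<>();

    void declareQueue(String name) {
        queuesByName.putIfAbsent(name, new ArrayDeque<>());
    }

    // Route to the queue whose name equals the routing key, if it exists.
    // Returns true when the message was delivered, false when it was dropped.
    boolean publish(String routingKey, String message) {
        Deque<String> queue = queuesByName.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.addLast(message);
        return true;
    }

    // Number of messages waiting in the named queue (0 if it does not exist).
    int depth(String name) {
        Deque<String> queue = queuesByName.get(name);
        return queue == null ? 0 : queue.size();
    }
}
```

With a queue named hello declared, `publish("hello", "Hello World!")` is delivered, while `publish("missing", "Hello World!")` is dropped because no queue has that name.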


    Now, we can publish to our named exchange instead:


    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout; // configured in Tut3Config above

    template.convertAndSend(fanout.getName(), "", message);

    From now on the fanout exchange will append messages to our queue.


    Temporary queues

    As you may remember, previously we were using queues which had a specified name (remember hello). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.


    But that's not the case for our fanout example. We want to hear about all messages, not just a subset of them. We're also interested only in currently flowing messages, not in the old ones. To solve that we need two things.


    Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.


    Secondly, once we disconnect the consumer, the queue should be automatically deleted. To do this with the spring-amqp client, we defined an AnonymousQueue, which creates a non-durable, exclusive, auto-delete queue with a generated name:


    @Bean
    public Queue autoDeleteQueue1() {
        return new AnonymousQueue();
    }
    @Bean
    public Queue autoDeleteQueue2() {
        return new AnonymousQueue();
    }

    At this point our queues have random names. For example, a name may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
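
To make the idea of a fresh, randomly named queue concrete, here is a minimal sketch of generating such a name on the client side. It assumes a UUID is an acceptable random name; the class `QueueNameSketch` and the method `randomQueueName` are hypothetical, and the naming scheme that AnonymousQueue actually uses is decided by the library and may differ between versions.

```java
import java.util.UUID;

// Hypothetical helper; illustrates the idea of a random queue name only.
class QueueNameSketch {
    // Generate a fresh, effectively unique queue name.
    static String randomQueueName() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // Two calls yield two different names, so each connection
        // could get its own empty queue.
        System.out.println(randomQueueName());
        System.out.println(randomQueueName());
    }
}
```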




    We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. The relationship between an exchange and a queue is called a binding. In the above Tut3Config you can see that we have two bindings, one for each AnonymousQueue.


    @Bean
    public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
        return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
    }

    Listing bindings

    You can list existing bindings using, you guessed it,


    rabbitmqctl list_bindings

    Putting it all together


    The producer program, which emits messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our fanout exchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here is the code for Tut3Sender.java:


    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;

    public class Tut3Sender {

        @Autowired
        private RabbitTemplate template;

        @Autowired
        private FanoutExchange fanout;

        int dots = 0;
        int count = 0;

        // Publish one message per second after an initial 500 ms delay.
        @Scheduled(fixedDelay = 1000, initialDelay = 500)
        public void send() {
            StringBuilder builder = new StringBuilder("Hello");
            if (dots++ == 3) {
                dots = 1;
            }
            for (int i = 0; i < dots; i++) {
                builder.append('.');
            }
            builder.append(Integer.toString(++count));
            String message = builder.toString();
            // The routing key ("") is ignored by a fanout exchange.
            template.convertAndSend(fanout.getName(), "", message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

    As you see, we leverage the beans from the Tut3Config file and autowire in the RabbitTemplate along with our configured FanoutExchange. This step is necessary as publishing to a non-existing exchange is forbidden.
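
The message text that send() builds can be tried in isolation, without Spring or a broker. The sketch below copies only the dot-cycling logic into a hypothetical class `MessageSketch` with a hypothetical method `nextMessage`; it assumes that each loop iteration appends one dot and that the running count is appended at the end.

```java
// Hypothetical stand-alone copy of the sender's message-building logic.
class MessageSketch {
    private int dots = 0;
    private int count = 0;

    // Build the next message: "Hello", then one to three dots, then a counter.
    String nextMessage() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots++ == 3) {
            dots = 1; // wrap around after three dots
        }
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        builder.append(++count);
        return builder.toString();
    }

    public static void main(String[] args) {
        MessageSketch sketch = new MessageSketch();
        for (int i = 0; i < 4; i++) {
            System.out.println(sketch.nextMessage());
        }
        // prints Hello.1, Hello..2, Hello...3, Hello.4 (one per line)
    }
}
```

The number of dots cycles through 1, 2, 3 while the counter keeps increasing, so every message is distinguishable on the receiving side.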


    The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.


    The code for Tut3Receiver.java:


    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.util.StopWatch;

    public class Tut3Receiver {
        @RabbitListener(queues = "#{autoDeleteQueue1.name}")
        public void receive1(String in) throws InterruptedException {
            receive(in, 1);
        }

        @RabbitListener(queues = "#{autoDeleteQueue2.name}")
        public void receive2(String in) throws InterruptedException {
            receive(in, 2);
        }

        public void receive(String in, int receiver) throws InterruptedException {
            StopWatch watch = new StopWatch();
            watch.start();
            System.out.println("instance " + receiver + " [x] Received '" + in + "'");
            doWork(in);
            watch.stop();
            System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
        }

        // Simulate work: sleep one second per dot in the message.
        private void doWork(String in) throws InterruptedException {
            for (char ch : in.toCharArray()) {
                if (ch == '.') {
                    Thread.sleep(1000);
                }
            }
        }
    }
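
How long a receiver stays busy with a given message can be estimated without actually sleeping. This is a small sketch under the assumption that doWork pauses for one second per dot, as in the previous tutorial; `WorkSketch` and `workMillis` are hypothetical names.

```java
// Hypothetical helper that computes the simulated work time of a message.
class WorkSketch {
    // One second of simulated work per '.' character in the message.
    static long workMillis(String in) {
        long millis = 0;
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                millis += 1000;
            }
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println(workMillis("Hello...3")); // three dots
        System.out.println(workMillis("Hello"));     // no dots
    }
}
```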

    Compile as before and we're ready to execute the fanout sender and receiver.


    mvn clean package

    And of course, to execute the tutorial do the following:


    java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar --spring.profiles.active=pub-sub,receiver
    java -jar target/rabbit-tutorials-1.7.1.RELEASE.jar --spring.profiles.active=pub-sub,sender

    Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With the receiver running you should see something like:

    sudo rabbitmqctl list_bindings
    tut.fanout exchange 8b289c9c-a1eb-4a3a-b6a9-163c4fdcb6c2 queue []
    tut.fanout exchange d7e7d193-65b1-4128-a532-466a5256fd31 queue []

    The interpretation of the result is straightforward: data from the exchange tut.fanout goes to two queues with generated names. And that's exactly what we intended.


    To find out how to listen for a subset of messages, let's move on to tutorial 4.





