美文网首页
RabbitMQ(五)发布/订阅

RabbitMQ(五)发布/订阅

作者: 薛晨 | 来源:发表于2016-11-03 21:02 被阅读173次

    RabbitMQ官网中文版教程:

    http://rabbitmq.mr-ping.com/tutorials_with_python/[3]Publish_Subscribe.html

    上述教程示例为pathon版,Java版及相应解释如下:

    生产者

    package com.xc.rabbit.publish;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Created by xc.
     */
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "log";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            factory.setVirtualHost("/");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            
            // 声明交换器为fanout类型,连接在交换器上的队列均能收到生产者的消息(相当于广播)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            for (int i= 0; i < 5; i++) {
                String message = "Hello World! " + i;
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
    
            channel.close();
            connection.close();
        }
    }
    
    

    消费者1

    package com.xc.rabbit.publish;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * Created by xc.
     */
    public class RecvLog1 {
    
        private static final String EXCHANGE_NAME = "log";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            factory.setVirtualHost("/");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 创建一个系统随机命名的临时队列,非持久化,专用、且消费者都断连时会自动删除
            String queueName = channel.queueDeclare().getQueue();
            // 将队列与交换器绑定
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
    
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    

    消费者2

    package com.xc.rabbit.publish;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * Created by xc.
     */
    public class RecvLog2 {
    
        private static final String EXCHANGE_NAME = "log";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            factory.setVirtualHost("/");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 创建一个系统随机命名的临时队列,非持久化,专用、且消费者都断连时会自动删除
            String queueName = channel.queueDeclare().getQueue();
            // 将队列与交换器绑定
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
    
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    

    先运行消费者,在运行生产者程序,结果如下:

    两个消费者均受到了生产者发送的所有消息。

    注意
    1) RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

    2)前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")默认的交换机。

    回想我们之前是如何发布一则消息:

    channel.basic_publish(exchange='',
    routing_key='hello',
    body=message)

    exchange参数就是交换机的名称。空字符串代表默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列。

    3)当声明交换器的类型的fanout时,routingkey不起作用。

    4) In the Java client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:
    String queueName = channel.queueDeclare().getQueue();
    At that point queueName contains a random queue name. For example it may look likeamq.gen-JzTY20BRgKO-HjmUJj0wLg.

    API

    1. Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;

    @param exchange the name of the exchange 交换器名

    @param type the exchange type 交换器类型:direct, fanout, topic

    @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 是否持久化

    @param autoDelete true if the server should delete the exchange when it is no longer in use 当已经没有消费者时,服务器是否可以删除该Exchange

    @param arguments other properties (construction arguments) for the exchange 其他配置

    2.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

    参数分别为:队列名、交换器名、路由键

    如何监听消息的子集呢?请看下节

    相关文章

      网友评论

          本文标题:RabbitMQ(五)发布/订阅

          本文链接:https://www.haomeiwen.com/subject/dxliuttx.html