美文网首页RabbitMQ工作生活
Rabbitmq打怪升级之路(九)Publish/Subscri

Rabbitmq打怪升级之路(九)Publish/Subscri

作者: 亚武de小文 | 来源:发表于2019-07-02 19:39 被阅读0次

    简书:亚武de小文 【原创:转载请注明出处】

    发布/订阅模式(Publish/Subscribe)

    Fanouot类型交换机
    LengToo上学.png

    RabbitMQ有以下几种工作模式 :

    • Work queues
    • Publish/Subscribe
    • Routing
    • Topic
    • Headers
    • RPC

    Publish/Subscribe

    模型图
    [亚武de小文]发布订阅模型图.png

    Producer:生产者,就是发消息的一方。
    Exchanger:路由器交换机,此处为fanout,它负责接收发送者发送的消息并将消息转发到订阅它的所有队列上。
    Queue:队列。它如果对某个交换机感兴趣的话,那么就可以把自己绑定到这个交换机上,专业术语叫绑定
    binding:绑定,订阅

    • 发布订阅模式:
      1、每个消费者监听自己的队列。
      2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

    参考代码

    生产者

    声明Exchange_fanout_inform交换机。 声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey 发送消息时不需要指定routingkey

    package com.yawu.xiaowen.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.ExchangeTypes;
    
    /**
     * Fanout交换机
     * 生产者-发件人01
     * @date 2019.06.26
     * @author yawu
     */
    public class Producer {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_fanout_exchange";
    
        public static void main(String[] args) {
            try {
                // 应用程序与RabbitMQ建立连接的管理器。
                ConnectionFactory factory = new ConnectionFactory();
                // 服务器地址
                factory.setHost("127.0.0.1");
                // 设置帐号密码,默认为guest/guest
                factory.setUsername("guest");
                factory.setPassword("guest");
    
                // 创建一个连接
                Connection connection = factory.newConnection();
                // 创建一个信道
                Channel channel = connection.createChannel();
                // 通过信道创建一个交换机
                /**
                 * 【参数详解】
                 * exchange:交换机的名称
                 * type:交换机的类型 Fanout:
                 * fanout:直接把队列绑定到路由器。路由器在收到消息后,直接把消息投递到队列中,不需要路由键。
                 */
                channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
    
                // 消息用二进制的方式传输
                final String message = "亚武de小文 ,你好呀!";
                final byte[] msg = message.getBytes("UTF-8");
    
                // 直接把消息发送到路由器。路由在收到消息后,直接投递到订阅这个路由器的队列
                channel.basicPublish(EXCHANGE_NAME, "", null, msg);
    
                LOGGER.info("消息发送成功:{}", message);
    
                // 关闭最好写到finally语句块中
                channel.close();
                connection.close();
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            }
    
        }
    
    }
    
    
    消费者

    Consume01.java

    package com.yawu.xiaowen.fanout;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.ExchangeTypes;
    
    import java.io.IOException;
    
    /**
     * Fanout交换机
     * 消费者-收件人01
     * @date 2019.06.26
     * @author yawu
     */
    public class Consume01 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_fanout_exchange";
        private static final String QUEUE_NAME_01 = "mq_fanout_queue_01";
        private static final String QUEUE_NAME_02 = "mq_fanout_queue_02";
    
        public static void main(String[] args) {
            try {
                // 设置服务器信息
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("127.0.0.1");
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                // 声明一个队列
                /**
                 * 【参数详解】
                 * queue:要创建的队列名
                 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                 * exclusive:true表示一个队列只能被一个消费者占有并消费
                 * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                 * arguments:其它参数
                 */
                channel.queueDeclare(QUEUE_NAME_01, true, false, false, null);
                // 声明交换机
                channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
    
                // 在fanout类型的路由器中,路由键无效,所以设计为空字符串
                final String routeKey = "";
                // 将这个队列绑定到这个路由器上,表示这个队列对这个路由器感兴趣
                channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, routeKey);
    
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        LOGGER.info("收件人01收到消息:{}", message);
                    }
                };
                // 队列一确认收到消息
                channel.basicConsume(QUEUE_NAME_01, true, consumer);
    
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            }
    
        }
    }
    

    Consume02.java

    package com.yawu.xiaowen.fanout;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.ExchangeTypes;
    
    import java.io.IOException;
    
    /**
     * Fanout交换机
     * 消费者-收件人02
     * @date 2019.06.26
     * @author yawu
     */
    public class Consume02 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_fanout_exchange";
        private static final String QUEUE_NAME_01 = "mq_fanout_queue_01";
        private static final String QUEUE_NAME_02 = "mq_fanout_queue_02";
    
        public static void main(String[] args) {
            try {
                // 设置服务器信息
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("127.0.0.1");
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                // 声明一个队列
                /**
                 * 【参数详解】
                 * queue:要创建的队列名
                 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                 * exclusive:true表示一个队列只能被一个消费者占有并消费
                 * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                 * arguments:其它参数
                 */
                channel.queueDeclare(QUEUE_NAME_02, true, false, false, null);
                // 声明交换机
                channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
    
                // 在fanout类型的路由器中,路由键无效,所以设计为空字符串
                final String routeKey = "";
                // 将这个队列订阅到这个路由器上,表示这个队列对这个路由器感兴趣
                channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, routeKey);
    
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        LOGGER.info("收件人02收到消息:{}", message);
                    }
                };
                // 队列二确认收到消息
                channel.basicConsume(QUEUE_NAME_02, true, consumer);
    
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            }
    
        }
    }
    
    

    测试(多种情况分析)

    1. 启动发送者服务,可多次运行
    发件人服务启动.png
    2. 分别启动收件者服务Consume01,Consume02
    收件人启动.png
    收件人启动01.png
    收件人启动02.png
    3. 打开RabbitMQ的管理界面,观察交换机绑定情况:
    交换机绑定.png
    4. 分别查看两个队列,并发布消息,则对应订阅或绑定的接收者收到相应的消息
    绑定的队列.png
    5. 在交换机上面发布,所有人都收到
    交换机发布消息.png
    6. 在队列mq_fanout_queue_01发布消息,则与此队列绑定的接收者获得相应消息
    交换机绑定的队列.png
    交换机绑定的队列1.png
    交换机绑定的队列1发送消息.png
    只有收件人01收到消息.png
    收件人02收不到发布的消息.png

    我们看到只有Consume01接收到了

    7. 至此,发布订阅模式结束。

    相关文章

      网友评论

        本文标题:Rabbitmq打怪升级之路(九)Publish/Subscri

        本文链接:https://www.haomeiwen.com/subject/dcothctx.html