美文网首页
rabbitMq-发布订阅模式fanout-java

rabbitMq-发布订阅模式fanout-java

作者: ssttIsme | 来源:发表于2019-04-13 11:44 被阅读0次

工作原理:生产者经过交换机向队列中发送消息,交换机会把消息发送到订阅了当前消息的队列中。
交换机实现的是动态的数据分发。分发的原则是谁订阅了我,我向谁发。
C1只能消费第一个对列中的内容。
C2只能消费第二个对列中的内容。
发布订阅模式只能做到单纯地进行发布,不能指定某一个队列进行发布。

[root@bogon rabbitmq-server-3.6.1]# cd /etc/rabbitmq

[root@bogon rabbitmq]# service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.er.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rabbit</groupId>
    <artifactId>schoolmanage</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
    </dependencies>
</project>
package schoolmanage;

import java.io.IOException;

import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestStudentMsgFanOut {
    private Connection connection = null;

    @Before
    public void init() throws IOException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 为工厂对象添加数据
        // 远程主机
        factory.setHost("192.168.6.130");
        // 端口号
        factory.setPort(5672);
        // 虚拟主机
        factory.setVirtualHost("/school");
        // 用户名
        factory.setUsername("student");
        // 密码
        factory.setPassword("student");
        // 创建连接
        connection = factory.newConnection();
    }

    // 消息生产者
    @Test
    public void provider() throws IOException {
        // 创建通道
        Channel channel = connection.createChannel();

        // 创建交换机
        String exchangeName = "E1";
        /**
         * 定义交换机模式 String exchange String type fanout发布订阅 redirect路由模式 topic主题模式
         */
        channel.exchangeDeclare(exchangeName, "fanout");

        for (int i = 0; i < 6; i++) {
            String msg = "今天学校下雨了!!!——发布订阅模式,这是第" + i + "次通知";
            channel.basicPublish(exchangeName, "", null, msg.getBytes());
        }
        // 将流关闭
        channel.close();
        connection.close();
        System.out.println("消息发送成功!!!");
    }

    // 消费者1
    @Test
    public void consumer1() throws Exception {
        // 创建通道
        Channel channel = connection.createChannel();

        // 定义对列
        String queueName = "counsumer1";
        channel.queueDeclare(queueName, false, false, false, null);
        // 订阅E1交换机的消息
        String exchangeName = "E1";
        // 定义交换机模式
        channel.exchangeDeclare(exchangeName, "fanout");
        // 将对列与交换机绑定
        channel.queueBind(queueName, exchangeName, "");
        // 只允许一次执行一个消息
        channel.basicQos(1);

        // 定义消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 将消费者与队列进行绑定
        /**
         * 定义回复方式 autoAck为false表示手动返回
         */
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者1,启动。。。");
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = "消费者1收到 :" + new String(delivery.getBody());
            System.out.println(msg);
            // 告知rabbitMq当前消费的是哪一个消息
            /**
             * multiple false表示不扩展
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    // 消费者2
    @Test
    public void consumer2() throws Exception {
        // 创建通道
        Channel channel = connection.createChannel();
        // 定义对列
        String queueName = "counsumer2";
        channel.queueDeclare(queueName, false, false, false, null);
        // 订阅E1交换机的消息
        String exchangeName = "E1";
        // 定义交换机模式
        channel.exchangeDeclare(exchangeName, "fanout");
        // 将对列与交换机绑定
        channel.queueBind(queueName, exchangeName, "");
        // 只允许一次执行一个消息
        channel.basicQos(1);

        // 定义消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 将消费者与队列进行绑定
        /**
         * 定义回复方式 autoAck为false表示手动返回
         */
        channel.basicConsume(queueName, false, consumer);
        System.out.println("消费者2,启动。。。");
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = "消费者2收到 :" + new String(delivery.getBody());
            System.out.println(msg);
            // 告知rabbitMq当前消费的是哪一个消息
            /**
             * multiple false表示不扩展
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}
消费者1,启动。。。
消费者2,启动。。。
消息发送成功!!!
消费者1,启动。。。
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第0次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第1次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第2次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第3次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第4次通知
消费者1收到 :今天学校下雨了!!!——发布订阅模式,这是第5次通知
消费者2,启动。。。
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第0次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第1次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第2次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第3次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第4次通知
消费者2收到 :今天学校下雨了!!!——发布订阅模式,这是第5次通知

相关文章

  • rabbitMq-发布订阅模式fanout-java

    工作原理:生产者经过交换机向队列中发送消息,交换机会把消息发送到订阅了当前消息的队列中。交换机实现的是动态的数据分...

  • RabbitMQ-发布订阅模式

    X:交换机、转发器 一个生产者,多个消费者 每一个消费者都有自己的队列 生产者没有直接将消息发送到队列。而是发送到...

  • 发布订阅模式(观察者模式)

    发布订阅模式(观察者模式) 发布订阅也叫观察者模式 发布 && 订阅 使用

  • 设计模式之发布订阅模式(1) 一文搞懂发布订阅模式

    目录 发布/订阅者模式的优点 实现发布/订阅者模式需要考虑的点 何时应使用发布/订阅者模式 发布/订阅者模式与观察...

  • JS-简单实现发布订阅模式

    发布订阅模式主要涉及三个对象:发布者、订阅者、主题对象。 发布-订阅模式 定义  发布-订阅模式又称观察者模式,它...

  • 从发布-订阅模式到消息队列

    发布-订阅模式 发布-订阅模式又称为观察者模式(网上也有很多说这两种模式区别,个人觉得区别不大),在发布-订阅模式...

  • MQTT 5.0 - 发布订阅模式介绍

    MQTT 协议的核心在于发布订阅模式,在本文中,我们将对这一模式进行深入的介绍。 发布订阅模式 发布订阅模式区别于...

  • Redis发布订阅模式

    Redis支持发布订阅模式,先了解一下与发布订阅相关的命令。 发布订阅模式命令 SUBSCRIBE命令用于订阅ch...

  • MQTT 发布/订阅模式介绍

    MQTT 发布/订阅模式 发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它...

  • 观察者模式&&订阅发布模式

    观察者模式&&订阅发布模式 参考:知乎-观察者模式 vs 发布订阅模式[https://zhuanlan.zhih...

网友评论

      本文标题:rabbitMq-发布订阅模式fanout-java

      本文链接:https://www.haomeiwen.com/subject/umllwqtx.html