美文网首页
rabbitMq-路由模式direct-java

rabbitMq-路由模式direct-java

作者: ssttIsme | 来源:发表于2019-04-13 12:58 被阅读0次

    工作原理:每一个对列都有自己的路由key,当生产者发送消息时,都会携带一个路由key,这时,会将消息发往路由key一致的队列中。路由模式是发布订阅模式的升级版。
    路由key起到的是消息的导向作用。

    [root@bogon rabbitmq-server-3.6.1]# cd /etc/rabbitmq
    
    [root@bogon rabbitmq]# service rabbitmq-server start
    Starting rabbitmq-server: SUCCESS
    rabbitmq-server.er.
    

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.rabbit</groupId>
        <artifactId>schoolmanage</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>war</packaging>
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.5.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
            </dependency>
        </dependencies>
    </project>
    
    package schoolmanage;
    
    import java.io.IOException;
    
    import org.junit.Before;
    import org.junit.Test;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class TestStudentMsgRouting {
        private Connection connection = null;
    
        @Before
        public void init() throws IOException {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 为工厂对象添加数据
            // 远程主机
            factory.setHost("192.168.6.130");
            // 端口号
            factory.setPort(5672);
            // 虚拟主机
            factory.setVirtualHost("/school");
            // 用户名
            factory.setUsername("student");
            // 密码
            factory.setPassword("student");
            // 创建连接
            connection = factory.newConnection();
        }
    
        // 消息生产者
        @Test
        public void provider() throws IOException {
            System.out.println("生产者启动。。。");
            // 创建通道
            Channel channel = connection.createChannel();
    
            // 创建交换机
            String exchangeName = "RountingExchange";
            /**
             * 定义交换机模式 String exchange String type fanout发布订阅 redirect路由模式 topic主题模式
             */
            channel.exchangeDeclare(exchangeName, "direct");
    
            String info = "数学课正常上课!";
            channel.basicPublish(exchangeName, "info", null, info.getBytes());
            String error= "语文教师出去开会,取消今天的语文课!";
            channel.basicPublish(exchangeName, "error", null, error.getBytes());
            String warning="多读书,少游戏!";
            channel.basicPublish(exchangeName, "warning", null, warning.getBytes());
            // 将流关闭
            channel.close();
            connection.close();
            System.out.println("消息发送成功!!!");
        }
    
        // 消费者1
        @Test
        public void consumer1() throws Exception {
            // 创建通道
            Channel channel = connection.createChannel();
    
            // 定义对列
            String queueName = "routingCounsumer1";
            channel.queueDeclare(queueName, false, false, false, null);
            // 订阅E1交换机的消息
            String exchangeName = "RountingExchange";
            // 定义交换机模式
            channel.exchangeDeclare(exchangeName, "direct");
            // 将对列与交换机绑定
            channel.queueBind(queueName, exchangeName, "error");
            // 只允许一次执行一个消息
            channel.basicQos(1);
    
            // 定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 将消费者与队列进行绑定
            /**
             * 定义回复方式 autoAck为false表示手动返回
             */
            channel.basicConsume(queueName, false, consumer);
            System.out.println("消费者1,启动。。。");
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = "消费者1收到 :" + new String(delivery.getBody());
                System.out.println(msg);
                // 告知rabbitMq当前消费的是哪一个消息
                /**
                 * multiple false表示不扩展
                 */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    
        // 消费者2
        @Test
        public void consumer2() throws Exception {
            // 创建通道
            Channel channel = connection.createChannel();
            // 定义对列
            String queueName = "routingCounsumer2";
            channel.queueDeclare(queueName, false, false, false, null);
            // 订阅E1交换机的消息
            String exchangeName = "RountingExchange";
            // 定义交换机模式
            channel.exchangeDeclare(exchangeName, "direct");
            // 将对列与交换机绑定
            channel.queueBind(queueName, exchangeName, "error");
            channel.queueBind(queueName, exchangeName, "info");
            channel.queueBind(queueName, exchangeName, "warning");
            // 只允许一次执行一个消息
            channel.basicQos(1);
    
            // 定义消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 将消费者与队列进行绑定
            /**
             * 定义回复方式 autoAck为false表示手动返回
             */
            channel.basicConsume(queueName, false, consumer);
            System.out.println("消费者2,启动。。。");
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = "消费者2收到 :" + new String(delivery.getBody());
                System.out.println(msg);
                // 告知rabbitMq当前消费的是哪一个消息
                /**
                 * multiple false表示不扩展
                 */
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    
    }
    
    

    消费者1,启动。。。
    
    消费者2,启动。。。
    
    生产者启动。。。
    消息发送成功!!!
    
    消费者1收到 :语文教师出去开会,取消今天的语文课!
    
    消费者2收到 :数学课正常上课!
    消费者2收到 :语文教师出去开会,取消今天的语文课!
    消费者2收到 :多读书,少游戏!
    

    相关文章

      网友评论

          本文标题:rabbitMq-路由模式direct-java

          本文链接:https://www.haomeiwen.com/subject/huulwqtx.html