美文网首页
rabbitmq(二)订阅模式\路由模式\topic

rabbitmq(二)订阅模式\路由模式\topic

作者: guideEmotion | 来源:发表于2019-05-21 20:30 被阅读0次

    一 消息应答和消息持久化

    boolan autoAck = true;
    channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    

    boolan autoAck = true;时时自动确认模式:一旦rabbitmq将消息分给消费者,就会从内存中删除
    缺陷这种情况下,如果杀手正在执行的消费者,就会丢失正在处理的消息

    boolan autoAck = true;自动确认模式:一旦rabbitmq将消息分给消费者,就会从内存中删除
    缺陷这种情况下,如果杀手正在执行的消费者,就会丢失正在处理的消息

    boolan autoAck = false;手动确认模式:一旦有一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答。高速rabbitmq这个消息我已经处理完成,你可以删了。然后rabbitmq就会删除内存中的消息

    消息应答默认展开的即autoAck = false
    ack:message acknowledgment

    消息持久化

    boolean durable = false;
    channel.queueDeclare(QUEUE_NAME,durable,false,false,null)
    

    已经定义的队列不能改变持久化状态

    二 订阅模式

    前面都是一个消息只能被一个消费者消费,该模式可以实现一个消息发送给多个消费者
    模型

    image.png

    特点

    1. 一个生产者,多个消费者
    2. 每个消费者都有自己的队列
    3. 生产者没有直接把消息发送到队列,而是发送到了交换机 转发器 exchange
    4. 每个队列都要绑定到交换机上
    5. 生产者发送的消息 经过交换机 到达队列 就能实现 一个消息被多个消费者消费

    生产者

        private  static final  String EXCHANGE_NAME = "test_first_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            Connection connection = ConnectiionUtil.getConnection();
    
            Channel channel = connection.createChannel();
    
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
    
            String msg = "hello excahnge";
    
            channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
            System.out.println(" 发送完毕 "+msg);
    
            channel.close();
            connection.close();
    
    
        }
    

    效果

    image.png
    注意:交换机没有存储的能力,在rabbitmq中只有队列有存储能力。因为此时还没有队列绑定,所以数据丢失了

    消费者

        private static  final String QUEUE_NAME = "test_first_exchange_queue1";
    
        private  static final  String EXCHANGE_NAME = "test_first_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //获取连接
            Connection connection = ConnectiionUtil.getConnection();
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    // 接收到的消息
                    String message = new String(body);
                    System.out.println("Receive 接收到的消息 " + message);
                }
    
    
            };
            channel.basicConsume(QUEUE_NAME,true,consumer);
            channel.close();
            connection.close();
        }
    
    image.png

    三 Exchange(交换机 转发器)

    一方面是接收生产者的消息,另一方面是向队列推送消息

    匿名和非匿名

    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());//匿名往队列发
    
    channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());//非匿名,往exchange发,且第二个字符串表示route key
    

    fanout(不处理路由键)

    image.png

    direct(处理路由键)

    image.png

    路由模式

    image.png

    生产者

        private  static final  String EXCHANGE_NAME = "test_exchange_direct";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            Connection connection = ConnectiionUtil.getConnection();
    
            Channel channel = connection.createChannel();
    
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    
    
            String msg = "hello excahnge direct";
    
            String routingkey = "error";
            channel.basicPublish(EXCHANGE_NAME,routingkey,null,msg.getBytes());
            System.out.println(" 发送完毕 "+msg);
    
            channel.close();
            connection.close();
    
    
        }
    

    消费者

        private static  final String QUEUE_NAME = "test_exchange_direct_queue2";
    
        private  static final  String EXCHANGE_NAME = "test_exchange_direct";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //获取连接
            Connection connection = ConnectiionUtil.getConnection();
            //从连接中获取一个通道
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warn");
    
            channel.basicQos(1);//保证一次只分发一个
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    // 接收到的消息
                    String message = new String(body);
                    System.out.println("Receive2 接收到的消息 " + message);
    
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("Receive2 end" );
                        //手动回执
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
    
    
            };
            
    
            channel.basicConsume(QUEUE_NAME,false,consumer);
    
        }
    

    topic模式

    将路由键和某模式匹配

    匹配一个或或多个

    • 匹配一个
    image.png

    模型

    image.png

    生产者

       private  static final  String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            Connection connection = ConnectiionUtil.getConnection();
    
            Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");
            String msg = "test test";
            String routingkey = "white.big.s";
            channel.basicPublish(EXCHANGE_NAME,routingkey,null,msg.getBytes());
            System.out.println(" 发送完毕 "+msg);
    
            channel.close();
            connection.close();
    
    
        }
    

    消费者

     private static  final String QUEUE_NAME = "test_exchange_topic_queue2";
    
        private  static final  String EXCHANGE_NAME = "test_exchange_topic";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //获取连接
            Connection connection = ConnectiionUtil.getConnection();
            //从连接中获取一个通道
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"white.#");
    
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
    
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    // 接收到的消息
                    String message = new String(body);
                    System.out.println("Receive-2 接收到的消息 " + message);
    
    
                }
    
    
            };
    
    
            channel.basicConsume(QUEUE_NAME,true,consumer);
    
    
    
        }
    

    相关文章

      网友评论

          本文标题:rabbitmq(二)订阅模式\路由模式\topic

          本文链接:https://www.haomeiwen.com/subject/qfolzqtx.html