RabbitMQ: Publish/Subscribe Pattern

Author: jiahzhon | Published 2020-07-20 17:35
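    [Figure: 1595237710(1).jpg]
    1. X: the exchange, which forwards messages.
    2. One producer, multiple consumers.
    3. Each consumer has its own queue.
    4. The producer does not send messages directly to a queue; it sends them to the exchange.
    5. Every queue must be bound to the exchange.
    6. A message sent by the producer passes through the exchange to every bound queue, so one message can be consumed by multiple consumers.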
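The six points above can be sketched with a toy in-memory model. This is only an illustration of the fanout idea using plain Java collections; the class `FanoutSketch` and its `bind` and `publish` methods are hypothetical names and are not part of the RabbitMQ client API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory model of a fanout exchange; not the RabbitMQ client API.
class FanoutSketch {
    // Queues bound to the exchange. The exchange itself keeps no messages.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Bind a queue so that it receives a copy of every later message.
    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // Copy the message into every bound queue; returns the number of copies made.
    int publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        Queue<String> emailQueue = new ArrayDeque<>();
        Queue<String> smsQueue = new ArrayDeque<>();
        exchange.bind(emailQueue);
        exchange.bind(smsQueue);

        exchange.publish("hello ps");

        // Each consumer reads from its own queue and sees the same message.
        System.out.println("consumer 1 got: " + emailQueue.poll());
        System.out.println("consumer 2 got: " + smsQueue.poll());
    }
}
```

The producer only ever talks to the exchange object, while each consumer only reads its own queue, which mirrors the roles in the list above.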
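    • Producer:
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
        private static final String EXCHANGE_NAME = "test_exchange_fanout";
    
        // TimeoutException is declared because Channel.close() throws it in recent amqp-client versions.
        public static void main(String[] args) throws IOException, TimeoutException {
            // ConnectionUtils is the author's own helper for opening a connection (not shown here).
            Connection connections = ConnectionUtils.getConnections();
            Channel channel = connections.createChannel();
            // Declare a fanout exchange, which copies each message to every bound queue.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            String msg = "hello ps";
            // A fanout exchange ignores the routing key, so it is left empty.
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("send  " + msg);
            channel.close();
            connections.close();
        }
    }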
    
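    [Figure: image.png]

    If the producer runs before any queue has been bound to the exchange, where does the message go?
    It is lost. An exchange cannot store messages; in RabbitMQ, only queues can.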
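The ordering problem can be reproduced with a small in-memory sketch. It is a simplified model, not RabbitMQ code; `UnboundPublishSketch` and its methods are hypothetical names chosen for this illustration. A message published before any queue is bound has nowhere to be stored, while one published after binding is kept by the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: shows why a message published before any binding is lost.
class UnboundPublishSketch {
    // The exchange holds bindings, never messages.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // Returns true if at least one queue stored the message, false if it was dropped.
    boolean publish(String message) {
        boundQueues.forEach(queue -> queue.add(message));
        return !boundQueues.isEmpty();
    }

    public static void main(String[] args) {
        UnboundPublishSketch exchange = new UnboundPublishSketch();

        // Producer runs first: no queue is bound yet, so nothing stores the message.
        boolean storedEarly = exchange.publish("hello ps");

        // A consumer starts later and binds its queue.
        Queue<String> emailQueue = new ArrayDeque<>();
        exchange.bind(emailQueue);
        boolean storedLate = exchange.publish("hello again");

        System.out.println("stored before binding: " + storedEarly);
        System.out.println("stored after binding:  " + storedLate);
        System.out.println("messages in queue:     " + emailQueue.size());
    }
}
```

In practice this means the consumers below should be started, so that their queues are declared and bound, before the producer sends.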

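    • Consumer 1:
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Recv1 {
        private static final String EXCHANGE_NAME = "test_exchange_fanout";
    
        private static final String QUEUE_NAME = "email_queue";
    
        public static void main(String[] args) throws IOException {
            Connection connections = ConnectionUtils.getConnections();
            final Channel channel = connections.createChannel();
            // Declare this consumer's own queue (non-durable, non-exclusive, no auto-delete).
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange; the exchange must already exist (Send declares it).
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // Deliver at most one unacknowledged message at a time to this consumer.
            channel.basicQos(1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("recv11   " + msg);
                    try {
                        Thread.sleep(2000); // simulate slow processing
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // Acknowledge this single message manually.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // autoAck = false: messages are acknowledged explicitly in handleDelivery.
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    }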
    
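    • Consumer 2:
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class Recv2 {
        private static final String EXCHANGE_NAME = "test_exchange_fanout";
    
        private static final String QUEUE_NAME = "massage_queue";
    
        public static void main(String[] args) throws IOException {
            Connection connections = ConnectionUtils.getConnections();
            final Channel channel = connections.createChannel();
            // Declare this consumer's own queue (non-durable, non-exclusive, no auto-delete).
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange; the exchange must already exist (Send declares it).
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // Deliver at most one unacknowledged message at a time to this consumer.
            channel.basicQos(1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("recv22   " + msg);
                    try {
                        Thread.sleep(2000); // simulate slow processing
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // Acknowledge this single message manually.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // autoAck = false: messages are acknowledged explicitly in handleDelivery.
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    }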
    
