美文网首页
RabbitMQ-路由模式

RabbitMQ-路由模式

作者: jiahzhon | 来源:发表于2020-07-20 18:04 被阅读0次
    image.png
    • 生产者:
    public class Send {
        private static final String EXCHANGE_NAME = "test_exchange_direct";
    
        public static void main(String[] args) throws IOException {
    
            Connection connections = ConnectionUtils.getConnections();
            Channel channel = connections.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String msg = new String("routing  test");
            String routingKey="info";
            System.out.println("send   " + msg);
            channel.basicPublish(EXCHANGE_NAME,routingKey, null, msg.getBytes());
            channel.close();      
            connections.close();
    
        }
    }
    
    • 消费者1:
    public class Recv1 {
        private static final String QUEUE_NAME="erro_info_manage";
        private static final String EXCHANGE_NAME = "test_exchange_direct";
        public static void main(String[] args) throws IOException {
        Connection connections = ConnectionUtils.getConnections();
        final Channel channel = connections.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println("recv1"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
        
    }
    
    • 消费者2:
    public class Recv2 {
        private static final String QUEUE_NAME="all_info_manage";
        private static final String EXCHANGE_NAME = "test_exchange_direct";
        public static void main(String[] args) throws IOException {
        Connection connections = ConnectionUtils.getConnections();
        final Channel channel = connections.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println("recv2   "+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
        
    }
    }
    

    此时只有消费者2能收到

    相关文章

      网友评论

          本文标题:RabbitMQ-路由模式

          本文链接:https://www.haomeiwen.com/subject/ihbykktx.html