美文网首页
RabbitMQ-路由模式

RabbitMQ-路由模式

作者: jiahzhon | 来源:发表于2020-07-20 18:04 被阅读0次
image.png
  • 生产者:
public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException {

        Connection connections = ConnectionUtils.getConnections();
        Channel channel = connections.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String msg = new String("routing  test");
        String routingKey="info";
        System.out.println("send   " + msg);
        channel.basicPublish(EXCHANGE_NAME,routingKey, null, msg.getBytes());
        channel.close();      
        connections.close();

    }
}
  • 消费者1:
public class Recv1 {
    private static final String QUEUE_NAME="erro_info_manage";
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws IOException {
    Connection connections = ConnectionUtils.getConnections();
    final Channel channel = connections.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
    channel.basicQos(1);
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
            String msg=new String(body,"utf-8");
            System.out.println("recv1"+msg);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
    
}
  • 消费者2:
public class Recv2 {
    private static final String QUEUE_NAME="all_info_manage";
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws IOException {
    Connection connections = ConnectionUtils.getConnections();
    final Channel channel = connections.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    channel.basicQos(1);
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
            String msg=new String(body,"utf-8");
            System.out.println("recv2   "+msg);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally{
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };
    channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
    
}
}

此时只有消费者2能收到

相关文章

网友评论

      本文标题:RabbitMQ-路由模式

      本文链接:https://www.haomeiwen.com/subject/ihbykktx.html