美文网首页
6.RabbitMQ Routing

6.RabbitMQ Routing

作者: xialedoucaicai | 来源:发表于2018-06-20 15:43 被阅读0次

在之前的Publish/Subscribe模式中,Exchange会将消息推送给所有绑定到它的队列,类似广播,那么如果我的某条消息只想发到指定的那个队列,类似单播,如何实现呢?这就需要用到Routing模式了:

Routing

1.direct与binding key

其实理解了之前的Publish/Subscribe,Routing也就好理解了,就是广播和单播的区别,Publish/Subscribe会将消息发送到所有队列,Routing只会将消息发送到routingKey完全匹配的队列。我们只要把代码稍作改动:

  1. 声明交换器类型为direct
  2. 指定routingKey,生产者/消费者通过routingKey来识别

在Publish/Subscribe中我们是这样发消息的:

//声明交换器,类型fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//发送消息给交换器,广播不需要routingKey
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());

对于单播,我们需要这样写:

//声明交换器 direct类似于单播
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//发消息给交换器,指定routingKey,
channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes());

对于消费者,我们需要指明对应的routingKey
之前的写法

//绑定队列到转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

改成这样写

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");

2.完整代码

生产者

//路由模式生产者
public class Send {
    public static final String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机 direct类似于单播
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String msg = "hello routing";
        //向交换机发送消息,由于交换机没有存储消息的能力,所以如果没有队列绑定到交换机,消息将被丢弃
        //这里指定路由键为error,表示我发送error级别的日志
        channel.basicPublish(EXCHANGE_NAME, "error", null, msg.getBytes());
        channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes());
        System.out.println("向交换机发送了消息"+msg);
        
        channel.close();
        connection.close();
    }
}

消费者1

public class Receive1 {
    public static final String QUEUE_NAME = "routing_queue1";
    public static final String EXCHANGE_NAME = "exchange_direct";
    
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到转发器,这里指定路由键,这里的路由键只有和生产者的路由键匹配,队列才能收到消息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        
        //一次只发一条消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //收到消息就会触发
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("消费者1:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //向生产者发送回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //自动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2

public class Receive2 {
    public static final String QUEUE_NAME = "routing_queue2";
    public static final String EXCHANGE_NAME = "exchange_direct";
    
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列到转发器,这里绑定了三个路由键,表示能接收三种级别的日志消息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        
        //一次只发一条消息
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //收到消息就会触发
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body,"utf-8");
                System.out.println("消费者2:"+msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //向生产者发送回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //自动应答
        boolean autoAck = false;
        //监听队列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

error级别的消息,两个消费者都能收到,info级别的消息,就只有消费者2能收到了,不像之前两个队列都能收到全部的消息。

相关文章

网友评论

      本文标题:6.RabbitMQ Routing

      本文链接:https://www.haomeiwen.com/subject/ewntyftx.html