美文网首页
8_rabbitmq路由模式

8_rabbitmq路由模式

作者: 奇点一氪 | 来源:发表于2018-08-18 10:17 被阅读36次

    交换机

    一方方面是接收生产者的消息,另一方面是想队列推送消息。

    匿名转发 “”


    image.png
    • 声明交换机不处理路由键 :Fanout(不处理路由键) 非匿名方式不处理


      image.png
      image.png
    • 声明交换机处理路由键 :Direct(处理路由键) 非匿名方式不处理


      image.png
    image.png
    子代码地址
    https://github.com/csy512889371/learndemo/tree/master/ctoedu-rabitmq

    产者

    public class Send {
         private final static String EXCHANGE_NAME = "test_exchange_direct";
         public static void main(String[] argv) throws Exception {
             // 获取到连接以及mq通道
             Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel();
             // 声明exchange     direct 处理路由键设置
             channel.exchangeDeclare(EXCHANGE_NAME, "direct");
             // 消息内容
             String message = "id=1001的商品删除了";
    
                     //第二个队列能收到
                     String  routingKey="info";
                     //两个队列都能收到
                     //String  routingKey="error";
    
             channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
             System.out.println(" [x] Sent '" + message + "'");
             channel.close();
             connection.close();
         }
    }
    

    消费者 1

    import cn.ctoedu.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class Recv {
        private final static String QUEUE_NAME = "test_queue_direct_1";
        private final static String EXCHANGE_NAME = "test_exchange_direct";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 绑定队列到交换机,绑定key路由到error
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
    
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发这个方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg:" + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done ");
            // 手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    消费者 2

    public class Recv2 {
        private static final String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //获取连接
            Connection connection = ConnectionUtils.getConnection();
            //获取channel
            final Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
             // 绑定队列到交换机,绑定key路由到insert
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "infor");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
            channel.basicQos(1);//保证一次只分发一个
            //定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                //消息到达 触发这个方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("[2] Recv msg:" + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done ");
                        //手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    

    相关文章

      网友评论

          本文标题:8_rabbitmq路由模式

          本文链接:https://www.haomeiwen.com/subject/sfbtiftx.html