Rabbit MQ

作者: 喵你一口 | 来源:发表于2019-07-16 17:37 被阅读0次

    为啥要用MQ

    1. 消费方不需要实时等待依赖上一个任务的执行结果,只要生产者随时发送消息,消费者随时可接受消息调用
    2.生产者不需要关心消费者的执行结果,直接调用的话会影响生产者的执行结果

    RabbitMQ基本概念

    RabbitMQ结构图.jpg
    • exchange(交换器)发送到不同的Queue
    • Queue用来存储生产者发送的消息,也让消费者消费的
    • Routing Key(路由Key),生产者发送消息时附带Routing Key选择对应的Queue发送消息
    • Binding(绑定key),Queue和分发器之间的桥梁、Routing Key选择合适的桥梁到达对应的Queue。
    • prefetch count(预取计数):用于指定消费者从Queue中每一次预取得消息、执行完城后才能接着去取。
    • Exchange Type-fanout(广播式),无论Routing Key是什么,所有Queue都会收到消息。


      EXCHANGE广播试.jpg
    • 2、 Exchange Type-direct(直接式):Binding Key和Routing Key相同才能收到消息。如下图


      EXCHANGE直接式.jpg
    • 3、 Exchange Type-topic(主题式):模糊匹配,Binding Key和Routing Key都是被点"."分个开的多个"单词",如"story.ses.nse",使用通配符*(代表单个"单词"),#(代表多个"单词")进行匹配。如下图


      EXCHANGE主题式.jpg

    4、Exchange Type-topic(主题式)是项目中经常使用的MQ方式,故特此举例:
    routingkey由一些单词,中间由.分割,单词可以任意给,但总长度不能超过255个字符。对应的bindingkey也对应是这种形式,绑定键有两种特殊的字符,
    和#

    绑定键举例.png

    对于上图的例子,我们将会发送描述动物的消息。这些消息将会以由三个单词组成的路由键发送。路由键中的第一个单词描述了速度,第二个描述了颜色,第三个描述了物种:<speed>.<colour>.<species>。
    我们创建了三个绑定,Q1的绑定键为.orange.,Q2的绑定键有两个,分别是..rabbit和lazy.#。
    上述绑定关系可以描述为:
    ①Q1关注所有颜色为orange的动物。
    ②Q2关注所有的rabbit,以及所有的lazy的动物。

    • 如果一个消息的路由键是quick.orange.rabbit,那么Q1和Q2都可以接收到,
    • 路由键是lazy.orange.elephant的消息同样如此。
    • 路由键是quick.orange.fox的消息只会到达Q1,路由键是lazy.brown.fox的消息只会到达Q2。注意,路由键为lazy.pink.rabbit的消息只会到达Q2一次,尽管它匹配了两个绑定键。
    • 路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。
    • 路由键为orange或者quick.orange.male.rabbit,那这个消息不与任何绑定键匹配,则会丢弃。

    生产者code举例(官网):

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLogTopic {
    
      private static final String EXCHANGE_NAME = "topic_logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
    
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
            String routingKey = getRouting(argv);
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
      }
      //..
    }
    

    消费者code举例:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class ReceiveLogsTopic {
    
      private static final String EXCHANGE_NAME = "topic_logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
    
        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }
    
        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }
    
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
      }
    }
    
    • 如果bingdkey输入#,启动第一个消费者,将会接受所有的消息;
    • bindingkey输入kern.*,启动第二个消费者,将会受到kern前缀的所有消息;
    • bindingkey输入*.critical,启动第三个消费者,将会收到critical后结尾的所有消息
    • bindingkey输入"kern.", ".critical",启动第四个消费者,将会受到第二个消费者和第三个消费者的所有消息;

    转载: http://www.ltens.com/article-6.html

    相关文章

      网友评论

          本文标题:Rabbit MQ

          本文链接:https://www.haomeiwen.com/subject/aqslfctx.html