消息队列之RabbitMQ基础

作者: 橙小张 | 来源:发表于2017-04-23 13:47 被阅读384次

    MQ在工作中用途还是比较多的,RabbitMQ又是比较容易上手并且在企业中用的比较多的一种消息服务,本篇文章借鉴于ginobefun的文章和ConanLi的文章,一方面是加深理解,一方面也是补充自己在MQ的不足。

    一、AMQP基础

    Paste_Image.png

    二、RabbitMQ

    基础概念

    • 生产者、消费者
    • 队列
    • 交换器
    • 绑定

    生产者、消费者

    • 生产者(producer)创建消息,然后发送到代理服务器(RabbitMQ)
    • 消费者(consumer)连接到代理服务器上,并订阅到队列(queue)上;当消费者接收到消息时,它只得到消息的一部分:有效载荷(标签并没有随有效载荷一同传递)
    • 信道(channel)建立在”真实的”TCP连接内的虚拟连接;不论是发布信息、订阅队列或是接收消息,都是通过信道完成的;不使用TCP连接主要是因为对于操作系统而言建立和销毁TCP会话非常昂贵的开销;在一条TCP连接上创建多少条信道是没有限制的
    • 消息包含两部分:有效载荷(payload)和标签(label);有效载荷就是你想要传输的数据(可以是任何格式的任何内容);标签描述了有效载荷,并且RabbitMQ用它来决定谁将获得消息的拷贝(之后举例说明)

    队列(queue)

    Paste_Image.png
    • Queue(队列)是RabbitMQ的内部对象,用于存储消息
    • 主体流程:
      • 队列类似一个broker角色,生产者将内容(消息)发送到队列
      • 队列进行存储,消费者将消息消费
      • 消费者确认消费消息(ack)
    • 生产者和消费者都可以通过来创建队列:
    channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, arguments);
    
    • durable:队列名称,不指定则随机生成
    • exclusive:设置为true则为私有队列,只有当前消费者可以订阅;
    • autoDelete:设置为true时最后一个消费者取消订阅将自动移除队列;
    • arguments:参数

    交换器&绑定

    Paste_Image.png

    RabbitMQ的消息不是直接从生产者发送到队列的,而是要经过交换器然后才可以到达队列:

    • 生成者把消息发布到交换器上;
    • 消息最终到达队列,并被消费者接收;
    • 绑定决定了消息如何从交换器到特定的队列;

    四种交换器类型:

    • fanout:把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
    • direct:把消息路由到bindingKey与routingKey完全匹配的Queue中
    • topic:把消息路由到bindingKey与routingKey模糊匹配的Queue中
    • headers:headers类型的Exchange不依赖于routingKey与bindingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配

    1、fanout

    Paste_Image.png
    • 生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

    2、direct

    Paste_Image.png
    • routingKey=”error”发送消息,则会同时路由到Queue1(amqp.gen-S9b…)和Queue2(amqp.gen-Agl…)
    • routingKey=”info”或routingKey=”warning”发送消息,则只会路由到Queue2
    • 以其他routingKey发送消息,则不会路由到这两个Queue中

    3、topic

    Paste_Image.png
    • routingKey=”quick.orange.rabbit”发送信息,则会同时路由到Q1与Q2
    • routingKey=”lazy.orange.fox”发送信息,则只会路由到Q1
    • routingKey=”lazy.brown.fox”发送消息,则只会路由到Q2
    • routingKey=”lazy.pink.rabbit”发送消息,则只会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配)
    • routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”发送消息,则会被丢弃,它们并没有匹配任何bindingKey

    4、header

    headers类型的Exchange不依赖于routingKey与bindingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

    三、SpringBoot+RabbitMQ实战

    ** Docker环境下安装RabbitMQ**

    # 下载rabbitmq的docker镜像和managerment的镜像
    docker pull rabbitmq:management
    # 启动rabbitmq镜像
    docker run -d --name rabbitmq --publish 5671:5671 \
     --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
    rabbitmq:management
    
    • 端口解释:

      • 4369:epmd(Erlang Port Mapper Daemon)
      • 25672:Erlang distribution
      • 5672, 5671:AMQP 0-9-1 without and with TLS
      • 15672:if management plugin is enabled
      • 61613, 61614:if STOMP is enabled
      • 1883, 8883:if MQTT is enabled
    • 默认访问路径:http://localhost:15672

    • 默认用户名和密码:guest/guest

    介于代码粘贴进来比较多,提供项目GitHub地址,本文提取代码片段进行讲解

    项目结构

    Paste_Image.png

    关键参数

    • @RabbitListener(queues = "xxx")
    • RabbitTemplate
    • TopicExchange
    • Binding
    • Queue

    代码段讲解

    • 创建队列
      @Bean
        public Queue ssdQueue(){
            return new Queue("hello");
        }
    
    • 创建topic类型交换器
    @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
    • 创建fanout类型的交换器
    @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
    • 绑定topic类型的交换器
    @Bean
        public Binding bindingExchangeTopicA(Queue topicAQueue, TopicExchange topicExchange) {
            return BindingBuilder.bind(topicAQueue).to(topicExchange).with("topic.a");
        }
    
    • 绑定fanout类型的交换器
    @Bean
        public Binding bindingExchangeFanoutC(Queue fanoutCQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanoutCQueue).to(fanoutExchange);
        }
    

    四、总结

    本片文章主要讲解了以下RabbitMQ的一些基础概念和使用SpringBoot和RabbitMQ整合的几个案例,也是笔者结合博客和官网写的一片总结,总结的也比较基础,没有包括一些高级的内容,例如事务,最终一致性,重复消息和顺序消息的处理等等。

    相关文章

      网友评论

        本文标题:消息队列之RabbitMQ基础

        本文链接:https://www.haomeiwen.com/subject/qhxyzttx.html