RabbitMQ

作者: 蜀山_竹君子 | 来源:发表于2020-09-13 15:15 被阅读0次

    MQ简介

    什么是MQ?

    消息总线(Message Queue),一组跨进程、异步的通信机制,用于上下游消息传递。由消息系统来确保消息的可靠性。
    MQ作用
    应用解耦、异步通信、流量削峰、数据分发、错峰流控、日志收集等。
    MQ衡量标准
    服务性能、数据存储、集群架构

    主流竞品分析

    当前市场MQ产品很多,比如RocketMQ、ActiveMQ、Kafka、RabbitMQ、ZeroMQ,甚至redis也支持MQ功能。

    ActiveMQ

    apache出品的开源消息总线,完全支持JMS规范的消息中间件,提供丰富API,在中小企业应用广泛。
    性能
    ActiveMQ在性能方便相对于其他MQ产品,性能较差,在高并发场景会出现阻塞、延迟、堆积。
    存储
    Active默认基于内存的KahaDB存储,如果要保证消息的可靠性,可以采用关系数据库存储。
    集群
    ActiveMQ支持master-salave模式、和NETWORK模式。其集群高可用性借助Zookeeper实现。

    master-salave模式
    master-salave:通过Zookeeper进行集群管理,由Master节点对外提供服务,Master节点出现故障,由Zookeeper会高效的下掉Master节点,由Salave节点提供服务。
    NETWORK模式:两套master-salave,由网络通联,组成分布式集群。
    NETWORK模式

    Kafka

    LinkedIn开源的发布-订阅系统,目前是归属于apache顶级项目。Kafka基于Pull模式来处理消息消费,追求高吞吐量,设计目的就是用于日志收集和处理。Kafka不支持事务,对消息的重复、丢失、错误没有严格要求。适用于大量数据的互联网服务的数据收集。Kafk基于操作系统的底层Page Cache实现高效的读写,仅使用内存管理,不存在内存与磁盘之前的IO操作。Kafka借助Zookeeper实现高可用性。


    kafka集群

    RokectMQ

    阿里开源消息中间件,孵化为apache顶级项目。Rocket使用纯java开发,具有高吞吐、高可靠性、适合大型分布式系统应用特点。RoekectMQ 2.X集群基于Zookeeper管理,3.X集群基于NameServer管理。
    RokectMQ能够保证消息的顺序消费,提供了丰富的消息拉取等处理模式,开发者可以高效的进行水平扩展,能够承载上亿数据量级。
    RockerMQ集群支持Master-Salave、双Master-Salave、多主多从模式。
    存储技术支持同步双写、异步复制,使用零拷贝技术。
    RockerMQ缺点是上下游交互核心不开源,社区提供的JMS其性能、兼容性、可靠性无法保证。


    Rocket

    RabbitMQ简介

    RabbitMQ是开源的消息代理和队列服务器,用于通过普通协议在不同应用间共享数据。RabbitMQ使用Erlang语言,基于AMQP协议实现。

    AMQP

    AMQP(Advanced Messae Queueing Protocol)高级消息队列协议: 面向消息中间件的开放式标准应用层协议,定义了以下特性:

    • 消息方向
    • 消息队列
    • 消息路由(包括点对点、发布订阅)
    • 可靠性
    • 安全性

    AMQP要求消息的提供者和客户端接受者的行为实现对不同供应商可以使用相同的方式进行互相操作。AMQP增减了Exchange和Bindings角色。生产者(Priducer)把消息发布到Exchange上,消息最终达到队列并被消费者接受,而Bindings决定Exchange消息应该达到那个队列。


    JMS

    JMS(Java Message Service): java消息服务应用程序接口,是java平台中关于面向消息中间件(MOM)的API,JMS定义了一个API以及一组消息收发必须实现的行为。

    RabbitMQ优势

    • 可靠性:使用了如持久化、传输确认、发布确认等机制来保证消息的可靠性;
    • 灵活的路由:在消息进入队列之前,通过Exchange来路由消息。Rabbit通过内置的Exchange已经实现了典型的路由。针对复杂路由可以通过将多个Exchange绑定组合或者通过插件机制实现自己的路由规则;
    • 消息集群(Clustering):多个MQ服务器可以组成一个集群,形成一个逻辑Broker;
    • 高可用性(HA):队列可以在集群集群上进行镜像,使得其中一个节点故障情况,队列仍可以使用;
    • 多协议支持:支持多种消息队列协议,如STOMP、MQTT等;
    • 管理界面:提供易于使用的管理界面,可以方便监控和管理消息队列;
    • 追踪机制:Rabbit提供了消息追踪机制,可以找出消息出现的问题;
    • 插件机制:提供许多插件,也可以自己实现插件。

    基本概念

    • Broker:消息队列服务器的实体,多个Rabbit服务器可以组成一个逻辑Broker,负责接收生产者的消息,并将消息转发给其他Broker或消息消费者;
    • Exchange:消息交换机,消息第一个到达的地方,消息通过Exchange指定的路由规则分发到不同的队列;
    • Queue:消息队列,消息通过发送和路由到达的地方,到达Queue后,消息进入等待消费状态。每个消息可以发送到一个或多个队列;
    • Binding:把Exchange和Queue按照路由规则绑定起来,Exchange和Queue之间的虚拟连接;
    • Routing Key:路由关键字,Exchange通过这个关键字进行消息传递;
    • Virtual host: 虚拟主机,对Broker的虚拟划分,将消费者、生产者和他们依赖的- AMQP服务进行隔离,每个vHost都可以看做是独立的mini版RabbitMQ服务器。vHost是AMQP的基础,在链接时必须指定,RabbitMQ 默认是 /;
    • Channel:消费通道,连接生产者和消费者的逻辑结构,多路复用连接中的一条双向数据流通道。Channel是建立在真实TCP连接内的虚拟连接,AMQP命令都是通过Channel发出,发布消息、订阅队列、接受消息这些动作都是通过Channel完成;
    • Producer: 消息生产者;
    • Customer:消息消费者;
    • Collection:生产者和消费者之间通信的物理网络。


    Exchange类型

    Exchange分发消息时,各种类型不同分发策略也有所不同。目前主要有三种类型:

    • Direct Exchange:完全根据路由Key投递消息,路由key与队列名一致。消息中的Routing Key与Binding的 binding key 一致时,交换器就将消息分发到对应队列。
    • Fanout Exchange:完全不适用key,采用广播模式,一有消息进来就会分发到所有绑定的队列。Fanout类型交换机转发消息时最快的。
    • Topic Exchange:对key进行模糊匹配后投递。它将Routing Key和Binding Key的字符串切分为单词,单词之间用点隔开。Topic只识别#号和号两个通配符,#号匹配一个或多个单词,号匹配一个单词。例如,abc.#匹配abc.ghi.chf,abc.*匹配abc.ghi。

    RabbitMQ持久化

    RabbitMQ支持消息的持久化,消息持久化包括三部分:

    • Exchange持久化:在声明时指定durable =>1;
    • Queue持久化:在声明时指定durable =>1;
    • 消息持久化:在投递消息时指定delivery_mode=>2(1是非持久化)

    如果Exchange和Queue都是持久化的,那么Binding也是持久化的,如果Exchange和Queue之间有一个是持久化,一个是非持久化那么不允许绑定。

    TTL

    TTL(Time To Live):生存时间,RabbitMQ支持消息的过期时间,一共两种:

    • 在发送消息时指定,通过配置消息体的properties,可以指定消息的过期时间;
    • 在创建Exchange是指定,消息进入队列开始计算,超过队列的超时时间配置,那么消息就会自动清除。

    死信队列DLX

    死信队列(DLX Dead-Letter-Exchange):利用DLX,当消息在一个队列变成死信(Dead-Message)之后,它能重新pulish到另一个Exchange,这个Exchange就是DLX。DLX也是正常Exchange,和一般Exchang没有区别,能在任何队列被指定,实际就是设定某个队列属性。
    消息变成死信的场景

    • 消息被拒绝(basic.reject/basic.nack)并且requeue=false;
    • 消息TTL过期;
    • 队列达到最大长度

    死信队列设置
    需要设置死信队列的Exchange和Queue,然后通过Routing Key进行绑定。只是在队列加上一个参数即可。

           Map<String, Object> arguments = Maps.newHashMapWithExpectedSize(3);
           arguments.put("x-message-ttl", dlx-ttl);
           arguments.put("x-dead-letter-exchange", "exchange-name");
           arguments.put("x-dead-letter-routing-key", "routing-key");
           Queue ret = QueueBuilder.burable("queue-name".withArguments(arguments).build();
    

    只需要监听该死信队列即可处理死信消息。还可以通过死信队列实现延迟消息。

    消费端ACK和NACK

    消费端进行消费时,如果由于业务异常可以进行日志的记录,然后进行补偿。由于服务器宕机严重,我们需要手动进行ACK保障消费端消费成功。
    消费端重回队列是为了对没有处理成功消息,把消息重新返回Broker。一般,在实际应用中都会关闭重回队列,也就是设置false。
    生产者确认机制(confirm)

    • 消息的确认:当生产者投递消息后,如果Broker收到消息,则会给生产者一个应答。
    • 生产者进行接收应答,用来确认这条消息是否正常到达Broker,这种方式也是消息可靠性的核心保障。


      AFK
    1. 开Channel开启确认模式,Channel.confirmSelect();
    2. 在Channe上开启监听,addConfrimListener,监听成功和失败的处理结果,对消息进行重发或记录日志等待进一步处理。

    Return消息机制
    Return Listener用于处理一些不可路由消息。
    在MQ正常情况下,我们生产者将消息通过指定Exchange和Routing,把消息送达到某一个队列,然后消费者监听这个队列进行消息的处理。
    但是在某种情况,我们发送消息时,当前Exchange不存在或指定的路由Key路由不到,这时我们监听不可到达消息就需要Return Listener。
    在MQ基础API中有个关键配置:Mandatory,如果设置true,那么监听器会收到不可到达消息,然后处理,如果设置false,那么Broker默认会自动删除不可到达消息。
    消费端自定义监听(推模式和拉模式pull/push)

    • 通过while循环进行consumer.nextDelivery()方法获取下一条消息进行消费。(通过死循环将拉模式模拟成推模式,死循环会消耗CPU资源)。
    • 自定义consumer,实现更加方便、可读性更强、解耦性更强的方式。(默认使用的模式,直接订阅到Queue,等待MQ推送消息)。

    如果是高并发场景,要实现高吞吐量,消费者应该使用basic.Consume方法,直接订阅队列,将信道设置为接收模式,直到取消队列的订阅。在订阅期间,MQ会不间断推送消息到消费者。推送的消息受到Basic.Qos限制。
    如果只想从队列获取单条消息,那么应该使用Basic.Get,但是不能讲Basic.Get放到死循环中,这样会严重影响MQ性能。

    消息幂等性保证

    消费者实现了消息的幂等性,可以防止消息被多次消费。

    • 利用数据库唯一约束去重。
    1. 创建消息去重库,把全局唯一ID+指纹码作为唯一约束,如果插入成功则表示没有消费这条消息,插入失败则消息已消费。
    2. 优点:实现简单
    3. 缺点:高并发场景存在数据库写入瓶颈
    4. 解决方案:根据ID进行分库分表进行算法路由
    • 使用状态机或者版本号,基于数据库乐观锁CAS方式
    • 版本号机制
    update t_payment set orderId = #{orderId} , money=#{money},  payStatus=#{payStatus}  version=#{ version } +1 
    where id=#{id} and version=#{version}
    
    • 状态机机制
    update table set status = 2... where status = 1
    if(!update)
    进行业务处理
    else
    重复处理
    
    • 利用redis原子性实现
      redis是单线程的,且性能非常好,并且提供许多原子命名。利用redis.setnx命令,将消息唯一ID作为主键执行setnx,
      如果执行成功,那么消息未被消费,如果执行失败表示消息已消费

    消息可靠性保证

    • 消息成功发出
    • 保障MQ节点成功接受到消息
    • 发送端收到MQ节点(Broker)确认应答
    • 完善补偿机制

    解决方案

    1. 消息落库,对消息状态进行变更



      缺点是对数据库有多次操作,不适合高并发场景。

    2. 消息的延迟投递,做二次确认,回调检查


    拆分出一个回调服务,将落库、状态检查等操作安排到回调服务中。
    1:消息发送者发送信息到MQ,消费者为下游业务方。
    1.1 成功后,作为发送方发送消息到MQ,消费者为回调服务
    1.1.1 回调服务接受数据后,落库。
    2.2 失败,等待发送者的延时投递信息
    2:发送者发送延时投递信息到MQ,消费者为回调服务
    2.1 查库,确认下游消费已成功
    2.2 确认下游消费已失败,通过RPC接口调用发送者的接口重新发送
    回调模式减少了数据库操作,但是不能保证消息的百分之百可靠。

    MQ限流

    当海量消息瞬时推送过来,消费者无法同时处理那么多数据,严重甚至导致宕机,这时需要流量削峰。
    RabbitMQ提供了一直Qos(服务质量保证)功能。即在非自动确认消息的前提下(非ACK),如果一定数目的消息(通过基于consume或channel的qos参数设置)未被确认前,不进行消费新的消息。

    RabbitMQ集群

    RabbitMQ最大的优势就是内建集群,其设计目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及添加更多节点来线性扩展消息通信的吞吐量。
    RabbitMQ会始终记录四种内部元数据:

    • 队列元数据:包括队列名称和属性,比如是否支持持久化、是否自动删除。
    • 交换器元数据:交换器名称、类型、属性。
    • 绑定元数据:内部是一张表格,记录如何将消息路由到队列。
    • vhost元数据: 为vhost内部的队列、交换器、绑定提供命名空间和安全属性。
      RabbitMQ会将这些信息全部保存到内存中,同时将标记为可持久化的队列、交换器、绑定存储到磁盘中。存储到磁盘保证节点重启队列和交换器能够重建。
      集群配置方式
      https://www.jianshu.com/p/b7cc32b94d2a

    相关文章

      网友评论

          本文标题:RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/vninektx.html