美文网首页
MQ:Kafka、RocketMQ、RabbitMQ

MQ:Kafka、RocketMQ、RabbitMQ

作者: 请叫我平爷 | 来源:发表于2022-02-18 08:53 被阅读0次

    队列是一种先进先出的数据结构

    MQ

    消息队列:把要传输的数据放在队列中

    image.png

    优点:

    1. 解耦,支付只需要关注关注重要的支付就行,其他的比如更新用户积分、通知商家发货等交给MQ来做
    2. 异步,更新用户信息、通知商家都是异步进行,提高了吞吐量
    3. 削峰,队列的顺序性实现消息的延迟消费

    缺点

    1. 系统可用性降低。依赖服务增多。需要考虑MQ挂了的情况。
    2. 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性
    3. 业务一致性。主业务和从属业务一致性的处理

    使用消息队列要考虑的问题

    1. 高可用
      消息队列要是集群/分布式的,能够提供现有支持,而不是手动代码实现
    2. 数据丢失问题
      消息队列要能持久化,这样才能减少数据的丢书。
    3. 消费者怎么的得到消息队列的数据?
      生产者->消息队列,
    4. 消息队列有数据了,主动叫消费者拿,push
    5. 消费者不断轮询,看有没有新的数据,如果有就消费,pull

    RabbitMQ

    模式:

    https://rabbitmq.com/getstarted.html

    • 简单模式
    image.png
    • 工作queue
    image.png
    • publish/subject
    image.png
    • Routing
    image.png
    • topic
    image.png
    • RPC
    image.png

    组件介绍

    • Broker:一台RabbitMQ机器就是一个Broker
    • Exchange:交换机,跟消费者、生产者、Queue交互
      • type:
        • fanout:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略 RoutingKey
        • direct:直接匹配,通过Exchange名称+RountingKey来发送与接收消息
        • topic:主题匹配订阅,主题指的是RoutingKey,RoutingKey可以采用通配符
        • headers:消息头订阅,只有请求头与消息头匹配,才能接收消息,忽略RoutingKey
        • default:RoutingKey就是queue的名称
         //设置默认的Exchange会使用amq.direct
         channel.BasicPublish("", "TaskQueue", properties, bytes);
        
      • durable 设置是否持久化
      • autoDelete 设置是否自动删除
      • internal 设置是否内置的,客户端无法直接发送消息到内置交换器,只能通过交换器路由到交换器
    • Queue:消息队列,只与Exchage交互

    高可用方案:

    1. 普通集群模式,只有RabbitMQ上有queue,其他的RabbitMQ通过网络去这台实例上获取queue
      消费者拉取的机器如果有queue,直接返回,没有,则实例之间会产生网络传输
      有queue的机器宕机了,导致其他机器都无法拉取数据
    2. 镜像集群模式,没个实例都有queue
      缺点:性能消耗大,所有机器都要进行消息同步
      没有扩展性可言,如果某个实例的queue很大,增加实例也没有用

    保证消息不重复消费:

    如果消费者消费完信息,这时没来的及ack就挂了,那么就会出现重复消费的问题
    保证幂等性

    1. 内存维护一个set,从消息队列中获取一个消息,先查询是否在set里面,在,就表示已消费,不在,加入set
    2. 写数据库,拿唯一键去数据库查询下,不存在就写,存在就表示已消费
    3. 写redis,用set
    4. 让生产者发送消息时,每条消息加一个全局唯一的id,然后消费时,将id保存到redis中,消费时去redis里面查一下,没有再消费
    5. 数据库操作可以设置唯一键,防止数据重复的插入。


      image.png

    生产者消息丢失

    1. 传入消息时丢失
      解决:
      1. 使用RabbitMQ提供的事务功能。

       ```
       channel.txSelect();//开启事务
       try{
          //发送消息
      }catch(Exection e){
         channel.txRollback();//回滚事务
         //重新提交
      }
       ```
      

      缺点:事务开启,就会变成同步阻塞操作,生产者会阻塞等待是否发送成功,性能较低

      1. 开启confirm模式,变成异步。
        每次写的消息都会分配一个唯一的id,然后写入rabbitMQ,rabbitMQ会回传一个ack消息。
        • 如果没有处理这个消息,会回调nack,你可以进行重试。
        • 如果超过一定时间没有收到消息回调,你可以进行重发。
        //开启confirm
        channel.confirm();
        //发送成功回调
        public void ack(String messageId){
        
         }
              // 发送失败回调
        public void nack(String messageId){
            //重发该消息
        }
        

    MQ消息丢失

    RabbitMQ
    收到消息,暂存在内存中,还没存到磁盘,然后机器挂了,导致数据丢失。

    解决:消息持久化到磁盘。
    分为两步:

    1. 创建queue的时候将其设置为持久化,这样可以保证rabbitMQ持久化queue的元数据,但是不会持久好queue里面的数据
    //Exchage持久化
    hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
    //queue持久化
    channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    
    1. 发送消息的时候将消息的deliveryMode设置为2,这样消息就会被设置为持久化方式,rabbitMq就会将消息持久化到磁盘上。
    //消息持久化,在投递时制定deliveryMode=2(1是非持久化)
    channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());  
    

    设置这两个持久化,并使用confirm机制,只有消息被持久化到磁盘,才会通知生产者ack

    消费者消息丢失

    RabbitMQ
    消费者消费到某条消息的时候,还没处理,就挂了,rabbitMQ认为你消费了,消息丢失
    解决:关闭rabbitMQ的自动ack,等处理完毕在手动ack。

    //关闭自动ack
    //basicConsume(String queue, boolean autoAck, Consumer callback)
    channel.basicConsume(queueName, false, this);
    

    保证消息顺序执行
    一个queue,多个consumer消费,造成顺序错误,
    consumer从MQ读取是有序的,处理的时间是不固定的,无法保证先读到的先完成,造成顺序错误
    例如,读取顺序,a=1,a=2,a=3,执行的顺序,a=1,a=3,a=2,最终存表是a=2

    一个queue,一个consumer,consumer里面进行了多线程消费,也会造成消息消费顺序错误。

    1. 解决1:拆分多个queue,每个queue一个consumer,consumer单线程
      缺点:吞吐量降低,queue增多
    2. 解决2:consumer多线程处理。先将消息保存在内存队列中,将关键值相同的的数据保存到相同的消息队列中,然后分发给底层不同的worker来处理

    消息积压

    1. 解决consumer的问题,确保其恢复消费速度。停掉所有的consumer
    2. 临时建立好原来10倍20倍的queue数量
    3. 写一个临时分发消息的consumer程序,消费积压的消息,消费之后不做耗时处理,直接均匀写入临时建好10倍的queue里面去
    4. 紧急征用10倍的机器来部署consumer,每一批consumer消费一个临时queue消息。相当于将queue资源和consumer扩大10倍,以之前10倍的速度去消费
    5. 等快消费完后,恢复原来框架,重新用原来的consumer机器来消费信息

    设置了过期时间,过期就丢了

    rabbitMQ是可以设置过期时间的。
    解决:流量低峰期,写程序,手动查询丢失的数据,重新发送到MQ,把丢失的数据补回来

    积压消息长时间没处理,MQ放不下?

    1. 扩容,加queue,加consumer
    2. 写个临时程序,丢去一部分数据,流量低峰期,补齐数据

    Kafka

    • Broker:一台kafka服务器就叫Broker
    • Topic:把数据丢哪个队列,从哪个队列拿数据。相当于表的概念
    • Partition:为了提高一个队列的吞吐量,把Topic分区,这个分区就是Partition
    • Partition数据只允许追加写入磁盘,避免缓慢的随机I/O操作(它会先缓存一部分,等到足够多数据量或者一定的时间再批量写入(flush))

    高可用方案

    创建一个topic,会划分成很多partition,每个partition在不同的broker上
    读写只从leader上读取,leader会自动同步到follower
    如果broker挂了,恰好某个partition的leader在这台broker上,那会从其他的follower上选举出一个新的leader

    保证消息不重复消费

    如果消费者消费完信息,这时没来的及提交offset就挂了,那么就会出现重复消费的问题
    保证幂等性

    生产者消息丢失

    kafka

    • 设置acks=all,一定不会丢
    • ack=0,生产者发送消息就行,不用MQ的ack确认
    • ack=1,生产者的leader收到消息,保存到page cache中就返回ack
    • ack=-1、ack=all,生产者的leader收到消息,保存到page cache,并存入磁盘,同步到ISR中信任的follower数量的page cache中才返回ack

    MQ消息丢失
    Kafka

    • kafka的broker挂了,partition的leader刚好是在这个broker上,重新选举,而follower刚好有一部分数据没有从leader同步过来,而某个follower成为leader后,就会丢失一部分数据

    解决:设置4个参数

    1. topic设置replication.factor参数:大于1,要求每个partition必须至少2个副本
    2. kafka服务端设置min.insync.replicas参数:大于1,一个leader至少感知一个follower还跟自己联系,确保leader挂了还有一个follower
    3. product端设置acks=all:每条数据,必须写入所有replica之后,才认为是成功了
    4. product端设置retries=MAX:一旦写入失败,就无限重拾,卡在这里

    这样能保证kafka的broker端发送故障,leader转移时,不会丢失数据。

    消费者消息丢失

    Kafka

    kafka会自动提交offset,就是消费者还没开始消费,就自动提交了offset,让kafka以为消费好了。

    解决:关闭kafka的自动提交offset,处理完毕后再手动提交offset,可以保证数据不丢失。
    但是还会出现重复消费的情况,需要自己保证幂等性

    保证消息顺序执行

    一个topic、一个partition、一个consumer,consumer内存进行了多线程消费,也会出现顺序错乱的问题

    具有顺序的数据写入不同的partition,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作
    造成消息没有按照顺序执行,造成数据顺序错误。

    1. 解决1:确保一个消息发送同一个partition以及一个topic,一个partition只有一个consumer,consumer内部消费是单线程。
    2. 解决2:写N个queue,然后N个现场分别消费一个内存queue

    消息积压

    1. 解决consumer的问题,确保其恢复消费速度。停掉所有的consumer
    2. 临时新建一个topic,partition是原来的10倍
    3. 写一个临时分发消息的consumer程序,消费积压的消息,消费之后不做耗时处理,直接均匀写入临时建好10倍的partition里面去
    4. 紧急征用10倍的机器来部署consumer,每一批consumer消费一个临时partition消息。相当于将partition资源和consumer扩大10倍,以之前10倍的速度去消费
    5. 等快消费完后,恢复原来框架,重新用原来的consumer机器来消费信息

    积压消息长时间没处理,MQ放不下?

    1. 扩容,加partition,加consumer
    2. 写个临时程序,丢去一部分数据,流量低峰期,补齐数据

    topic分配partition的规则

    1. rangeAssignor 默认分配策略


      image.png
    2. roundRobinAssignor 轮询


      image.png

      会出现问题


      image.png
    3. StickyAssignor

    4. 自定义分配策略

    高并发场景下,如何保证收发消息的性能?

    1. 生产端批量发送
    2. broker异步刷盘
    3. 消费者批量拉取

    如何保证消息服务的高可用和高可靠?

    1. broker集群,服务发现和注册,负载均衡、超时自动重试
    2. partition一个leader多个follower,并且partition分布在多个broker上,partition故障自动转移
    3. 存储采用追加日志文件模式+索引查找

    如何保证服务是可以水平任意扩展的?

    broker集群,如果新的broker,只需要注册到注册中心即可。

    如何保证消息存储也是水平可扩展的?

    利用分片存储技术,partition分布在多个broker上

    各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?

    一个topic对应多个partition,一个partition对应一个consumer

    相关文章

      网友评论

          本文标题:MQ:Kafka、RocketMQ、RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/nvsslrtx.html