队列是一种先进先出的数据结构
MQ
消息队列:把要传输的数据放在队列中
image.png优点:
- 解耦,支付只需要关注关注重要的支付就行,其他的比如更新用户积分、通知商家发货等交给MQ来做
- 异步,更新用户信息、通知商家都是异步进行,提高了吞吐量
- 削峰,队列的顺序性实现消息的延迟消费
缺点
- 系统可用性降低。依赖服务增多。需要考虑MQ挂了的情况。
- 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性
- 业务一致性。主业务和从属业务一致性的处理
使用消息队列要考虑的问题
- 高可用
消息队列要是集群/分布式的,能够提供现有支持,而不是手动代码实现 - 数据丢失问题
消息队列要能持久化,这样才能减少数据的丢书。 - 消费者怎么的得到消息队列的数据?
生产者->消息队列, - 消息队列有数据了,主动叫消费者拿,push
- 消费者不断轮询,看有没有新的数据,如果有就消费,pull
RabbitMQ
模式:
https://rabbitmq.com/getstarted.html
- 简单模式
- 工作queue
- publish/subject
- Routing
- topic
- RPC
组件介绍
- Broker:一台RabbitMQ机器就是一个Broker
- Exchange:交换机,跟消费者、生产者、Queue交互
- type:
- fanout:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略 RoutingKey
- direct:直接匹配,通过Exchange名称+RountingKey来发送与接收消息
- topic:主题匹配订阅,主题指的是RoutingKey,RoutingKey可以采用通配符
- headers:消息头订阅,只有请求头与消息头匹配,才能接收消息,忽略RoutingKey
- default:RoutingKey就是queue的名称
//设置默认的Exchange会使用amq.direct channel.BasicPublish("", "TaskQueue", properties, bytes);
- durable 设置是否持久化
- autoDelete 设置是否自动删除
- internal 设置是否内置的,客户端无法直接发送消息到内置交换器,只能通过交换器路由到交换器
- type:
- Queue:消息队列,只与Exchage交互
高可用方案:
- 普通集群模式,只有RabbitMQ上有queue,其他的RabbitMQ通过网络去这台实例上获取queue
消费者拉取的机器如果有queue,直接返回,没有,则实例之间会产生网络传输
有queue的机器宕机了,导致其他机器都无法拉取数据 - 镜像集群模式,没个实例都有queue
缺点:性能消耗大,所有机器都要进行消息同步
没有扩展性可言,如果某个实例的queue很大,增加实例也没有用
保证消息不重复消费:
如果消费者消费完信息,这时没来的及ack就挂了,那么就会出现重复消费的问题
保证幂等性
- 内存维护一个set,从消息队列中获取一个消息,先查询是否在set里面,在,就表示已消费,不在,加入set
- 写数据库,拿唯一键去数据库查询下,不存在就写,存在就表示已消费
- 写redis,用set
- 让生产者发送消息时,每条消息加一个全局唯一的id,然后消费时,将id保存到redis中,消费时去redis里面查一下,没有再消费
-
数据库操作可以设置唯一键,防止数据重复的插入。
image.png
生产者消息丢失
-
传入消息时丢失
解决:
1. 使用RabbitMQ提供的事务功能。``` channel.txSelect();//开启事务 try{ //发送消息 }catch(Exection e){ channel.txRollback();//回滚事务 //重新提交 } ```
缺点:事务开启,就会变成同步阻塞操作,生产者会阻塞等待是否发送成功,性能较低
- 开启confirm模式,变成异步。
每次写的消息都会分配一个唯一的id,然后写入rabbitMQ,rabbitMQ会回传一个ack消息。- 如果没有处理这个消息,会回调nack,你可以进行重试。
- 如果超过一定时间没有收到消息回调,你可以进行重发。
//开启confirm channel.confirm(); //发送成功回调 public void ack(String messageId){ } // 发送失败回调 public void nack(String messageId){ //重发该消息 }
- 开启confirm模式,变成异步。
MQ消息丢失
RabbitMQ
收到消息,暂存在内存中,还没存到磁盘,然后机器挂了,导致数据丢失。
解决:消息持久化到磁盘。
分为两步:
- 创建queue的时候将其设置为持久化,这样可以保证rabbitMQ持久化queue的元数据,但是不会持久好queue里面的数据
//Exchage持久化
hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
//queue持久化
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
- 发送消息的时候将消息的deliveryMode设置为2,这样消息就会被设置为持久化方式,rabbitMq就会将消息持久化到磁盘上。
//消息持久化,在投递时制定deliveryMode=2(1是非持久化)
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
设置这两个持久化,并使用confirm机制,只有消息被持久化到磁盘,才会通知生产者ack
消费者消息丢失
RabbitMQ
消费者消费到某条消息的时候,还没处理,就挂了,rabbitMQ认为你消费了,消息丢失
解决:关闭rabbitMQ的自动ack,等处理完毕在手动ack。
//关闭自动ack
//basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume(queueName, false, this);
保证消息顺序执行
一个queue,多个consumer消费,造成顺序错误,
consumer从MQ读取是有序的,处理的时间是不固定的,无法保证先读到的先完成,造成顺序错误
例如,读取顺序,a=1,a=2,a=3,执行的顺序,a=1,a=3,a=2,最终存表是a=2
一个queue,一个consumer,consumer里面进行了多线程消费,也会造成消息消费顺序错误。
- 解决1:拆分多个queue,每个queue一个consumer,consumer单线程
缺点:吞吐量降低,queue增多 - 解决2:consumer多线程处理。先将消息保存在内存队列中,将关键值相同的的数据保存到相同的消息队列中,然后分发给底层不同的worker来处理
消息积压
- 解决consumer的问题,确保其恢复消费速度。停掉所有的consumer
- 临时建立好原来10倍20倍的queue数量
- 写一个临时分发消息的consumer程序,消费积压的消息,消费之后不做耗时处理,直接均匀写入临时建好10倍的queue里面去
- 紧急征用10倍的机器来部署consumer,每一批consumer消费一个临时queue消息。相当于将queue资源和consumer扩大10倍,以之前10倍的速度去消费
- 等快消费完后,恢复原来框架,重新用原来的consumer机器来消费信息
设置了过期时间,过期就丢了
rabbitMQ是可以设置过期时间的。
解决:流量低峰期,写程序,手动查询丢失的数据,重新发送到MQ,把丢失的数据补回来
积压消息长时间没处理,MQ放不下?
- 扩容,加queue,加consumer
- 写个临时程序,丢去一部分数据,流量低峰期,补齐数据
Kafka
- Broker:一台kafka服务器就叫Broker
- Topic:把数据丢哪个队列,从哪个队列拿数据。相当于表的概念
- Partition:为了提高一个队列的吞吐量,把Topic分区,这个分区就是Partition
- Partition数据只允许追加写入磁盘,避免缓慢的随机I/O操作(它会先缓存一部分,等到足够多数据量或者一定的时间再批量写入(flush))
高可用方案
创建一个topic,会划分成很多partition,每个partition在不同的broker上
读写只从leader上读取,leader会自动同步到follower
如果broker挂了,恰好某个partition的leader在这台broker上,那会从其他的follower上选举出一个新的leader
保证消息不重复消费
如果消费者消费完信息,这时没来的及提交offset就挂了,那么就会出现重复消费的问题
保证幂等性
生产者消息丢失
kafka
- 设置acks=all,一定不会丢
- ack=0,生产者发送消息就行,不用MQ的ack确认
- ack=1,生产者的leader收到消息,保存到page cache中就返回ack
- ack=-1、ack=all,生产者的leader收到消息,保存到page cache,并存入磁盘,同步到ISR中信任的follower数量的page cache中才返回ack
MQ消息丢失
Kafka
- kafka的broker挂了,partition的leader刚好是在这个broker上,重新选举,而follower刚好有一部分数据没有从leader同步过来,而某个follower成为leader后,就会丢失一部分数据
解决:设置4个参数
- topic设置replication.factor参数:大于1,要求每个partition必须至少2个副本
- kafka服务端设置min.insync.replicas参数:大于1,一个leader至少感知一个follower还跟自己联系,确保leader挂了还有一个follower
- product端设置acks=all:每条数据,必须写入所有replica之后,才认为是成功了
- product端设置retries=MAX:一旦写入失败,就无限重拾,卡在这里
这样能保证kafka的broker端发送故障,leader转移时,不会丢失数据。
消费者消息丢失
Kafka
kafka会自动提交offset,就是消费者还没开始消费,就自动提交了offset,让kafka以为消费好了。
解决:关闭kafka的自动提交offset,处理完毕后再手动提交offset,可以保证数据不丢失。
但是还会出现重复消费的情况,需要自己保证幂等性
保证消息顺序执行
一个topic、一个partition、一个consumer,consumer内存进行了多线程消费,也会出现顺序错乱的问题
具有顺序的数据写入不同的partition,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作
造成消息没有按照顺序执行,造成数据顺序错误。
- 解决1:确保一个消息发送同一个partition以及一个topic,一个partition只有一个consumer,consumer内部消费是单线程。
- 解决2:写N个queue,然后N个现场分别消费一个内存queue
消息积压
- 解决consumer的问题,确保其恢复消费速度。停掉所有的consumer
- 临时新建一个topic,partition是原来的10倍
- 写一个临时分发消息的consumer程序,消费积压的消息,消费之后不做耗时处理,直接均匀写入临时建好10倍的partition里面去
- 紧急征用10倍的机器来部署consumer,每一批consumer消费一个临时partition消息。相当于将partition资源和consumer扩大10倍,以之前10倍的速度去消费
- 等快消费完后,恢复原来框架,重新用原来的consumer机器来消费信息
积压消息长时间没处理,MQ放不下?
- 扩容,加partition,加consumer
- 写个临时程序,丢去一部分数据,流量低峰期,补齐数据
topic分配partition的规则
-
rangeAssignor 默认分配策略
image.png -
roundRobinAssignor 轮询
image.png
会出现问题
image.png -
StickyAssignor
-
自定义分配策略
高并发场景下,如何保证收发消息的性能?
- 生产端批量发送
- broker异步刷盘
- 消费者批量拉取
如何保证消息服务的高可用和高可靠?
- broker集群,服务发现和注册,负载均衡、超时自动重试
- partition一个leader多个follower,并且partition分布在多个broker上,partition故障自动转移
- 存储采用追加日志文件模式+索引查找
如何保证服务是可以水平任意扩展的?
broker集群,如果新的broker,只需要注册到注册中心即可。
如何保证消息存储也是水平可扩展的?
利用分片存储技术,partition分布在多个broker上
各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?
一个topic对应多个partition,一个partition对应一个consumer
网友评论