1.概述
1.1 队列的使用场景
-
削峰:并发情况下,数据库的压力非常大,此时可以将一部分非必需即可返回结果的操作转为队列来消费,达到对系统的缓冲效果。
-
解耦:A功能需要触发B,C,D功能的一些行为,如果直接在A模块的代码调用B,C,D模块的方法,代码就会慢慢变得混乱不堪,此时是用队列分发来触发各自的行为,达到解耦。
-
异步:比如公众号开发是,注册的时候需要给用户推送通知,如果用同步调用微信接口的方式,可能会因为网络问题或者微信接口方面的问题导致流程阻塞,这时候改成异步的,可以极高的提高响应速度。
1.2 可用于队列设计的有以下各类中间件:
- Redis
- Kafka
- MQ类:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ
2.分别阐述各种中间件的特点
2.1 Redis
redis的本身定位是缓存数据库,并非用于队列。但它提供的数据类型 list 和发布/订阅模式,可以用于队列的设计。
-
lpush/brpop : 基于list的队列设计有个缺陷就是,所有队列相关的失败重试,一致性等问题都由业务方来保证,无法从应用层面来提供解决方案,redis宕机没用持久化的话,会数据丢失,当然集群和持久化设置可避免。
-
发布/订阅:此方式是用于广播的,即一次发布,关注该topic的多个订阅者都收到,如若用于队列,即只能有一个订阅者,多个也没用,只是重复消费,另外此方式并没保存发布数据,如若网络抖动或者redis宕机会产生数据丢失,不可忍受。
缺点:综上,可靠性没保证,且只提高数据结构,逻辑需要自己实现。
redis有另外一个产品——disque,专门作为队列使用的。
2.2 Kafka
kafka的最大的优势就是快,即使普通的服务器,kafka也能轻松支持每秒百万级的写入效率。这么快的原因是:
1)写入方式:以顺序IO代替随机IO, 当然只是这个是不够的,很多软件都已经懂得这么做了,还有一个重要的原因是:它不是实时写入硬盘,充分利用了分页存储来利用内存提高效率。这里采用的是mmap,即内存映射文件,完成映射之后你对物理内存的操作,操作系统都会在适当的时候同步到硬盘上。当然这里也有一个缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。
2)读取方式:基于sendfile实现Zero Copy。以前需要在内核缓存区和用户缓冲区的来来回回4次copy,被优化到只需要2次,即只需要将数据从磁盘读入内核缓冲区,在调用sendfile直接将数据copy到网卡缓存这两步。 还有一个原因,kafka实现了端对端的批量压缩,目前Kafka支持GZIP和Snappy压缩协议。
kafka除了速度快,还支持分组,可以多个生产者,多个消费者,支持负载均衡,支持事务。
缺点:
- Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
- 使用短轮询方式,实时性取决于轮询间隔时间;
- 消费失败不支持重试;
- 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
- 社区更新较慢;
- 运维难度大,它不仅仅需要关注整个集群之内像broker、controller类似的角色,还需要关注其所依赖的一些产品像ZooKeeper等。你需要考虑一些集群产生的问题,比如“脑裂”,ZooKeeper不可用等问题。然一旦涉及集群,这个问题就存在,并非kafka特有。还有磁盘的问题,kafka的消息不是消费完就没了,而是根据的你的设置多久过期,这样就很可能产生磁盘数据量堆积问题。当
2.3 MQ类:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ
MQ类本身就是为队列而生的。队列该有的,这些MQ类产品大多数都有,只是有一些功能上的区别和性能的差异。
这里主要介绍一下RocketMQ 。
根据GitHub上Apache对它的介绍,有以下功能:
- 发布/订阅消息传递模型
- 财务级交易消息
- 各种跨语言客户端,例如Java,C / C ++,Python,Go
- 可插拔的传输协议,例如TCP,SSL,AIO
- 内置的消息跟踪功能,还支持开放式跟踪
- 多功能的大数据和流生态系统集成
- 按时间或偏移量追溯消息
- 可靠的FIFO和严格的有序消息传递在同一队列中
- 高效的推拉消费模型
- 单个队列中的百万级消息累积容量
- 多种消息传递协议,例如JMS和OpenMessaging
- 灵活的分布式横向扩展部署架构
- 快如闪电的批量消息交换系统
- 各种消息过滤器机制,例如SQL和Tag
- 用于隔离测试和云隔离群集的Docker映像
- 功能丰富的管理仪表板,用于配置,指标和监视
- 认证与授权
RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
RocketMQ什么都是集群部署的,这是他吞吐量大,高可用的原因之一,集群的模式也很花哨,可以支持多master 模式、多master多slave异步复制模式、多 master多slave同步双写模式。
支持负载均衡,支持集群,支持订阅形式和消息分发,支持顺序消息,支持消息确认,支持指定时间点的回溯,支持消息重试。
性能方面也仅次于kafka,zeroMQ。
本身就是为队列定制的,缺点就是对轻量级项目来说太复杂了。最好就是有公司统一的一套队列实现方案,提供公司内的各个团队使用,并支持各种语言栈的调用。
3.队列设计
考虑到成本、运维复杂度、设计的灵活性/自由度、原有项目语言(php)的限制,采用redis来实现。
3.1 常用队列的类型
![](https://img.haomeiwen.com/i3596546/44f8f11d5af019a6.png)
3.2 队列的消费模型
3.2.1 采用php常驻进程模式
通常队列消费,我们是每个队列起一个或多个消费进程,网上有看到这样的一个延时队列的设计模型:
![](https://img.haomeiwen.com/i3596546/3c50837586dd708a.png)
这里是用到了php的一个pcntl 进程控制的扩展,由dq-mster: 主进程,负责管理子进程的创建,销毁,回收以及信号通知。
对于我们来说,这种方案有以下缺点:
- 所有的队列会在一个服务里面消费,没有区分队列的类型,如果某个队列阻塞,只是单纯的加子进程,没有针对性,当然这个问题是有办法解决的;
- 常驻进程会由内存的回收和数据库连接断连的问题(长时间没有数据消费的话,数据库连接会超时失效,之前的遇到过),断连的问题也不难解决;
- 修改到某些代码需要手动取重启进程,常驻进程的代码已经是加载到内存里面的了,修改代码需要重启才生效。
3.2.2 采用yii2-queue
yii2-queue 支持了不少驱动,File, Mysql, Redis, RabbitMq等等。
这里只分析redis驱动。
基于redis的yii2-queue只实现了延时队列和FIFO队列,延时队列采用有序集合结合列表来实现。
话不多说,撸源码:
- 先看入列:
protected function pushMessage($message, $ttr, $delay, $priority)
{
// Redis驱动不支持作业优先级 如果有传值则抛出错误
if ($priority !== null) {
throw new NotSupportedException('Job priority is not supported in the driver.');
}
// 取到上条 message_id + 1 ,作为本条任务的ID
$id = $this->redis->incr("$this->channel.message_id");
// 以新ID将本条任务格存储到hash表 messages 中
$this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
if (!$delay) {
// 如果不需要等待执行 则将任务ID推到hash表 waiting 中
$this->redis->lpush("$this->channel.waiting", $id);
} else {
// 如果需要等待执行 则推送到有序集合 delayed 中
$this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
}
// 返回任务ID
return $id;
}
- 再看出列:
/**
* @param int $wait timeout
* @return array|null payload
*/
protected function reserve($wait)
{
// 将延迟和保留的作业移动到等待列表中 并锁定一秒
if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
$this->moveExpired("$this->channel.delayed");
$this->moveExpired("$this->channel.reserved");
}
// Find a new waiting message
$id = null;
if (!$wait) {
// 从等待列表取1条任务ID待执行
$id = $this->redis->rpop("$this->channel.waiting");
} elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
$id = $result[1];
}
if (!$id) {
return null;
}
// 根据任务ID取出任务
$payload = $this->redis->hget("$this->channel.messages", $id);
list($ttr, $message) = explode(';', $payload, 2);
// 加入到作业列表
$this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
$attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);
return [$id, $message, $ttr, $attempt];
}
- 再看执行进程
yii2-queue提供了两种方式,一种是crontab机制:
/**
* Runs all jobs from redis-queue.
* It can be used as cron job.
*
* @return null|int exit code.
*/
public function actionRun()
{
return $this->queue->run(false);
}
一种是常驻进程机制:
/**
* Listens redis-queue and runs new jobs.
* It can be used as daemon process.
*
* @param int $timeout number of seconds to wait a job.
* @throws Exception when params are invalid.
* @return null|int exit code.
*/
public function actionListen($timeout = 3)
{
if (!is_numeric($timeout)) {
throw new Exception('Timeout must be numeric.');
}
if ($timeout < 1) {
throw new Exception('Timeout must be greater than zero.');
}
return $this->queue->run(true, $timeout);
}
本质上都是调用run方法,只是参数不同:
/**
* Listens queue and runs each job.
*
* @param bool $repeat whether to continue listening when queue is empty.
* @param int $timeout number of seconds to wait for next message.
* @return null|int exit code.
* @internal for worker command only.
* @since 2.0.2
*/
public function run($repeat, $timeout = 0)
{
return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
while ($canContinue()) {
if (($payload = $this->reserve($timeout)) !== null) {
list($id, $message, $ttr, $attempt) = $payload;
if ($this->handleMessage($id, $message, $ttr, $attempt)) {
$this->delete($id);
}
} elseif (!$repeat) {
break;
}
}
});
}
yii2-queue的缺点在于:
- 多个队列分组需要修改配置文件,公司的配置文件修改比较麻烦,需要运维手动去每台机器去改;
- 使用常驻进程依然有前述的问题;
3.2.3 采用php+crontab实现
采用crontab配置+php 进程实现:
![](https://img.haomeiwen.com/i3596546/13e55b2c23d44274.png)
crontab定时10分钟执行一次,php进程只给10分钟生命周期,即php进程只运行十分钟便自行退出。我们有crontab的配置工具,所以用来实现这个模型也比较方便,如果某些进程的队列阻塞,只需要在crontab配置多一些进程即可,例如上图的queueName1。
这个方案可以有效解决上面常驻进程的问题,针对队列名称消费可以有效针对队列增加进程,自动退出不存在内存没回收的问题,10分钟不存在数据库断连的问题,代码更新也只是10分钟内还是走旧代码。
参考yii2-queue,队列Job的状态转换如下:
![](https://img.haomeiwen.com/i3596546/3871a317997bbdc2.png)
- delay 延时态,即还没到设置的时间执行;
- waiting 等待态,等待被消费者取出;
- reserved 就绪态,已取出等待执行;
- done 完成态,已经被执行,实际上这个状态基本不存在,执行成功就会被删掉。
网友评论