美文网首页
队列相关知识整理

队列相关知识整理

作者: 10xjzheng | 来源:发表于2020-04-28 15:26 被阅读0次

1.概述

1.1 队列的使用场景

  • 削峰:并发情况下,数据库的压力非常大,此时可以将一部分非必需即可返回结果的操作转为队列来消费,达到对系统的缓冲效果。

  • 解耦:A功能需要触发B,C,D功能的一些行为,如果直接在A模块的代码调用B,C,D模块的方法,代码就会慢慢变得混乱不堪,此时是用队列分发来触发各自的行为,达到解耦。

  • 异步:比如公众号开发是,注册的时候需要给用户推送通知,如果用同步调用微信接口的方式,可能会因为网络问题或者微信接口方面的问题导致流程阻塞,这时候改成异步的,可以极高的提高响应速度。

1.2 可用于队列设计的有以下各类中间件:

  • Redis
  • Kafka
  • MQ类:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ

2.分别阐述各种中间件的特点

2.1 Redis

redis的本身定位是缓存数据库,并非用于队列。但它提供的数据类型 list 和发布/订阅模式,可以用于队列的设计。

  • lpush/brpop : 基于list的队列设计有个缺陷就是,所有队列相关的失败重试,一致性等问题都由业务方来保证,无法从应用层面来提供解决方案,redis宕机没用持久化的话,会数据丢失,当然集群和持久化设置可避免。

  • 发布/订阅:此方式是用于广播的,即一次发布,关注该topic的多个订阅者都收到,如若用于队列,即只能有一个订阅者,多个也没用,只是重复消费,另外此方式并没保存发布数据,如若网络抖动或者redis宕机会产生数据丢失,不可忍受。

缺点:综上,可靠性没保证,且只提高数据结构,逻辑需要自己实现。

redis有另外一个产品——disque,专门作为队列使用的。

2.2 Kafka

kafka的最大的优势就是快,即使普通的服务器,kafka也能轻松支持每秒百万级的写入效率。这么快的原因是:
1)写入方式:以顺序IO代替随机IO, 当然只是这个是不够的,很多软件都已经懂得这么做了,还有一个重要的原因是:它不是实时写入硬盘,充分利用了分页存储来利用内存提高效率。这里采用的是mmap,即内存映射文件,完成映射之后你对物理内存的操作,操作系统都会在适当的时候同步到硬盘上。当然这里也有一个缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。
2)读取方式:基于sendfile实现Zero Copy。以前需要在内核缓存区和用户缓冲区的来来回回4次copy,被优化到只需要2次,即只需要将数据从磁盘读入内核缓冲区,在调用sendfile直接将数据copy到网卡缓存这两步。 还有一个原因,kafka实现了端对端的批量压缩,目前Kafka支持GZIP和Snappy压缩协议。

kafka除了速度快,还支持分组,可以多个生产者,多个消费者,支持负载均衡,支持事务。

缺点:

  • Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
  • 使用短轮询方式,实时性取决于轮询间隔时间;
  • 消费失败不支持重试;
  • 支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
  • 社区更新较慢;
  • 运维难度大,它不仅仅需要关注整个集群之内像broker、controller类似的角色,还需要关注其所依赖的一些产品像ZooKeeper等。你需要考虑一些集群产生的问题,比如“脑裂”,ZooKeeper不可用等问题。然一旦涉及集群,这个问题就存在,并非kafka特有。还有磁盘的问题,kafka的消息不是消费完就没了,而是根据的你的设置多久过期,这样就很可能产生磁盘数据量堆积问题。当

2.3 MQ类:RocketMQ && RabbitMQ && ActiveMQ && ZeroMQ

MQ类本身就是为队列而生的。队列该有的,这些MQ类产品大多数都有,只是有一些功能上的区别和性能的差异。

这里主要介绍一下RocketMQ 。
根据GitHub上Apache对它的介绍,有以下功能:

  • 发布/订阅消息传递模型
  • 财务级交易消息
  • 各种跨语言客户端,例如Java,C / C ++,Python,Go
  • 可插拔的传输协议,例如TCP,SSL,AIO
  • 内置的消息跟踪功能,还支持开放式跟踪
  • 多功能的大数据和流生态系统集成
  • 按时间或偏移量追溯消息
  • 可靠的FIFO和严格的有序消息传递在同一队列中
  • 高效的推拉消费模型
  • 单个队列中的百万级消息累积容量
  • 多种消息传递协议,例如JMS和OpenMessaging
  • 灵活的分布式横向扩展部署架构
  • 快如闪电的批量消息交换系统
  • 各种消息过滤器机制,例如SQL和Tag
  • 用于隔离测试和云隔离群集的Docker映像
  • 功能丰富的管理仪表板,用于配置,指标和监视
  • 认证与授权

RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。

RocketMQ什么都是集群部署的,这是他吞吐量大,高可用的原因之一,集群的模式也很花哨,可以支持多master 模式、多master多slave异步复制模式、多 master多slave同步双写模式。

支持负载均衡,支持集群,支持订阅形式和消息分发,支持顺序消息,支持消息确认,支持指定时间点的回溯,支持消息重试。

性能方面也仅次于kafka,zeroMQ。

本身就是为队列定制的,缺点就是对轻量级项目来说太复杂了。最好就是有公司统一的一套队列实现方案,提供公司内的各个团队使用,并支持各种语言栈的调用。

3.队列设计

考虑到成本、运维复杂度、设计的灵活性/自由度、原有项目语言(php)的限制,采用redis来实现。

3.1 常用队列的类型

image.png

3.2 队列的消费模型

3.2.1 采用php常驻进程模式

通常队列消费,我们是每个队列起一个或多个消费进程,网上有看到这样的一个延时队列的设计模型:


image.png

这里是用到了php的一个pcntl 进程控制的扩展,由dq-mster: 主进程,负责管理子进程的创建,销毁,回收以及信号通知。
对于我们来说,这种方案有以下缺点:

  • 所有的队列会在一个服务里面消费,没有区分队列的类型,如果某个队列阻塞,只是单纯的加子进程,没有针对性,当然这个问题是有办法解决的;
  • 常驻进程会由内存的回收和数据库连接断连的问题(长时间没有数据消费的话,数据库连接会超时失效,之前的遇到过),断连的问题也不难解决;
  • 修改到某些代码需要手动取重启进程,常驻进程的代码已经是加载到内存里面的了,修改代码需要重启才生效。

3.2.2 采用yii2-queue

yii2-queue 支持了不少驱动,File, Mysql, Redis, RabbitMq等等。
这里只分析redis驱动。
基于redis的yii2-queue只实现了延时队列和FIFO队列,延时队列采用有序集合结合列表来实现。
话不多说,撸源码:

  • 先看入列:
 protected function pushMessage($message, $ttr, $delay, $priority)
    {
        // Redis驱动不支持作业优先级 如果有传值则抛出错误
        if ($priority !== null) {
            throw new NotSupportedException('Job priority is not supported in the driver.');
        }
        // 取到上条 message_id + 1 ,作为本条任务的ID
        $id = $this->redis->incr("$this->channel.message_id");
        // 以新ID将本条任务格存储到hash表 messages 中
        $this->redis->hset("$this->channel.messages", $id, "$ttr;$message");
        if (!$delay) {
            // 如果不需要等待执行 则将任务ID推到hash表 waiting 中
            $this->redis->lpush("$this->channel.waiting", $id);
        } else {
            // 如果需要等待执行 则推送到有序集合 delayed 中
            $this->redis->zadd("$this->channel.delayed", time() + $delay, $id);
        }
        // 返回任务ID
        return $id;
    }
  • 再看出列:
 /**
     * @param int $wait timeout
     * @return array|null payload
     */
    protected function reserve($wait)
    {
        // 将延迟和保留的作业移动到等待列表中 并锁定一秒
        if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) {
            $this->moveExpired("$this->channel.delayed");
            $this->moveExpired("$this->channel.reserved");
        }
        // Find a new waiting message
        $id = null;
        if (!$wait) {
            // 从等待列表取1条任务ID待执行
            $id = $this->redis->rpop("$this->channel.waiting");
        } elseif ($result = $this->redis->brpop("$this->channel.waiting", $wait)) {
            $id = $result[1];
        }
        if (!$id) {
            return null;
        }
        // 根据任务ID取出任务
        $payload = $this->redis->hget("$this->channel.messages", $id);
        list($ttr, $message) = explode(';', $payload, 2);
        // 加入到作业列表
        $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id);
        $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1);
        return [$id, $message, $ttr, $attempt];
    }
  • 再看执行进程
    yii2-queue提供了两种方式,一种是crontab机制:
 /**
     * Runs all jobs from redis-queue.
     * It can be used as cron job.
     *
     * @return null|int exit code.
     */
    public function actionRun()
    {
        return $this->queue->run(false);
    }

一种是常驻进程机制:

 /**
     * Listens redis-queue and runs new jobs.
     * It can be used as daemon process.
     *
     * @param int $timeout number of seconds to wait a job.
     * @throws Exception when params are invalid.
     * @return null|int exit code.
     */
    public function actionListen($timeout = 3)
    {
        if (!is_numeric($timeout)) {
            throw new Exception('Timeout must be numeric.');
        }
        if ($timeout < 1) {
            throw new Exception('Timeout must be greater than zero.');
        }

        return $this->queue->run(true, $timeout);
    }

本质上都是调用run方法,只是参数不同:

  /**
     * Listens queue and runs each job.
     *
     * @param bool $repeat whether to continue listening when queue is empty.
     * @param int $timeout number of seconds to wait for next message.
     * @return null|int exit code.
     * @internal for worker command only.
     * @since 2.0.2
     */
    public function run($repeat, $timeout = 0)
    {
        return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
            while ($canContinue()) {
                if (($payload = $this->reserve($timeout)) !== null) {
                    list($id, $message, $ttr, $attempt) = $payload;
                    if ($this->handleMessage($id, $message, $ttr, $attempt)) {
                        $this->delete($id);
                    }
                } elseif (!$repeat) {
                    break;
                }
            }
        });
    }

yii2-queue的缺点在于:

  • 多个队列分组需要修改配置文件,公司的配置文件修改比较麻烦,需要运维手动去每台机器去改;
  • 使用常驻进程依然有前述的问题;

3.2.3 采用php+crontab实现

采用crontab配置+php 进程实现:


image.png

crontab定时10分钟执行一次,php进程只给10分钟生命周期,即php进程只运行十分钟便自行退出。我们有crontab的配置工具,所以用来实现这个模型也比较方便,如果某些进程的队列阻塞,只需要在crontab配置多一些进程即可,例如上图的queueName1。

这个方案可以有效解决上面常驻进程的问题,针对队列名称消费可以有效针对队列增加进程,自动退出不存在内存没回收的问题,10分钟不存在数据库断连的问题,代码更新也只是10分钟内还是走旧代码。

参考yii2-queue,队列Job的状态转换如下:


image.png
  • delay 延时态,即还没到设置的时间执行;
  • waiting 等待态,等待被消费者取出;
  • reserved 就绪态,已取出等待执行;
  • done 完成态,已经被执行,实际上这个状态基本不存在,执行成功就会被删掉。

相关文章

  • 队列相关知识整理

    1.概述 1.1 队列的使用场景 削峰:并发情况下,数据库的压力非常大,此时可以将一部分非必需即可返回结果的操作转...

  • SpringBoot整合RabbitMQ——消息的发送和接收

    上篇博文我们整理了RabbitMQ的交换机、队列以及路由绑定等相关知识,并且了解了RabbitMQ是如何发送消息给...

  • GCD一些应用的知识点

    本篇各种零散的点,是自己在看教学视频时整理的知识点。 GCD队列 主队列 串行队列 并行队列 在串行队列里disp...

  • C语言数据结构——线性表链式循环队列(链表实现方式)

    队列相关知识及操作请参看上一章 C语言数据结构——线性表循环队列(动态数组实现方式) 一、链式队列 链式队列 : ...

  • podspec

    podSpec文件相关知识整理

  • HTTP相关知识整理

    来源于慕课网进击node.js基础 什么是HTTP 计算机之间的通信协议,通常流程为 HTTP客户端发起请求,创建...

  • http相关知识整理

    网络七层模型和tcp/ip四层模型? 常用的框架有哪些?okhttp,volley,retrofit。区别?各种框...

  • TestFlight 相关知识整理

    https://help.apple.com/app-store-connect/#/devdc42b26b8

  • 鱼粉相关知识整理

    除了越南巴沙鱼粉(淡水鱼粉)外,其他都是海水鱼粉。现在做的都是属于红鱼粉,白鱼粉(俄罗斯和美国)只有鳗鱼料用,...

  • Websocket相关知识整理

    Websocket 协议格式说明 最近整理项目中关于websocket中的部分,由于之前代码中websocket的...

网友评论

      本文标题:队列相关知识整理

      本文链接:https://www.haomeiwen.com/subject/uhoewhtx.html