美文网首页
Celery 源码学习(二)多进程模型

Celery 源码学习(二)多进程模型

作者: 星丶雲 | 来源:发表于2020-08-24 16:23 被阅读0次

    如前文 Celery 源码学习(一)架构分析 所言,celery 能保证高吞吐量和高性能,主要依托两个方面:1. 多进程,2.事件驱动。

    在 celery 中,多进程主要指的是一个主进程负责调度,然后多个从进程负责消费具体的任务。也就是前文中我说的调度器(Main process)和工作进程(Worker processes)。我们今天主要就是看来一下 celery 这部分的源码。

    我先说明下版本,celery 是 4.2.0,broker 和 result backend 都是用的 redis。

    当我们在命令行执行一个常见的启动命令:

    # 在包含任务文件的目录下
    ➜ celery -A  tasks.example  worker  -c  2 --l info
    

    然后我们用 linux 的 ps 命令查看此时的相关进程

    # 在包含任务文件的目录下
    ➜ ps -ef | grep -E "celery|PID"
      UID  PID   PPID   C  STIME  TTY        TIME    CMD
      501  1344   331   0  6:27PM ??         0:00.98 celery -A tasks.example worker -c 2 --l info
      501  1348  1344   0  6:27PM ??         0:00.01 celery -A tasks.example worker -c 2 --l info
      501  1349  1344   0  6:27PM ??         0:00.01 celery -A tasks.example worker -c 2 --l info
    

    可以清楚的看到 celery 为我们启动了 3 个进程,PID 分别为 1344,1348,1349。

    我们可以做一个明显的推断,PID 为 1344 的进程,应当是主进程,也就是调度器。
    PID 为 1348,1349 的 PPID 为 1344,说明这两个进程就是主进程派生(fork)出来的从进程。

    常见的手段多进程不是 celery 原创的,对于任何一个大型项目,基于主从的多进程模式都是十分常见的,这是一套十分成熟的工业化做法。

    为什么这么做?
    其实就是如前文所说,为了充分发挥多核计算的优势,并在一定程度上提升程序的并发能力,缓解 IO 的压力。

    怎么做?
    业内的常见方案叫做 prefork,也就是预生成。预生成指的是,主进程在执行具体的业务逻辑之前,先提前 fork 出来一堆子进程,并把他们存起来集中管理,形成一个进程池。平常的时候这些子进程都是 休眠(asleep) 状态,只有当主进程派发任务的时候,会唤醒(awake)其中的一个子进程,并通过进程间通讯的手段,向子进程传输相应的任务数据。

    我们先假设一下,如果不使用预生成,会有什么问题?
    每当一个任务到来,主进程都会去临时产生一个子进程,复制一份上下文数据,然后传输任务给这个子进程。当子进程执行后,主进程再去销毁掉这个子进程的所有上下文数据。频繁的对内存数据进行操作,上下文切换,会导致系统的性能很差。所以,人们基本都会使用预生成的方式。

    celery 源码结构

    .
    ./bootsteps.py   # 流程控制相关的数据结构
    ./signals.py     # 基于各组件之间观察者模式的数据结构
    ./app            # 各种基础组件,粒度较细
    ./platforms.py
    ./bin            # celery 命令行的启动命令需要用到的模块引导文件
    ./security       
    ./local.py       
    ./backends       # 存放任务结果的相关数据结构
    ./__init__.py
    ./five.py        
    ./utils
    ./contrib
    ./result.py
    ./concurrency    # 并发模式的相关数据结构
    ./_state.py      
    ./task           # 任务的数据结构
    ./exceptions.py 
    ./fixups
    ./worker         # 消费者相关的数据结构
    ./events         # 集群内监听事件的相关数据结构
    ./states.py      
    ./apps           # 按功能拆分出来的三个基础模块的数据结构,分别为 worker,multi,beat
    ./loaders
    ./__main__.py    # 程序主入口
    ./beat.py
    ./canvas.py
    ./schedules.py
    

    具体实现因为我们的主角是 celery,我们会更侧重这一套流程应该如何用 python 去实现。因为 celery 代码很多,有各种各样的功能组件混杂其中,所以我只会挑取我认为有必要讲的源码实现。对于整个流程,读者有兴趣的话可以自行去研究。

    当我们在命令行敲下 :

    celery -A tasks.example worker -c 2 --l info
    

    首先,会通过一系列的命令行解析的方法,提取出我们上面那个命令需要运行的模块 (即Worker 模块,具体流程因为过于复杂,就不展开讲了),解析到一个 Worker 的数据结构,并创建对应的实例,其中我们主要关注 start 方法。

    # celery/apps/worker.py
    
    # ...省略
    
    class WorkController(object):
        """Unmanaged worker instance."""
    
    # ...省略
    
        class Blueprint(bootsteps.Blueprint):
            """Worker bootstep blueprint."""
            # 这是默认的 worker DAG 流程,会根据传入的命令行参数不同有不同的执行流程
            name = 'Worker'
            default_steps = {
                'celery.worker.components:Hub',
                'celery.worker.components:Pool',
                'celery.worker.components:Beat',
                'celery.worker.components:Timer',
                'celery.worker.components:StateDB',
                'celery.worker.components:Consumer',
                'celery.worker.autoscale:WorkerComponent',
            }
    
    # ... 省略
    class Worker(WorkController):
        """Worker as a program."""
    
    # ... 省略
    
        def start(self):
            try:
                self.blueprint.start(self)   # 重点关注!
            except WorkerTerminate:
                self.terminate()
            except Exception as exc:
                logger.critical('Unrecoverable error: %r', exc, exc_info=True)
                self.stop(exitcode=EX_FAILURE)
            except SystemExit as exc:
                self.stop(exitcode=exc.code)
            except KeyboardInterrupt:
                self.stop(exitcode=EX_FAILURE)
    
    # ... 省略
    

    Worker 的 start 方法中,其实就是执行了一个 self.blueprint 的 start 方法,这里面的 blueprint,是 celery 自己实现的一个 有向无环图(DAG)的数据结构,说起来复杂,其实功能简单描述下就是:根据命令行传入的不同参数,初始化不同的组件(step),并执行这些组件的初始化方法。其实就是一个对流程控制的面向对象的封装。

    我们的这个启动命令产生的 DAG,会按顺序加载三个组件,Hub,Pool,Consumer(这些组件的数据结构可以在 celery/worker/components.py 找到)。Consumer 和 Hub 是我之后会详细讲的,我们这次主要讲一下 Pool 这个组件。这个组件基本囊括了 celery 多进程 prefork 的实现。

    self.blueprint.start(self) 中,这个 blueprint 的数据结构定义如下,其中我们重点关注 start 方法:

    # celery/bootsteps.py
    
    # ... 省略
    
    class Blueprint(object):
        """Blueprint containing bootsteps that can be applied to objects.
    
        Arguments:
            steps Sequence[Union[str, Step]]: List of steps.
            name (str): Set explicit name for this blueprint.
            on_start (Callable): Optional callback applied after blueprint start.
            on_close (Callable): Optional callback applied before blueprint close.
            on_stopped (Callable): Optional callback applied after
                blueprint stopped.
        """
    
        GraphFormatter = StepFormatter
    
        name = None
        state = None
        started = 0
        default_steps = set()
        state_to_name = {
            0: 'initializing',
            RUN: 'running',
            CLOSE: 'closing',
            TERMINATE: 'terminating',
        }
    
        def __init__(self, steps=None, name=None,
                     on_start=None, on_close=None, on_stopped=None):
            self.name = name or self.name or qualname(type(self))
            self.types = set(steps or []) | set(self.default_steps)
            self.on_start = on_start
            self.on_close = on_close
            self.on_stopped = on_stopped
            self.shutdown_complete = Event()
            self.steps = {}
    
        def start(self, parent):  # 重点关注!
            self.state = RUN
            if self.on_start:
                self.on_start()   
            for i, step in enumerate(s for s in parent.steps if s is not None):
                self._debug('Starting %s', step.alias)
                self.started = i + 1
                step.start(parent)
                logger.debug('^-- substep ok')
    
    # ...省略
    

    start 方法中的 parent.steps,其实就是 Hub,Pool,Consumer 这三个组件的实例组成的列表。我们可以看到,其实就是依次调用这三个组件实例的 start 方法(。。。celery 的作者特别喜欢把方法名叫做 start)我们直接去 components.py 文件中查看 class Pool(bootsteps.StartStopStep) 组件的源码,会发现这个 start 方法还是藏的很隐蔽的。

    因为不是很直观,且过程非常曲折,我这里就不详细描述具体过程了,直接说结论:这个 start 方法最终会调用 celery/concurrency/prefork.py中的TaskPool 类下的 on_start 方法。我们可以看下这个 on_start 方法:

    # celery/concurrency/prefork.py
    # ...省略
    
    class TaskPool(BasePool):
        """Multiprocessing Pool implementation."""
    
        Pool = AsynPool
        BlockingPool = BlockingPool
    
        uses_semaphore = True
        write_stats = None
    
        def on_start(self):
            forking_enable(self.forking_enable)
            Pool = (self.BlockingPool if self.options.get('threads', True)
                    else self.Pool)
    
            # 重点关注下面这个!
            P = self._pool = Pool(processes=self.limit,
                                  initializer=process_initializer,
                                  on_process_exit=process_destructor,
                                  enable_timeouts=True,
                                  synack=False,
                                  **self.options)
    
            # Create proxy methods
            self.on_apply = P.apply_async
            self.maintain_pool = P.maintain_pool
            self.terminate_job = P.terminate_job
            self.grow = P.grow
            self.shrink = P.shrink
            self.flush = getattr(P, 'flush', None)  # FIXME add to billiard
    
    # ...省略
    
    

    到了这里我们就清楚多了,主要是执行了 Pool 的实例化。其实这个实例化就是 prefork 的具体实现。这个 Pool 其实就是 AsyncPool,源码在下面:

    # celery/concurrency/asynpool.py
    # ...省略
    class AsynPool(_pool.Pool):
        """AsyncIO Pool (no threads)."""
    
        ResultHandler = ResultHandler
        Worker = Worker
    
        def WorkerProcess(self, worker):
            worker = super(AsynPool, self).WorkerProcess(worker)
            worker.dead = False
            return worker
    
        def __init__(self, processes=None, synack=False,
                     sched_strategy=None, *args, **kwargs):
            self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                       sched_strategy)
            processes = self.cpu_count() if processes is None else processes
            self.synack = synack
            # create queue-pairs for all our processes in advance.
            # 重点!创建多个读写的管道
            self._queues = {
                self.create_process_queues(): None for _ in range(processes)
            }
    
            # 省略
    
            super(AsynPool, self).__init__(processes, *args, **kwargs) # 重点
    
            for proc in self._pool:   # 重点
                # create initial mappings, these will be updated
                # as processes are recycled, or found lost elsewhere.
                self._fileno_to_outq[proc.outqR_fd] = proc
                self._fileno_to_synq[proc.synqW_fd] = proc
    
            # 省略
    
    # ... 省略
    

    看到这里,可能有的小伙伴就懵了,说好的 fork 呢?说好的 进程间通讯呢?

    别急,其实 fork 和进程间通讯都藏在上面那一坨代码里了processes = self.cpu_count() if processes is None else processes 这个 processes 的值,就是需要 fork 的子进程数量,默认是 cpu 核数,如果在命令行制定了 -c 参数,则是 -c 参数的值,在本例子中,为 2。

    self.create_process_queues(): None for _ in range(processes) 其实就是创建出来了一堆读和写的管道,具体逻辑在 billiard/connection.py 文件中,因为逻辑较复杂,所以本文就省略了。

    根据流向的不同和主进程与子进程的不同,之后会分别关闭对应的的一端的管道,比如父进程把写关闭,子进程就把读关闭。并会用抽象的数据结构进行封装以便于管理。这个数据结构的实例用来为主进程和即将 fork 的子进程提供双向的数据传输。

    同样的,会根据子进程的数量创建出多个管道实例来。其中有个比较奇怪的一点就是,我在父进程关闭了一端的管道,fork 了之后,结果在子进程还是可以用这一端。

    这个也许是 fork 的子进程不继承父进程的管道关闭状态?其中最重要的方法是 super(AsynPool, self).init(processes, *args, **kwargs) 中执行的 self._create_worker_process(i),这里面就是 fork 的关键所在。相关源码如下:

    # 这个类在 celery 的依赖库 billiard 中的 pool.py 文件中
    # billiard/pool.py
    
    class Pool(object):
        '''
        Class which supports an async version of applying functions to arguments.
        '''
        # 省略
    
        def __init__(self, processes=None, initializer=None, initargs=(),
                     maxtasksperchild=None, timeout=None, soft_timeout=None,
                     lost_worker_timeout=None,
                     max_restarts=None, max_restart_freq=1,
                     on_process_up=None,
                     on_process_down=None,
                     on_timeout_set=None,
                     on_timeout_cancel=None,
                     threads=True,
                     semaphore=None,
                     putlocks=False,
                     allow_restart=False,
                     synack=False,
                     on_process_exit=None,
                     context=None,
                     max_memory_per_child=None,
                     enable_timeouts=False,
                     **kwargs):
            
            # 省略
            # 重点关注!
            for i in range(self._processes):   #cityblack !important
                self._create_worker_process(i)
    
        def _create_worker_process(self, i):
            sentinel = self._ctx.Event() if self.allow_restart else None
            inq, outq, synq = self.get_process_queues()
            w = self.WorkerProcess(self.Worker(
                inq, outq, synq, self._initializer, self._initargs,
                self._maxtasksperchild, sentinel, self._on_process_exit,
                # Need to handle all signals if using the ipc semaphore,
                # to make sure the semaphore is released.
                sigprotection=self.threads,
                wrap_exception=self._wrap_exception,
                max_memory_per_child=self._max_memory_per_child,
            ))
            self._pool.append(w)
            self._process_register_queues(w, (inq, outq, synq))
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.index = i
            w.start()   # 重点关注!
            self._poolctrl[w.pid] = sentinel
            if self.on_process_up:
                self.on_process_up(w)
            return w
    

    inq, outq, synq = self.get_process_queues() 拿到的是一个读和写的管道的抽象对象。这个管道是之前预先创建好的(就是上面 self.create_process_queues() 创建的)。

    主要是给即将 fork 的子进程用的,子进程会监听这管道数据结构抽象实例中的读事件,还可以从写管道写数据。

    w,也就是 self.WorkerProcess 的实例,其实是对 fork 出来的子进程的一个抽象封装。用来方便快捷的管理子进程,抽象成一个进程池,这个 w 会记录 fork 出来的子进程的一些 meta 信息,比如 pid,管道的读写的 fd 等等,并注册在主进程中,主进程可以利用它进行任务分发。w.start() 中包含具体的 fork 过程,相关源码在:

    #billiard/process.py
    # 省略
    class BaseProcess(object):
        # 省略
        def start(self):
            '''
            Start child process
            '''
            assert self._popen is None, 'cannot start a process twice'
            assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
            _cleanup()
            self._popen = self._Popen(self)   # 重点关注!
            self._sentinel = self._popen.sentinel
            _children.add(self)
    # 省略
    

    我们看到其中主要是 self._popen = self._Popen(self) 比较重要,我们看下 Popen 的源码:\

    # billiard/popen_fork.py
    # 省略
    class Popen(object):
        method = 'fork'
        sentinel = None
    
        def __init__(self, process_obj):
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None
            self._launch(process_obj)
    
        # 省略
        def _launch(self, process_obj):
            code = 1
            parent_r, child_w = os.pipe()
            self.pid = os.fork()
            if self.pid == 0:
                try:
                    os.close(parent_r)
                    if 'random' in sys.modules:
                        import random
                        random.seed()
                    code = process_obj._bootstrap()
                finally:
                    os._exit(code)
            else:
                os.close(child_w)
                self.sentinel = parent_r
    

    看到这里我们应该明白了。在执行 launch 方法的时候,会使用 os.fork() 派生出一个子进程,并且使用 ps.pipe() 创建出一对读写的管道,之后通过比较 self.pid 是否为 0,在主进程和子进程中执行不同的逻辑。子进程关闭 读 管道,之后执行 process_obj._bootstrap() 方法。

    然后就是 process_obj._bootstrap(),这个方法就是子进程执行的最后一个方法。当子进程执行完这个方法后,这个子进程已经进入了可用状态,随时等待着从主进程的管道接受任务。具体的流程比较复杂,我直接展示 process_obj._bootstrap() 的最后一步的源码,他会执行 workloop 方法,进入一个无限的循环:

    # billiard/pool.py
    # 省略
    #
    # Code run by worker processes
    #
    
    class Worker(object):
        # 省略
        def workloop(self, debug=debug, now=monotonic, pid=None):
            pid = pid or os.getpid()
            put = self.outq.put
            inqW_fd = self.inqW_fd
            synqW_fd = self.synqW_fd
            maxtasks = self.maxtasks
            max_memory_per_child = self.max_memory_per_child or 0
            prepare_result = self.prepare_result
    
            wait_for_job = self.wait_for_job
            _wait_for_syn = self.wait_for_syn
    
            def wait_for_syn(jid):
                i = 0
                while 1:
                    if i > 60:
                        error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
                              jid, self.synq._reader.fileno(), exc_info=1)
                    req = _wait_for_syn()
                    if req:
                        type_, args = req
                        if type_ == NACK:
                            return False
                        assert type_ == ACK
                        return True
                    i += 1
    
            completed = 0
            while maxtasks is None or (maxtasks and completed < maxtasks):
                req = wait_for_job()
                if req:
                    type_, args_ = req
                    assert type_ == TASK
                    job, i, fun, args, kwargs = args_
                    put((ACK, (job, i, now(), pid, synqW_fd)))
                    if _wait_for_syn:
                        confirm = wait_for_syn(job)
                        if not confirm:
                            continue  # received NACK
                    try:
                        result = (True, prepare_result(fun(*args, **kwargs)))
                    except Exception:
                        result = (False, ExceptionInfo())
                    try:
                        put((READY, (job, i, result, inqW_fd)))
                    except Exception as exc:
                        _, _, tb = sys.exc_info()
                        try:
                            wrapped = MaybeEncodingError(exc, result[1])
                            einfo = ExceptionInfo((
                                MaybeEncodingError, wrapped, tb,
                            ))
                            put((READY, (job, i, (False, einfo), inqW_fd)))
                        finally:
                            del(tb)
                    completed += 1
                    if max_memory_per_child > 0:
                        used_kb = mem_rss()
                        if used_kb <= 0:
                            error('worker unable to determine memory usage')
                        if used_kb > 0 and used_kb > max_memory_per_child:
                            error(MAXMEM_USED_FMT.format(
                                used_kb, max_memory_per_child))
                            return EX_RECYCLE
    
            debug('worker exiting after %d tasks', completed)
            if maxtasks:
                return EX_RECYCLE if completed == maxtasks else EX_FAILURE
            return EX_OK
    # 省略
    
    

    这个 workloop 其实很明显,就是监听读管道的数据(主进程从这个管道的另一端写),然后执行对应的回调,期间会调用 put 方法,往写管道同步状态(主进程可以从管道的另一端读这个数据).

    相关文章

      网友评论

          本文标题:Celery 源码学习(二)多进程模型

          本文链接:https://www.haomeiwen.com/subject/jnpnjktx.html