Celery Source Code Study (1): Architecture Analysis

Author: 星丶雲 | Published 2020-08-21 15:50

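    1. Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides the tools needed to operate such a system.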

    Put simply, it is a distributed task queue.

    2. Message queues vs. task queues

    See my other article:
    [https://www.jianshu.com/p/cde93d4d00c8](https://www.jianshu.com/p/cde93d4d00c8)
    

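    3. In my view, message queues and task queues mainly address the following scenarios:

    - Offline computation on non-real-time data
    - Splitting business logic to reduce coupling
    - Responding asynchronously to improve the user experience

    At Zhihu, for example, many actions (upvoting, writing an answer, publishing an article, turning on anonymous mode, and so on) send out a large number of messages behind the scenes. Each business team handles the messages it receives in its own way: the algorithm side scores answers and articles and assigns tags, while the back-office system may record the meta information of answers and articles.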

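    4. What is Celery?

    Anyone who works with Python will have come across it, or will at least have heard of it. Celery is the best-known open-source task queue framework in the Python world. Its popularity comes mainly from the following:

    - High performance and high throughput
    - Flexible configuration and ease of use
    - Thorough documentation and a complete set of supporting tools

    Beyond these strengths, Celery's source code is valuable in its own right and can be seen as a model of software engineering and design patterns. This article does not walk through Celery's own source, though; it mostly introduces some of the philosophy behind Celery's architecture.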

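    5. High performance: a multi-process, event-driven asynchronous model

    When it comes to high performance, especially for message queues, many people are dismissive and consider it pointless. That view is not unreasonable, for several reasons:

    - Offline tasks have no real-time requirement, so there is no essential difference between processing 100 and 1000 tasks per second.
    - If throughput really must go up, more flexible means such as scaling out are available, and the resource cost is modest.
    - Excessive throughput can actually be harmful: it can overwhelm upstream services, bring down MySQL, exhaust system connections, and so on.

    So for an offline queue, performance matters little. Even so, that does not stop us from studying Celery's architectural principles for the sake of learning.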

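    Celery's high performance rests on two things: multiple processes and an event-driven design.

    I will go through the design ideas behind each in turn. There is prior art here: most people know nginx, and the first thing that comes to mind is that it is a high-performance reverse proxy server. Celery can be said to borrow from nginx to a considerable degree.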

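    6. Consumption model

    Celery's core architecture is split into a scheduler (master/main process) and worker processes (slaves/worker processes), the familiar master-worker arrangement.

    The consumption model is simple. The scheduler is responsible for fetching and dispatching tasks, managing the worker processes (creating, adding, shutting down, restarting, discarding, and so on), and maintaining the other auxiliary modules.

    The worker processes consume the tasks handed over by the scheduler. In detail: the scheduler first pre-forks the worker processes into a process pool (multiprocessing pool). It then listens for kernel events (read, write, error, and so on) in an event-driven fashion (select/poll/epoll) and runs the matching callback whenever an event arrives. In this way it continuously pulls tasks from the broker and, using pipes for inter-process communication, hands them to worker processes according to a routing strategy (round-robin, weighted, and so on).

    A worker process consumes (acks) the task and then uses the pipe to synchronize its state back to the scheduler, along with any other inter-process communication.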
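
The flow above can be sketched with nothing but the standard library. This is a minimal illustration, not Celery's actual implementation: `worker_loop`, `run_master`, the squaring "task", and the `fork` start method (POSIX only) are all assumptions made for the example. The master pre-forks its workers, hands tasks out round-robin over pipes, and reads each result back over the same pipe. For brevity it uses blocking `recv` calls rather than an event loop, and it assumes the task list is small enough to fit in the pipe buffers.

```python
import multiprocessing as mp
from itertools import cycle


def worker_loop(conn):
    """Worker process: receive tasks over a pipe, run them, report back."""
    while True:
        task = conn.recv()
        if task is None:  # sentinel from the master: shut down
            break
        task_id, payload = task
        # Hypothetical "task": square the payload, then report the result.
        conn.send((task_id, payload * payload))
    conn.close()


def run_master(tasks, num_workers=2):
    """Pre-fork workers, dispatch tasks round-robin, collect the results."""
    ctx = mp.get_context("fork")  # assumption: POSIX platform
    pipes, procs = [], []
    for _ in range(num_workers):
        master_end, worker_end = ctx.Pipe()
        proc = ctx.Process(target=worker_loop, args=(worker_end,))
        proc.start()
        worker_end.close()  # the master keeps only its own end
        pipes.append(master_end)
        procs.append(proc)

    # Round-robin routing: task i goes to worker i % num_workers.
    assignment = {}
    for (task_id, payload), idx in zip(tasks, cycle(range(num_workers))):
        pipes[idx].send((task_id, payload))
        assignment[task_id] = idx

    # Each worker answers in the order it received its tasks.
    results = {}
    for idx in assignment.values():
        task_id, value = pipes[idx].recv()
        results[task_id] = value

    # Tell every worker to exit, then wait for them.
    for conn in pipes:
        conn.send(None)
    for proc in procs:
        proc.join()
    return results, assignment
```

Calling `run_master([(i, i) for i in range(5)])` returns the squared values keyed by task id, together with the worker index each task was routed to.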

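    This is of course a coarse-grained description. Celery also implements many interesting features internally, such as prefetch, cluster monitoring and management, the auto-scaler, and failure recovery. These non-core modules are not covered here; they can be examined separately later.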

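    7. Why it is efficient

    It is worth thinking about why this architecture performs so well.

    First, the scheduler. It follows an event-driven model. What does event-driven mean? In essence, it eliminates blocking. An ordinary single-threaded model fetches one message at a time, pays a full round trip for each one, and needs a while True loop to keep polling, which is inefficient and costly. An event-driven model instead registers interest in many event sources at once, suspends, and waits for the kernel to notify it; when a notification arrives, it runs the corresponding callback. This neatly removes the polling while True loop and, by keeping many requests in flight concurrently, cuts down on repeated round trips.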
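
The difference is easier to see in code. The sketch below uses Python's standard `selectors` module; it is not taken from Celery, and `run_event_loop`, `on_readable`, and the socket pairs standing in for broker connections are assumptions made for illustration. Many sockets are registered up front, and a single `select()` call suspends the thread until the kernel reports that at least one of them is readable.

```python
import selectors
import socket


def run_event_loop(messages):
    """Wait on many sockets at once and run a callback per ready socket."""
    sel = selectors.DefaultSelector()  # epoll/kqueue/select, per platform
    received = []
    sockets = []

    def on_readable(sock):
        # Callback: runs only after the kernel reports the socket readable.
        received.append(sock.recv(1024).decode())
        sel.unregister(sock)

    for msg in messages:
        reader, writer = socket.socketpair()
        reader.setblocking(False)
        sel.register(reader, selectors.EVENT_READ, data=on_readable)
        writer.sendall(msg.encode())  # simulate a message arriving
        sockets += [reader, writer]

    # One blocking call covers every registered socket; no per-socket polling.
    while len(received) < len(messages):
        for key, _events in sel.select():
            key.data(key.fileobj)

    for sock in sockets:
        sock.close()
    sel.close()
    return sorted(received)
```

The messages must be non-empty strings, since an empty send never makes its socket readable. The loop body never asks an individual socket whether it has data; it only reacts to what `select()` returns.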
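    Next, the advantage of using multiple processes for the workers. Experienced engineers configuring a container often use n cores with n*m workers, because multiple processes make good use of the computing power of every core.

    Multiple processes also spread the load of handling concurrent requests, and each process can additionally use multiple threads, asynchronous I/O, and similar techniques internally.

    This makes full use of multi-core computing while also exploiting the non-blocking model within a single thread. That covers Celery's architecture in outline; later articles will look at how the flow described above is implemented in the source code.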

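    [Celery executor: CeleryExecutor]

    Because Celery cannot send tasks or query task states in parallel, Airflow's CeleryExecutor starts multiple processes to do this work.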

    #
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    """Celery executor."""
    import math
    import os
    import subprocess
    import time
    import traceback
    from multiprocessing import Pool, cpu_count
    from typing import Any, List, Optional, Tuple, Union
    
    from celery import Celery, Task, states as celery_states
    from celery.result import AsyncResult
    
    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
    from airflow.configuration import conf
    from airflow.exceptions import AirflowException
    from airflow.executors.base_executor import BaseExecutor, CommandType
    from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType, TaskInstanceStateType
    from airflow.utils.log.logging_mixin import LoggingMixin
    from airflow.utils.module_loading import import_string
    from airflow.utils.timeout import timeout
    
    # Make it constant for unit test.
    CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
    
    CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task'
    
    '''
    To start the celery worker, run the command:
    airflow celery worker
    '''
    
    if conf.has_option('celery', 'celery_config_options'):
        celery_configuration = import_string(
            conf.get('celery', 'celery_config_options')
        )
    else:
        celery_configuration = DEFAULT_CELERY_CONFIG
    
    
    # The Celery app instance
    app = Celery(
        conf.get('celery', 'CELERY_APP_NAME'),
        config_source=celery_configuration)
    
    
    @app.task
    def execute_command(command_to_exec: CommandType) -> None:
        """
        Executes the given command.
    
        subprocess.check_call(args, *, stdin=None, stdout=None, stderr=None, shell=False)
        behaves like subprocess.call, except that it returns 0 when the command succeeds
        and raises subprocess.CalledProcessError otherwise. The exception carries
        returncode (the child's exit code), cmd (the command that was run) and output
        (None here, because check_call does not capture output).
    
        In short, an abnormal exit of the child process raises an error.
    
        """
        log = LoggingMixin().log
        log.info("Executing command in Celery: %s", command_to_exec)
        env = os.environ.copy()
        try:
            subprocess.check_call(command_to_exec, stderr=subprocess.STDOUT,
                                  close_fds=True, env=env)
        except subprocess.CalledProcessError as e:
            log.exception('execute_command encountered a CalledProcessError')
            log.error(e.output)
            raise AirflowException('Celery command failed')
    
    
    class ExceptionWithTraceback:
        """
        Wrapper class used to propagate exceptions to parent processes from subprocesses.
    
        :param exception: The exception to wrap
        :type exception: Exception
        :param exception_traceback: The stacktrace to wrap
        :type exception_traceback: str
        """
    
        def __init__(self, exception: Exception, exception_traceback: str):
            self.exception = exception
            self.traceback = exception_traceback
    
    
    def fetch_celery_task_state(celery_task: Tuple[TaskInstanceKeyType, AsyncResult]) \
            -> Union[TaskInstanceStateType, ExceptionWithTraceback]:
        """
    
        Returns the task state.
    
        Fetch and return the state of the given celery task. The scope of this function is
        global so that it can be called by subprocesses in the pool.
    
        :param celery_task: a tuple of the Celery task key and the async Celery object used
            to fetch the task's state
        :type celery_task: tuple(str, celery.result.AsyncResult)
        :return: a tuple of the Celery task key and the Celery state of the task
        :rtype: tuple[str, str]
        """
    
        try:
            with timeout(seconds=2):
                # Accessing state property of celery task will make actual network request
                # to get the current state of the task.
                return celery_task[0], celery_task[1].state
        except Exception as e:  # pylint: disable=broad-except
            exception_traceback = "Celery Task ID: {}\n{}".format(celery_task[0],
                                                                  traceback.format_exc())
            return ExceptionWithTraceback(e, exception_traceback)
    
    
    # Task instance that is sent over Celery queues
    # TaskInstanceKeyType, SimpleTaskInstance, Command, queue_name, CallableTask
    TaskInstanceInCelery = Tuple[TaskInstanceKeyType, SimpleTaskInstance, CommandType, Optional[str], Task]
    
    
    def send_task_to_executor(task_tuple: TaskInstanceInCelery) \
            -> Tuple[TaskInstanceKeyType, CommandType, Union[AsyncResult, ExceptionWithTraceback]]:
        """
    
        Sends the task to Celery for execution."""
        key, _, command, queue, task_to_run = task_tuple
        try:
            with timeout(seconds=2):
                # Submit the task asynchronously
                result = task_to_run.apply_async(args=[command], queue=queue)
        except Exception as e:  # pylint: disable=broad-except
            exception_traceback = "Celery Task ID: {}\n{}".format(key, traceback.format_exc())
            result = ExceptionWithTraceback(e, exception_traceback)
    
        return key, command, result
    
    
    class CeleryExecutor(BaseExecutor):
        """
        CeleryExecutor is recommended for production use of Airflow. It allows
        distributing the execution of task instances to multiple worker nodes.
    
        Celery is a simple, flexible and reliable distributed system to process
        vast amounts of messages, while providing operations with the tools
        required to maintain such a system.
        
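        The Celery executor cannot be driven through execute_async (calling it raises);
        it sends tasks itself, and using multiple processes speeds this up.
        Because Celery itself is asynchronous, execution is still asynchronous in essence.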
        
        """
    
        def __init__(self):
            super().__init__()
    
            # Celery doesn't support querying the state of multiple tasks in parallel
            # (which can become a bottleneck on bigger clusters) so we use
            # a multiprocessing pool to speed this up.
            # How many worker processes are created for checking celery task state.
            self._sync_parallelism = conf.getint('celery', 'SYNC_PARALLELISM')
            if self._sync_parallelism == 0:
                self._sync_parallelism = max(1, cpu_count() - 1)
    
            self._sync_pool = None
            # Tasks currently running
            self.tasks = {}
            # Most recently observed state of each task
            self.last_state = {}
    
        def start(self) -> None:
            self.log.debug(
                'Starting Celery Executor using %s processes for syncing',
                self._sync_parallelism
            )
    
        def _num_tasks_per_send_process(self, to_send_count: int) -> int:
            """
            Number of tasks per process: task count / parallelism, rounded up.
            How many Celery tasks should each worker process send.
    
            :return: Number of tasks that should be sent per process
            :rtype: int
            """
            return max(1,
                       int(math.ceil(1.0 * to_send_count / self._sync_parallelism)))
    
        def _num_tasks_per_fetch_process(self) -> int:
            """
            How many Celery tasks should be sent to each worker process,
            that is, how many tasks go out in one batch.
    
            :return: Number of tasks that should be used per process
            :rtype: int
            """
            return max(1, int(math.ceil(1.0 * len(self.tasks) / self._sync_parallelism)))
    
        def trigger_tasks(self, open_slots: int) -> None:
            """
    
            Triggers queued tasks.
    
            Overwrite trigger_tasks function from BaseExecutor
    
            :param open_slots: Number of open slots
            :return:
            """
            sorted_queue = self.order_queued_tasks_by_priority()
    
            task_tuples_to_send: List[TaskInstanceInCelery] = []
    
            for _ in range(min((open_slots, len(self.queued_tasks)))):
                key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
                task_tuples_to_send.append((key, simple_ti, command, queue, execute_command))
    
            cached_celery_backend = None
            if task_tuples_to_send:
                tasks = [t[4] for t in task_tuples_to_send]
    
                # Celery state queries will stuck if we do not use one same backend for all tasks.
                cached_celery_backend = tasks[0].backend
    
            if task_tuples_to_send:
                # Use chunks instead of a work queue to reduce context switching
                # since tasks are roughly uniform in size
                chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
                # Number of processes
                num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
    
                # Process pool
                send_pool = Pool(processes=num_processes)
                key_and_async_results = send_pool.map(
                    send_task_to_executor,
                    task_tuples_to_send,
                    chunksize=chunksize)
    
                send_pool.close()
                send_pool.join()
                self.log.debug('Sent all tasks.')
    
                for key, command, result in key_and_async_results:
                    if isinstance(result, ExceptionWithTraceback):
                        self.log.error(
                            CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
                        )
                    elif result is not None:
                        # Only pops when enqueued successfully, otherwise keep it
                        # and expect scheduler loop to deal with it.
                        self.queued_tasks.pop(key)
                        result.backend = cached_celery_backend
                        self.running.add(key)
                        self.tasks[key] = result
                        self.last_state[key] = celery_states.PENDING
    
        def sync(self) -> None:
            """Syncs task states from Celery."""
            num_processes = min(len(self.tasks), self._sync_parallelism)
            if num_processes == 0:
                self.log.debug("No task to query celery, skipping sync")
                return
    
            self.log.debug("Inquiring about %s celery task(s) using %s processes",
                           len(self.tasks), num_processes)
    
            # Recreate the process pool each sync in case processes in the pool die
            self._sync_pool = Pool(processes=num_processes)
    
            # Use chunks instead of a work queue to reduce context switching since tasks are
            # roughly uniform in size
            chunksize = self._num_tasks_per_fetch_process()
    
            self.log.debug("Waiting for inquiries to complete...")
            # Fetch the task states
            task_keys_to_states = self._sync_pool.map(
                fetch_celery_task_state,
                self.tasks.items(),
                chunksize=chunksize)
            self._sync_pool.close()
            self._sync_pool.join()
            self.log.debug("Inquiries completed.")
    
            self.update_task_states(task_keys_to_states)
    
        def update_task_states(self,
                               task_keys_to_states: List[Union[TaskInstanceStateType,
                                                               ExceptionWithTraceback]]) -> None:
            """
            Updates states of all tasks."""
            for key_and_state in task_keys_to_states:
                if isinstance(key_and_state, ExceptionWithTraceback):
                    self.log.error(
                        CELERY_FETCH_ERR_MSG_HEADER + ", ignoring it:%s\n%s\n",
                        repr(key_and_state.exception), key_and_state.traceback
                    )
                    continue
                key, state = key_and_state
                self.update_task_state(key, state)
    
        def update_task_state(self, key: TaskInstanceKeyType, state: str) -> None:
            """
            Updates state of a single task."""
            # noinspection PyBroadException
            try:
                # Only act when the state has changed
                if self.last_state[key] != state:
                    if state == celery_states.SUCCESS:
                        self.success(key)
                        del self.tasks[key]
                        del self.last_state[key]
                    elif state == celery_states.FAILURE:
                        self.fail(key)
                        del self.tasks[key]
                        del self.last_state[key]
                    elif state == celery_states.REVOKED:
                        self.fail(key)
                        del self.tasks[key]
                        del self.last_state[key]
                    else:
                        self.log.info("Unexpected state: %s", state)
                        self.last_state[key] = state
            except Exception:  # pylint: disable=broad-except
                self.log.exception("Error syncing the Celery executor, ignoring it.")
    
        def end(self, synchronous: bool = False) -> None:
            if synchronous:
                while any([task.state not in celery_states.READY_STATES for task in self.tasks.values()]):
                    time.sleep(5)
            self.sync()
    
        def execute_async(self,
                          key: TaskInstanceKeyType,
                          command: CommandType,
                          queue: Optional[str] = None,
                          executor_config: Optional[Any] = None):
            """
            Do not allow async execution for Celery executor."""
            raise AirflowException("No Async execution for Celery executor.")
    
        def terminate(self):
            pass
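
The batching arithmetic used by `trigger_tasks` and `sync` is easy to check in isolation. The sketch below restates it as a free function. `plan_batches` is a hypothetical helper name and not part of Airflow, but the two formulas (`min(task_count, parallelism)` processes, and `ceil(task_count / parallelism)` tasks per chunk with a floor of 1) are taken from the listing above.

```python
import math


def plan_batches(task_count, parallelism):
    """Return (num_processes, chunksize) for a batch of Celery calls."""
    # Never start more processes than there are tasks to handle.
    num_processes = min(task_count, parallelism)
    # Spread the tasks evenly, rounding up, with at least one per chunk.
    chunksize = max(1, int(math.ceil(1.0 * task_count / parallelism)))
    return num_processes, chunksize
```

With 10 tasks and a parallelism of 4, this gives 4 processes and a chunk size of 3, so `Pool.map` hands out chunks of 3, 3, 3, and 1. With 3 tasks and a parallelism of 8, only 3 processes are started and each gets a single task.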
    


    Article link: https://www.haomeiwen.com/subject/yngujktx.html