美文网首页
celery 源码笔记(一)

celery 源码笔记(一)

作者: pc123455 | 来源:发表于2018-08-02 22:34 被阅读0次

    由于工作需要,开一个celery源码笔记的坑。

    启动

    从github上下载源码打开后,可看到源码的结构如下:

    celery目录结构
    打开setup.py文件,在文件的最后可以看到
    1533043004390.png
    因此,可以分析出celery的入口是celery/__main__.py文件的main函数,函数定义如下
    def main():
        """Entrypoint to the ``celery`` umbrella command."""
        if 'multi' not in sys.argv:
            maybe_patch_concurrency()
        from celery.bin.celery import main as _main
        _main()
    

    这里可以看到,main函数会调用celery.bin.celery模块的main()函数,转到定义,在代码中可以看到这里的主要逻辑为

    cmd = CeleryCommand() # 创建CeleryComman对象
    cmd.execute_from_commandline(argv) # 从命令行启动
    

    CeleryCommand对象在celery/bin/celery.py文件,这里可以看到CeleryCommand继承自Command类(该类的声明celery/bin/base.py文件,在很多类都是由该类派生出来,以后会提到),由于CeleryCommand并没有实现自己的__init__函数,因此会调用Command类的__init__函数进行初始化

        def __init__(self, app=None, get_app=None, no_color=False,
                     stdout=None, stderr=None, quiet=False, on_error=None,
                     on_usage_error=None):
            self.app = app
            self.get_app = get_app or self._get_default_app
            self.stdout = stdout or sys.stdout
            self.stderr = stderr or sys.stderr
            self._colored = None
            self._no_color = no_color
            self.quiet = quiet
            if not self.description:
                self.description = self._strip_restructeredtext(self.__doc__)
            if on_error:
                self.on_error = on_error
            if on_usage_error:
                self.on_usage_error = on_usage_error
    

    这里我们可以看到__init__执行进行了一些简单的初始化工作。接下来分析execute_from_commandline函数

        def execute_from_commandline(self, argv=None):
            argv = sys.argv if argv is None else argv
            if 'multi' in argv[1:3]:  # Issue 1008
                self.respects_app_option = False
            try:
                sys.exit(determine_exit_status(
                    super(CeleryCommand, self).execute_from_commandline(argv)))
            except KeyboardInterrupt:
                sys.exit(EX_FAILURE)
    

    CeleryCommandexecute_from_commandline函数中,我们可以看到这里调用了Command类的execute_from_commandline函数

        def execute_from_commandline(self, argv=None):
            """Execute application from command-line.
    
            Arguments:
                argv (List[str]): The list of command-line arguments.
                    Defaults to ``sys.argv``.
            """
            if argv is None:
                argv = list(sys.argv)
            # Should we load any special concurrency environment?
            self.maybe_patch_concurrency(argv)
            self.on_concurrency_setup()
    
            # Dump version and exit if '--version' arg set.
            self.early_version(argv)
            argv = self.setup_app_from_commandline(argv) # 解析命令行参数并创建Celery实例
            self.prog_name = os.path.basename(argv[0])
            return self.handle_argv(self.prog_name, argv[1:]) # 调用当前对象的handle_argv函数
    

    在该函数中会调用setup_app_from_commandline解析命令行参数并创建应用(用户的app也是在这一步被加载),之后调用handle_argv函数继续处理,这里需要注意,代码中调用的handle_argv函数是CeleryCommand中定义的,接下来我们分析handle_argv函数。

        def handle_argv(self, prog_name, argv, **kwargs):
            self.prog_name = self.prepare_prog_name(prog_name)
            argv = self._relocate_args_from_start(argv)
            _, argv = self.prepare_args(None, argv)
            try:
                command = argv[0]
            except IndexError:
                command, argv = 'help', ['help']
            return self.execute(command, argv)
    

    这里可以看到,在解析了参数之后,调用了execute函数,其中第一个参数为命令行参数中解析出来的,按照官网的示例,这里的字符串为"worker",(后面的分析都暂时认为command的值是"worker")。之后进入到execute函数中

        def execute(self, command, argv=None):
            try:
                cls = self.commands[command]
            except KeyError:
                cls, argv = self.commands['help'], ['help']
            cls = self.commands.get(command) or self.commands['help'] # 根据传入的command字符串获取对应的类
            try:
                return cls(
                    app=self.app, on_error=self.on_error,
                    no_color=self.no_color, quiet=self.quiet,
                    on_usage_error=partial(self.on_usage_error, command=command),
                ).run_from_argv(self.prog_name, argv[1:], command=argv[0]) # 初始化并启动实例
            except self.UsageError as exc:
                self.on_usage_error(exc)
                return exc.status
            except self.Error as exc:
                self.on_error(exc)
                return exc.status
    

    这里我们看到execute函数主要做了两件事,一是根据传入的command查找类;二是创建上一步的类的实例并启动。
    转到worker类的定义,在文件celery/bin/worker.py中,可以看到,该类也是继承自Command类,worker类实例的初始化也是调用Command类的__init__,初始化完成后会调用run_from_argv启动,该函数只是回调了一下当前对象的handle_argv函数。由于worker类没有重写handle_argv,因此会调用Command类中的该函数。

        def handle_argv(self, prog_name, argv, command=None):
            """Parse arguments from argv and dispatch to :meth:`run`.
    
            Warning:
                Exits with an error message if :attr:`supports_args` is disabled
                and ``argv`` contains positional arguments.
    
            Arguments:
                prog_name (str): The program name (``argv[0]``).
                argv (List[str]): Rest of command-line arguments.
            """
            options, args = self.prepare_args(
                *self.parse_options(prog_name, argv, command))
            return self(*args, **options)
    

    在该函数中,会调用当前对象的__call__函数,同样地,这里也是调用Command类中定义的该函数。该函数中,会调用当前对象的run函数,这里调用的便是worker类中定义的run函数。在该函数中,会首先进行一些配置,之后便是创建真正的Worker类的对象之后调用start函数启动。
    本阶段的调用时序图可以整理如下:

    启动时序图

    相关文章

      网友评论

          本文标题:celery 源码笔记(一)

          本文链接:https://www.haomeiwen.com/subject/jykevftx.html