
airflow scheduler execution flow

Author: 三须 | Published 2020-03-20 16:51

    0. The airflow scheduler command executes: a SchedulerJob is created and starts running.
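    The entry point itself is thin. A simplified sketch of the CLI handler in Airflow 1.10.x (daemon mode, run_duration, and logging setup omitted):

    # Sketch of the `airflow scheduler` CLI handler (Airflow 1.10.x, simplified):
    # it only builds a SchedulerJob from the parsed arguments and runs it.
    from airflow import jobs

    def scheduler(args):
        job = jobs.SchedulerJob(
            dag_id=args.dag_id,
            subdir=process_subdir(args.subdir),  # resolves the DAGs folder
            num_runs=args.num_runs,
            do_pickle=args.do_pickle)
        job.run()  # persists the job row, then calls self._execute()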

    1. The Executor is initialized from the configuration and started (executor.start());
    the DAG file processor agent (DagFileProcessorAgent) is initialized and started.
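    A condensed sketch of that startup sequence, modeled on SchedulerJob._execute_helper in Airflow 1.10.x (signal handling and loop pacing omitted):

    # Sketch of SchedulerJob._execute_helper (Airflow 1.10.x, simplified)
    def _execute_helper(self):
        self.executor.start()            # start the executor chosen in airflow.cfg

        self.log.info("Resetting orphaned tasks for active dag runs")
        self.reset_state_for_orphaned_tasks()

        self.processor_agent.start()     # spawn the DagFileProcessorManager process

        while True:
            # main scheduler loop: harvest parsed DAGs and enqueue task
            # instances (see step 5 below)
            ...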
    2. A new process is created, in which the DagFileProcessorManager is initialized and started:

    # DagFileProcessorAgent.start(): the manager runs in a child process
    self._process = multiprocessing.Process(
        target=type(self)._run_processor_manager,
        args=(
            self._dag_directory,      # the DAGs folder to scan
            self._file_paths,         # the .py files found in it
            self._max_runs,
            self._processor_factory,  # builds one DagFileProcessor per file
            self._processor_timeout,
            child_signal_conn,        # pipe back to the parent SchedulerJob
            self._async_mode,
        )
    )
    self._process.start()
    

    3. The manager iterates over the DAG files, creating one process per file:
    DagFileProcessor.start() → DagFileProcessor._run_file_processor()
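    A condensed sketch of that per-file launch (the exact argument list varies across 1.10.x minor versions; timing bookkeeping omitted):

    # Sketch of DagFileProcessor.start() (Airflow 1.10.x, simplified):
    # each DAG file gets its own short-lived parsing process.
    def start(self):
        self._parent_channel, _child_channel = multiprocessing.Pipe()
        self._process = multiprocessing.Process(
            target=type(self)._run_file_processor,  # parses the file, runs step 4
            args=(
                _child_channel,          # pipe for sending SimpleDag results back
                self.file_path,          # the single .py file this processor owns
                self._pickle_dags,
                self._dag_id_white_list,
            ),
        )
        self._process.start()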
    4. Within that process, SchedulerJob handles the single DAG file: it checks whether the DAG is due to run, creates the DagRun and TaskInstances, and updates the DagRun state.

    SchedulerJob.process_file(file_path, pickle_dags)
        -> self._process_dags(dagbag, dags, ti_keys_to_schedule)
            -> self._process_task_instances(dag, tis_out)
                -> run.verify_integrity(session=session)  # create and verify TaskInstances
                -> run.update_state(session=session)      # set the DagRun state from its TIs
    It then walks the DagRun's TaskInstances, flips those whose dependencies are
    met to SCHEDULED, and sends the DAGs it has finished processing back to the
    scheduler.
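    A condensed sketch of what happens inside _process_dags (Airflow 1.10.x; SLA and zombie handling trimmed):

    # Sketch of SchedulerJob._process_dags (Airflow 1.10.x, simplified)
    def _process_dags(self, dagbag, dags, tis_out):
        for dag in dags:
            dag = dagbag.get_dag(dag.dag_id)
            if dag is None or dag.is_paused:
                continue
            # create_dag_run() consults the schedule_interval and max_active_runs
            # and returns a new DagRun only when one is actually due
            dag_run = self.create_dag_run(dag)
            if dag_run:
                self.log.info("Created %s", dag_run)
            # collect schedulable task instance keys into tis_out
            # (the dependency check below lives in this method)
            self._process_task_instances(dag, tis_out)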
    

    Finding the runnable TaskInstances:

    for ti in tis:
        task = dag.get_task(ti.task_id)

        # fixme: ti.task is transient but needs to be set
        ti.task = task

        # check whether all dependencies are met; flag_upstream_failed=True also
        # lets the check mark the TI UPSTREAM_FAILED/SKIPPED as a side effect
        if ti.are_dependencies_met(
                dep_context=DepContext(flag_upstream_failed=True),
                session=session):
            self.log.debug('Queuing task: %s', ti)
            task_instances_list.append(ti.key)
    

    Sending the processed DAGs back:

    # end of DagFileProcessor._run_file_processor: the child process sends
    # the SimpleDag results to the parent over the pipe
    result = scheduler_job.process_file(file_path, pickle_dags)
    result_channel.send(result)
    

    5. Back in its main loop, SchedulerJob collects the DAGs that the file
    processor processes have finished:

    simple_dags = self.processor_agent.harvest_simple_dags()

    Based on the processed DAGs, the pool sizes, and the tasks' priority
    weights, it hands the SCHEDULED TaskInstances to the executor:

    self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))

    Sending the TaskInstances to the executor queue:

    # actually enqueue them (inside _enqueue_task_instances_with_queued_state)
    for simple_task_instance in simple_task_instances:
        simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
        # build the "airflow run ..." shell command the worker will execute
        command = TI.generate_command(
            simple_task_instance.dag_id,
            simple_task_instance.task_id,
            simple_task_instance.execution_date,
            local=True,
            mark_success=False,
            ignore_all_deps=False,
            ignore_depends_on_past=False,
            ignore_task_deps=False,
            ignore_ti_state=False,
            pool=simple_task_instance.pool,
            file_path=simple_dag.full_filepath,
            pickle_id=simple_dag.pickle_id)

        priority = simple_task_instance.priority_weight
        queue = simple_task_instance.queue
        self.log.info(
            "Sending %s to executor with priority %s and queue %s",
            simple_task_instance.key, priority, queue
        )

        # buffer the command in the executor; it is picked up on the
        # next executor heartbeat
        self.executor.queue_command(
            simple_task_instance,
            command,
            priority=priority,
            queue=queue)
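    On the executor side, queue_command only buffers the command. A sketch of the BaseExecutor behavior in Airflow 1.10.x (simplified):

    # Sketch of BaseExecutor.queue_command (Airflow 1.10.x, simplified):
    # commands wait in queued_tasks until the executor heartbeat drains them,
    # in priority order, into execute_async().
    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
        key = simple_task_instance.key
        if key not in self.queued_tasks and key not in self.running:
            self.log.info("Adding to queue: %s", command)
            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
        else:
            self.log.info("could not queue task %s", key)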
    
