美文网首页
airflow-scheduler_job源码分析

airflow-scheduler_job源码分析

作者: 陈先生_9e91 | 来源:发表于2020-07-17 11:57 被阅读0次

airflow-scheduler_job源码分析

基本概念

https://airflow.apache.org/docs/stable/concepts.html#

  • DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.
  • DAG Run: An instance of a DAG for a particular logical date and time.
  • Operator: A class that acts as a template for carrying out some work.
  • Task: Defines work by implementing an operator, written in Python.
  • Task Instance: An instance of a task - that has been assigned to a DAG and has a state associated with a specific DAG run (i.e for a specific execution_date).
  • execution_date: The logical date and time for a DAG Run and its Task Instances.

状态机

_images/task_lifecycle_diagram.png
  1. No status (scheduler created empty task instance)
  2. Scheduled (scheduler determined task instance needs to run)
  3. Queued (scheduler sent task to executor to run on the queue)
  4. Running (worker picked up a task and is now running it)
  5. Success (task completed)

code

代码部分略有删减,只保留关键code & log
/Users/yangxue.chen/code/python/airflow/airflow/jobs/scheduler_job.py

scheduler_loop.jpg
"""
This SchedulerJob runs for a specific time interval and schedules the jobs
that are ready to run. It figures out the latest runs for each
task and sees if the dependencies for the next schedules are met.
If so, it creates appropriate TaskInstances and sends run commands to the
executor. It does this for each task in each DAG and repeats.
"""

    def _execute(self):
        self.log.info("Starting the scheduler")

        def processor_factory(file_path, zombies):
            return DagFileProcessor()        
        
        self.processor_agent = DagFileProcessorAgent(processor_factory)        
        self._execute_helper()
    
    def _execute_helper(self):
        """
        The actual scheduler loop. The main steps in the loop are:
            #. Harvest DAG parsing results through DagFileProcessorAgent
            #. Find and queue executable tasks
                #. Change task instance state in DB
                #. Queue tasks in executor
            #. Heartbeat executor
                #. Execute queued tasks in executor asynchronously
                #. Sync on the states of running tasks 
        """
        self.executor.start()
        
        self.processor_agent.start()
        
        # For the execute duration, parse and schedule DAGs
        while (timezone.utcnow() - execute_start_time).total_seconds() < \
                self.run_duration or self.run_duration < 0:
            
            # Send tasks for execution if available
            if not self._validate_and_run_task_instances():
                continue   
  • 启动executor
  • 启动agent
  • 发送可运行的task

DagFileProcessorAgent

/Users/yangxue.chen/code/python/airflow/airflow/utils/dag_processing.py

"""
Agent for DAG file processing. It is responsible for all DAG parsing
related jobs in scheduler process. Mainly it can spin up DagFileProcessorManager
in a subprocess, collect DAG parsing results from it and communicate
signal/DAG parsing stat with it.

This class runs in the main `airflow scheduler` process.
"""
    def start(self):
        """
        Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
        """
        self._parent_signal_conn, child_signal_conn = multiprocessing.Pipe()
        self._process = multiprocessing.Process(
            target=type(self)._run_processor_manager,
            args=(child_signal_conn)
        )
        self._process.start()
    
    @staticmethod
    def _run_processor_manager():
        processor_manager = DagFileProcessorManager()
        processor_manager.start()

DagFileProcessorManager

"""
Given a list of DAG definition files, this kicks off several processors
in parallel to process them and put the results to a multiprocessing.Queue
for DagFileProcessorAgent to harvest. The parallelism is limited and as the
processors finish, more are launched. The files are processed over and
over again, but no more often than the specified interval.
"""
    def start(self):
        """
        Use multiple processes to parse and generate tasks for the
        DAGs in parallel. By processing them in separate processes,
        we can get parallelism and isolation from potentially harmful
        user code.
        """
        while True:
            self._refresh_dag_dir()
            self._find_zombies()
            
            simple_dags = self.heartbeat()
        # How often to scan the DAGs directory for new files. Default to 5 minutes.
        self.dag_dir_list_interval = conf.getint('scheduler',
                                                 'dag_dir_list_interval')

def _refresh_dag_dir(self):
    """
    Refresh file paths from dag dir if we haven't done it for too long.
    """
    now = timezone.utcnow()
    elapsed_time_since_refresh = (now - self.last_dag_dir_refresh_time).total_seconds()
    if elapsed_time_since_refresh > self.dag_dir_list_interval:
        self._file_paths = list_py_file_paths(self._dag_directory)
        
        self.last_dag_dir_refresh_time = now

        self.set_file_paths(self._file_paths)

根据配置dag_dir_list_interval周期性地刷新dag文件目录

    self._parallelism = conf.getint('scheduler', 'max_threads')

def heartbeat(self):
    """
    This should be periodically called by the manager loop. This method will
    kick off new processes to process DAG definition files and read the
    results from the finished processors.
    """
    simple_dags = self.collect_results()
    
    if len(self._file_path_queue) == 0:
            files_paths_to_queue = list(set(self._file_paths) -
                                        set(file_paths_in_progress) -
                                        set(file_paths_recently_processed) -
                                        set(files_paths_at_run_limit))

    self.log.debug(
         "Queuing the following files for processing:\n\t%s",
         "\n\t".join(files_paths_to_queue)
    )            
    
    while (self._parallelism - len(self._processors) > 0 and
               len(self._file_path_queue) > 0):       

        file_path = self._file_path_queue.pop(0)
        processor = self._processor_factory(file_path, self._zombies)
        processor.start()        
        
    return simple_dags  

根据配置max_threads并行处理dag文件
DagFileProcessor

"""
Helps call SchedulerJob.process_file() in a separate process
"""
def start(self):
    self._parent_channel, _child_channel = multiprocessing.Pipe()
    self._process = multiprocessing.Process(
        target=type(self)._run_file_processor,
        args=()
    )
    self._process.start()
    
def _run_file_processor():
    scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
    result = scheduler_job.process_file()    
    result_channel.send(result)   
def process_file(self, file_path, zombies, pickle_dags=False, session=None):
    """
    Process a Python file containing Airflow DAGs.

    This includes:

    1. Execute the file and look for DAG objects in the namespace.
    2. Pickle the DAG and save it to the DB (if necessary).
    3. For each DAG, see what tasks should run and create appropriate task
    instances in the DB.
    4. Record any errors importing the file into ORM
    5. Kill (in ORM) any task instances belonging to the DAGs that haven't
    issued a heartbeat in a while.

    Returns a list of SimpleDag objects that represent the DAGs found in
    the file
    """    
    dagbag = models.DagBag(file_path, include_examples=False)  

    # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
    for dag in dagbag.dags.values():
       dag.sync_to_db() 
    
    paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values() if dag.is_paused]
    
    for dag_id in dagbag.dags:
    # Only return DAGs that are not paused
       if dag_id not in paused_dag_ids:
          dag = dagbag.get_dag(dag_id)
          simple_dags.append(SimpleDag(dag)
    
    ti_keys_to_schedule = []
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
    
    for ti_key in ti_keys_to_schedule:
        # Only schedule tasks that have their dependencies met, e.g. to avoid
        # a task that recently got its state changed to RUNNING from somewhere
        # other than the scheduler from getting its state overwritten.
        if ti.are_dependencies_met():
            # Task starts out in the scheduled state. All tasks in the
            # scheduled state will be sent to the executor
            ti.state = State.SCHEDULED

找到需要调度的task,将task状态置为scheduled,等待发送到executor执行,State.SCHEDULED

def _process_dags(self, dagbag, dags, tis_out):
    """
    Iterates over the dags and processes them. Processing includes:

    1. Create appropriate DagRun(s) in the DB.
    2. Create appropriate TaskInstance(s) in the DB.
    3. Send emails for tasks that have missed SLAs.
    """
    for dag in dags:
        if dag.is_paused:
            continue        
        
        dag_run = self.create_dag_run(dag)   
        self._process_task_instances(dag, tis_out) 
def create_dag_run(self, dag, session=None):
    """
    This method checks whether a new DagRun needs to be created
    for a DAG based on scheduling interval.
    Returns DagRun if one is scheduled. Otherwise returns None.
    """
    next_run = dag.create_dagrun()
    return next_run

def create_dagrun():
    run = DagRun()
    session.add(run)
    session.commit()
       
    # create the associated task instances
    # state is None at the moment of creation
    run.verify_integrity(session=session)
    run.refresh_from_db()
    return run

def verify_integrity(self, session=None):
    """
    Verifies the DagRun by checking for removed tasks or tasks that are not in the
    database yet. It will set state to removed or add the task if required.
    """

    # check for removed or restored tasks
    task_ids = []
    for ti in tis:
        task_ids.append(ti.task_id)
        
        try:
            task = dag.get_task(ti.task_id)
        except AirflowException:
            if ti.state == State.REMOVED:
                pass  # ti has already been removed, just ignore it
            elif self.state i`s not State.RUNNING and not dag.partial:
                ti.state = State.REMOVED
        
        is_task_in_dag = task is not None
        should_restore_task = is_task_in_dag and ti.state == State.REMOVED
        if should_restore_task:
            ti.state = State.NONE
            
   # check for missing tasks
    # 全部task -> ti,一次创建dag下所有的ti,状态none
   for task in six.itervalues(dag.task_dict):
       if task.start_date > self.execution_date and not self.is_backfill:
          continue

       if task.task_id not in task_ids:
          ti = TaskInstance(task, self.execution_date)
          session.add(ti)

   session.commit()  

创建dag_run,并检查dag_run task完整性, State.NONE

# In order to be able to get queued a task must have one of these states
SCHEDULEABLE_STATES = {
    State.NONE,
    State.UP_FOR_RETRY,
    State.UP_FOR_RESCHEDULE,
}

def _process_task_instances(self, dag, task_instances_list, session=None):
    """
    This method schedules the tasks for a single DAG by looking at the
    active DAG runs and adding task instances that should run to the
    queue.
    """
    dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
    active_dag_runs = []
    for run in dag_runs:   
        # todo: preferably the integrity check happens at dag collection time
        run.verify_integrity(session=session)
        
        if run.state == State.RUNNING:
            active_dag_runs.append(run)    
            
    for run in active_dag_runs:
        tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
        
        # this loop is quite slow as it uses are_dependencies_met for
        # every task (in ti.is_runnable). This is also called in
        # update_state above which has already checked these tasks        
        for ti in tis:
            if ti.are_dependencies_met()
                task_instances_list.append(ti.key)
                
def are_dependencies_met():
    """
    Returns whether or not all the conditions are met for this task instance to be run
    given the context for the dependencies (e.g. a task instance being force run from
    the UI will ignore some dependencies).
    """

找到running的dag_run下SCHEDULEABLE_STATES task,并校验执行依赖

exec

/Users/yangxue.chen/code/python/airflow/airflow/jobs/scheduler_job.py

def _validate_and_run_task_instances(self, simple_dag_bag):
    self._process_and_execute_tasks(simple_dag_bag)

    # Call heartbeats
    self.log.debug("Heartbeating the executor")
    self.executor.heartbeat()

    self._change_state_for_tasks_failed_to_execute()

    # Process events from the executor
    self._process_executor_events(simple_dag_bag)
    return True

def _process_and_execute_tasks(self, simple_dag_bag):
    # Handle cases where a DAG run state is set (perhaps manually) to
    # a non-running state. Handle task instances that belong to
    # DAG runs in those states
    # If a task instance is up for retry but the corresponding DAG run
    # isn't running, mark the task instance as FAILED so we don't try
    # to re-run it.
    self._change_state_for_tis_without_dagrun()
    # If a task instance is scheduled or queued or up for reschedule,
    # but the corresponding DAG run isn't running, set the state to
    # NONE so we don't try to re-run it.
    self._change_state_for_tis_without_dagrun()
    
    self._execute_task_instances(simple_dag_bag,(State.SCHEDULED,))

def _execute_task_instances(simple_dag_bag,states):
    """
    Attempts to execute TaskInstances that should be executed by the scheduler.

    There are three steps:
    1. Pick TIs by priority with the constraint that they are in the expected states
    and that we do exceed max_active_runs or pool limits.
    2. Change the state for the TIs above atomically.
    3. Enqueue the TIs in the executor.    
    """
    executable_tis = self._find_executable_task_instances(states)
    
    def query(result, items):
        simple_tis_with_state_changed = \
            self._change_state_for_executable_task_instances(items,states)

        self._enqueue_task_instances_with_queued_state(
                simple_dag_bag,
                simple_tis_with_state_changed)
        session.commit()
        return result + len(simple_tis_with_state_changed)    

    return helpers.reduce_in_chunks(query, executable_tis, 0, self.max_tis_per_query)

def _change_state_for_executable_task_instances(task_instances,acceptable_states):
    """
    Changes the state of task instances in the list with one of the given states
    to QUEUED atomically, and returns the TIs changed in SimpleTaskInstance format.
    """
    
    # set TIs to queued state
    for task_instance in tis_to_set_to_queued:
        task_instance.state = State.QUEUED
        task_instance.queued_dttm = timezone.utcnow()
    
    simple_task_instances = [SimpleTaskInstance(ti) for ti in tis_to_set_to_queued]
    self.log.info("Setting the following %s tasks to queued state:\n\t%s")    
    return simple_task_instances

def _enqueue_task_instances_with_queued_state(simple_task_instances):
    """
    Takes task_instances, which should have been set to queued, and enqueues them
    with the executor.
    """
    TI = models.TaskInstance
    # actually enqueue them
    for simple_task_instance in simple_task_instances:
        command = TI.generate_command()
        log.info("Sending %s to executor with priority %s and queue %s")

        self.executor.queue_command()

找到scheduled的task,将状态置为queued,并执行,State.QUEUED

/Users/yangxue.chen/code/python/airflow/airflow/executors/base_executor.py

    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
        key = simple_task_instance.key
        if key not in self.queued_tasks and key not in self.running:
            self.log.info("Adding to queue: %s", command)
            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
        else:
            self.log.info("could not queue task %s", key)

send task to executor的实现就是将task加到executor的queued_tasks中排队

相关文章

网友评论

      本文标题:airflow-scheduler_job源码分析

      本文链接:https://www.haomeiwen.com/subject/vggwhktx.html