airflow-kubernetes_executor Source Code Analysis
Author: 陈先生_9e91 | Published 2020-08-05 10:06


    The previous post analyzed the scheduler; this one continues with the executor.

    KubernetesExecutor

    /Users/yangxue.chen/code/python/airflow/airflow/contrib/executors/kubernetes_executor.py

    def start(self):
        """Starts the executor"""
        self.log.info('Start Kubernetes executor')
        self.task_queue = self._manager.Queue()
        self.result_queue = self._manager.Queue()
        self.kube_client = get_kube_client()
        self.kube_scheduler = AirflowKubernetesScheduler(
            self.kube_config, self.task_queue, self.result_queue,
            self.kube_client, self.worker_uuid
        )
    

    start() does the initialization: it creates the two queues (task_queue and result_queue), the Kubernetes client, and the AirflowKubernetesScheduler.
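
    Before diving into the methods, it helps to keep in mind the overall data flow between the three queues (a rough summary of the code walked through below):

    1. task_queue: executor.execute_async → scheduler.run_next → worker pod created
    2. watcher_queue: KubernetesJobWatcher pod events → scheduler.sync → process_watcher_task
    3. result_queue: process_watcher_task → executor.sync → _change_state

    The heartbeat that drives this cycle lives in the base class: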

    /Users/yangxue.chen/code/python/airflow/airflow/executors/base_executor.py

    PARALLELISM = conf.getint('core', 'PARALLELISM')

    def heartbeat(self):
        # Triggering new jobs
        if not self.parallelism:
            open_slots = len(self.queued_tasks)
        else:
            open_slots = self.parallelism - len(self.running)

        num_running_tasks = len(self.running)
        num_queued_tasks = len(self.queued_tasks)

        self.log.debug("%s running task instances", num_running_tasks)
        self.log.debug("%s in queue", num_queued_tasks)
        self.log.debug("%s open slots", open_slots)

        self.trigger_tasks(open_slots)

        # Calling child class sync method
        self.log.debug("Calling the %s sync method", self.__class__)
        self.sync()
            
    def trigger_tasks(self, open_slots):
        """
        Trigger tasks

        :param open_slots: Number of open slots
        """
        sorted_queue = sorted(
            [(k, v) for k, v in self.queued_tasks.items()],
            key=lambda x: x[1][1],
            reverse=True)
        for i in range(min((open_slots, len(self.queued_tasks)))):
            key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
            self.queued_tasks.pop(key)
            self.running[key] = command
            self.execute_async(key=key,
                               command=command,
                               queue=queue,
                               executor_config=simple_ti.executor_config)
                
        def execute_async(self, key, command, queue=None, executor_config=None):
            """Executes task asynchronously"""
            # Patch kube_scheduler worker, so we can mount a dmp bucket when pod start
            kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
            self.kube_scheduler.worker_configuration = WorkerConfiguration(kube_config=self.kube_config)
    
            self.task_queue.put((key, command, kube_executor_config))            
    
    1. Compute the remaining open slots from the parallelism setting ("AIRFLOW__CORE__PARALLELISM") and the number of running tasks.
    2. "Execute" the queued tasks asynchronously: execute_async only puts the task on task_queue to await execution (see the sketch below).
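
    To make the slot and priority logic concrete, here is a standalone sketch with toy data (not Airflow source; the value tuple layout (command, priority, queue, simple_ti) mirrors what queued_tasks holds):

    # Standalone sketch of heartbeat/trigger_tasks slot selection (toy data).
    # queued_tasks maps task key -> (command, priority, queue, simple_ti).
    queued_tasks = {
        ('dag_a', 'task_1', '2020-08-05T00:00:00', 1): ('airflow run dag_a task_1 ...', 10, 'default', None),
        ('dag_b', 'task_2', '2020-08-05T00:00:00', 1): ('airflow run dag_b task_2 ...', 5, 'default', None),
    }
    parallelism, running = 32, {}

    # open-slot computation, as in heartbeat()
    open_slots = parallelism - len(running) if parallelism else len(queued_tasks)

    # sort by priority weight (index 1 of the value tuple), highest first,
    # and take at most open_slots entries, as in trigger_tasks()
    sorted_queue = sorted(queued_tasks.items(), key=lambda x: x[1][1], reverse=True)
    for key, (command, priority, queue, simple_ti) in sorted_queue[:open_slots]:
        print('would enqueue', key, 'with priority', priority)

    Back in the executor, sync ties the pieces together: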
    def sync(self):
        """Synchronize task state."""
        if self.running:
            self.log.debug('self.running: %s', self.running)
        if self.queued_tasks:
            self.log.debug('self.queued: %s', self.queued_tasks)

        self.kube_scheduler.sync()

        while True:
            try:
                results = self.result_queue.get_nowait()
                try:
                    key, state, pod_id, resource_version = results
                    self.log.info('Changing state of %s to %s', results, state)
                    self._change_state(key, state, pod_id)
                finally:
                    self.result_queue.task_done()
            except Empty:
                break

        for _ in range(self.kube_config.worker_pods_creation_batch_size):
            try:
                task = self.task_queue.get_nowait()
                try:
                    self.kube_scheduler.run_next(task)
                finally:
                    self.task_queue.task_done()
            except Empty:
                break
    
    1. Call scheduler.sync (AirflowKubernetesScheduler, covered below).
    2. Drain results from the result_queue.
    3. Change task state according to each result.
    4. Pull tasks from the task_queue and hand them to the scheduler to run.
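
    The non-blocking drain pattern above (get_nowait until Empty) can be tried on its own; a minimal standalone example, not Airflow source:

    from multiprocessing import Manager
    from queue import Empty

    result_queue = Manager().Queue()
    result_queue.put((('dag_a', 'task_1', '2020-08-05T00:00:00', 1),
                      'failed', 'pod-abc123', '42'))

    # consume everything currently queued without blocking, stop on Empty
    while True:
        try:
            key, state, pod_id, resource_version = result_queue.get_nowait()
            try:
                print('changing state of', key, 'to', state)  # stands in for _change_state
            finally:
                result_queue.task_done()
        except Empty:
            break

    The state change itself: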
    def _change_state(self, key, state, pod_id):
        if state != State.RUNNING:
            if self.kube_config.delete_worker_pods:
                self.kube_scheduler.delete_pod(pod_id)
                self.log.info('Deleted pod: %s', str(key))
            try:
                self.running.pop(key)
            except KeyError:
                self.log.debug('Could not find key: %s', str(key))
        self.event_buffer[key] = state
    

    Pods whose tasks have left the running state are deleted (when delete_worker_pods is enabled), and the final state is recorded in event_buffer.
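
    delete_pod itself is a thin wrapper over the official kubernetes Python client; a hedged sketch with illustrative names, not Airflow's exact code:

    from kubernetes import client, config
    from kubernetes.client.rest import ApiException

    config.load_kube_config()  # or load_incluster_config() inside a cluster
    v1 = client.CoreV1Api()

    def delete_pod(pod_id, namespace):
        """Delete a worker pod, tolerating the case where it is already gone."""
        try:
            v1.delete_namespaced_pod(
                name=pod_id, namespace=namespace, body=client.V1DeleteOptions())
        except ApiException as e:
            if e.status != 404:  # already deleted is fine
                raise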

    AirflowKubernetesScheduler

    def sync(self):
        """
        The sync function checks the status of all currently running kubernetes jobs.
        If a job is completed, its status is placed in the result queue to
        be sent back to the scheduler.
        """
        while True:
            try:
                task = self.watcher_queue.get_nowait()
                try:
                    self.process_watcher_task(task)
                finally:
                    self.watcher_queue.task_done()
            except Empty:
                break
       
    def process_watcher_task(self, task):
        """Process the task by watcher."""
        pod_id, state, labels, resource_version = task
        self.log.info(
            'Attempting to finish pod; pod_id: %s; state: %s; labels: %s',
            pod_id, state, labels
        )
        key = self._labels_to_key(labels=labels)
        if key:
            self.log.debug('finishing job %s - %s (%s)', key, state, pod_id)
            self.result_queue.put((key, state, pod_id, resource_version))
    
    1. Pull task state changes from the watcher_queue.
    2. Put them on the result_queue.

    This links back to the caller: executor.sync reads these tuples off the result_queue.
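
    _labels_to_key reverses the labels stamped onto the pod at creation time (dag_id, task_id, execution_date, try_number) back into a task key. A simplified sketch that skips the label escaping and datetime handling the real method does:

    def labels_to_key(labels):
        """Recover (dag_id, task_id, execution_date, try_number) from pod labels."""
        try:
            return (labels['dag_id'], labels['task_id'],
                    labels['execution_date'], int(labels['try_number']))
        except KeyError:
            return None  # not a pod this executor manages

    Next, run_next creates the worker pod: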

    def run_next(self, next_job):
        """
        The run_next command will check the task_queue for any un-run jobs.
        It will then create a unique job-id, launch that job in the cluster,
        and store relevant info in the current_jobs map so we can track the job's
        status
        """
        self.log.info('Kubernetes job is %s', str(next_job))
        key, command, kube_executor_config = next_job
        dag_id, task_id, execution_date, try_number = key
        self.log.debug("Kubernetes running for command %s", command)
        self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image)
    
        pod = self.worker_configuration.make_pod()
        
        # the watcher will monitor pods, so we do not block.
        self.launcher.run_pod_async(pod, **self.kube_config.kube_client_request_args)
        
    def run_pod_async(self, pod, **kwargs):
        req = self.kube_req_factory.create(pod)
        self.log.debug('Pod Creation Request: \n%s', json.dumps(req, indent=2))
        resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace, **kwargs)
        self.log.debug('Pod Creation Response: %s', resp)
    
        return resp
    

    A pod is created for each task. Note that creation does not block: run_pod_async returns as soon as the API server accepts the request, and the pod's lifecycle is then tracked via the Kubernetes list-watch mechanism.

    KubernetesJobWatcher

    The k8s list-watch mechanism
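
    For context, this is the kubernetes Python client's watch API in its minimal standalone form (the label value here is illustrative, and it needs a reachable cluster):

    from kubernetes import client, config, watch

    config.load_kube_config()
    v1 = client.CoreV1Api()

    # stream() first lists the matching pods, then keeps yielding
    # ADDED/MODIFIED/DELETED events as their state changes ("list-watch")
    for event in watch.Watch().stream(v1.list_namespaced_pod, 'default',
                                      label_selector='airflow-worker=some-uuid'):
        pod = event['object']
        print(event['type'], pod.metadata.name, pod.status.phase)

    The watcher's _run wires this same pattern to Airflow's worker pods: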

    def _run(self, kube_client, resource_version, worker_uuid, kube_config):
        watcher = watch.Watch()

        kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}
        if resource_version:
            kwargs['resource_version'] = resource_version
        if kube_config.kube_client_request_args:
            for key, value in kube_config.kube_client_request_args.items():
                kwargs[key] = value

        last_resource_version = None
        for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
                                    **kwargs):
            task = event['object']
            self.log.info(
                'Event: %s had an event of type %s',
                task.metadata.name, event['type']
            )
            if event['type'] == 'ERROR':
                return self.process_error(event)
            self.process_status(
                task.metadata.name, task.status.phase, task.metadata.labels,
                task.metadata.resource_version
            )
            last_resource_version = task.metadata.resource_version

        return last_resource_version
    

    Pods are filtered by {'label_selector': 'airflow-worker={}'.format(worker_uuid)}, so the watcher only sees pods launched by this executor instance. Pay close attention to the two state-handling functions below.

        def process_error(self, event):
            """Process error response"""
            self.log.error(
                'Encountered Error response from k8s list namespaced pod stream => %s',
                event
            )
            raw_object = event['raw_object']
            if raw_object['code'] == 410:
                self.log.info(
                    'Kubernetes resource version is too old, must reset to 0 => %s',
                    (raw_object['message'],)
                )
                # Return resource version 0
                return '0'
            raise AirflowException(
                'Kubernetes failure for %s with code %s and message: %s' %
                (raw_object['reason'], raw_object['code'], raw_object['message'])
            )
    

    The error path is simple: HTTP 410 means the requested resource version is too old, so the watcher returns '0' and the next watch re-lists from scratch; any other error raises an AirflowException.
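
    The value returned by _run (including the '0' from process_error) feeds an outer retry loop in KubernetesJobWatcher.run. A simplified sketch of that surrounding code, omitting the read-timeout handling (the scheduler side also health-checks the watcher process and restarts it if it dies):

    def run(self):
        """Keep watching, resuming from the last seen resource_version."""
        while True:
            try:
                self.resource_version = self._run(
                    self.kube_client, self.resource_version,
                    self.worker_uuid, self.kube_config)
            except Exception:
                self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
                raise

    The status handler is where the real work happens: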

        def process_status(self, pod_id, status, labels, resource_version):
            """Process status response"""
            if status == 'Pending':
                self.log.info('Event: %s Pending', pod_id)
            elif status == 'Failed':
                self.log.info('Event: %s Failed', pod_id)
                self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
            elif status == 'Succeeded':
                self.log.info('Event: %s Succeeded', pod_id)
                self.watcher_queue.put((pod_id, None, labels, resource_version))
            elif status == 'Running':
                self.log.info('Event: %s is Running', pod_id)
            else:
                self.log.warning(
                    'Event: Invalid state: %s on pod: %s with labels: %s with '
                    'resource_version: %s', status, pod_id, labels, resource_version
                )
    
    

    Focus on Failed and Succeeded: only these two terminal states are pushed onto the watcher_queue, which ties back to where the scheduler's sync drains the watcher_queue earlier. Note that Succeeded is enqueued with state None rather than an explicit success state.
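
    Condensed, the branching in process_status amounts to a small lookup table (a sketch; None marks a finished pod at this layer and is resolved to the task's final state further up the stack):

    from airflow.utils.state import State

    TERMINAL_PHASES = {
        'Failed': State.FAILED,
        'Succeeded': None,  # "done"; treated as success further up the stack
    }

    def on_pod_event(pod_id, phase, labels, resource_version, watcher_queue):
        if phase in TERMINAL_PHASES:
            watcher_queue.put((pod_id, TERMINAL_PHASES[phase], labels, resource_version))
        # Pending/Running are only logged; anything else is an invalid state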
