美文网首页
airflow 动态创建task

airflow 动态创建task

作者: young_____ | 来源:发表于2021-03-01 15:43 被阅读0次

airflow 动态创建task

通过http接口获取一个列表结果,遍历列表值,每条记录动态创建一个task

实现方式

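动态创建 task 需要两个 DAG 文件配合实现:第一个 DAG 文件 auto_rebuild_cube 通过 HTTP task 获取需要遍历的列表,并把每条记录的 name 提取出来写入 XCom(key 为 cubeNames)。
第二个 DAG 文件 auto_cube(dag_id 为 build_cube)通过 XCom.get_one 方法指定 dag_id 和 execution_date 来读取该列表;由于 execution_date 必须指定,这里直接用 pendulum.now('Asia/Shanghai') 取当前时间,并配合 include_prior_dates=True 读取该时间点之前最近一次写入的值。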

文件:auto_rebuild_cube

# -*- coding: utf-8 -*-
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.http_operator import SimpleHttpOperator

from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.settings import json

import common
from common import china_days_ago

# Module-level list; it starts out empty every time this file is imported.
result = list()

# DAG definition: the cron expression "9 * * * *" fires at minute 9 of every hour.
dag = DAG(
    dag_id='auto_rebuild_cube',
    description='获取遍历列表',
    schedule_interval="9 * * * *",
    start_date=china_days_ago(1),
    default_args=common.default_args,
)

# GET `kylin/api/cubes` (limit=100) through the shared HTTP connection. With
# xcom_push=True the response text is stored in XCom for the downstream task.
# NOTE(review): the Authorization header embeds a static Basic credential in
# source control; consider storing the login on the Airflow connection
# referenced by http_conn_id instead.
get_all_cubes = SimpleHttpOperator(
    task_id="get_all_cubes",
    endpoint='kylin/api/cubes',
    headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
    data={"limit": "100"},
    method='GET',
    xcom_push=True,
    http_conn_id=common.global_kylin_http_id,
    # Attach to the DAG explicitly, like every other operator in this file,
    # rather than relying on DAG inference from the `>>` chain below.
    dag=dag,
)


def multitasking_task(**kwargs):
    """Extract cube names from the ``get_all_cubes`` response and push them to XCom.

    Pulls the JSON text that the ``get_all_cubes`` task stored in XCom, reads
    the ``name`` field of every element, and pushes the resulting list under
    the XCom key ``cubeNames``.

    Args:
        **kwargs: Airflow task context; ``kwargs['task_instance']`` must
            provide ``xcom_pull`` and ``xcom_push``.

    Raises:
        KeyError: if an element of the response has no ``name`` field.
    """
    task_instance = kwargs['task_instance']
    response_text = task_instance.xcom_pull(task_ids="get_all_cubes")
    # Build a fresh list on every call. Appending to a module-level list would
    # carry names over (and duplicate them) whenever this callable runs more
    # than once in the same Python process.
    cube_names = [cube['name'] for cube in json.loads(response_text)]
    task_instance.xcom_push(key="cubeNames", value=cube_names)


# Runs multitasking_task with the Airflow context passed in as keyword arguments.
multi_task = PythonOperator(
    dag=dag,
    task_id='multi_task',
    provide_context=True,
    python_callable=multitasking_task,
)

# Echoes the XCom value stored under the key 'cubeNames'.
push_task = BashOperator(
    dag=dag,
    task_id='push_task',
    bash_command="echo {{ task_instance.xcom_pull(key='cubeNames') }} ",
)

# Triggers the 'build_cube' DAG; common.conditionally_trigger receives the params below.
trigger_build_cube = TriggerDagRunOperator(
    dag=dag,
    task_id='trigger_build_cube',
    trigger_dag_id="build_cube",
    params={'condition_param': True, 'message': '获取列表成功,即将开始动态创建task'},
    python_callable=common.conditionally_trigger,
)
latest_only = LatestOnlyOperator(dag=dag, task_id='latest_only')

# Linear pipeline: latest_only -> get_all_cubes -> multi_task -> push_task -> trigger_build_cube
latest_only.set_downstream(get_all_cubes)
get_all_cubes.set_downstream(multi_task)
multi_task.set_downstream(push_task)
push_task.set_downstream(trigger_build_cube)

文件:auto_cube

import json

from airflow import DAG
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.http_operator import SimpleHttpOperator
from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum

import common
from common import china_days_ago

# DAG with no schedule of its own (schedule_interval=None).
dag = DAG(
    dag_id='build_cube',
    schedule_interval=None,
    start_date=china_days_ago(1),
    default_args=common.default_args,
)


def get_data():
    """Return the cube names most recently published by the ``auto_rebuild_cube`` DAG.

    Looks up the XCom stored under key ``cubeNames`` for DAG
    ``auto_rebuild_cube``, using the current time in Asia/Shanghai as the
    execution date together with ``include_prior_dates=True``.

    Returns:
        The stored XCom value, or an empty list when ``XCom.get_one`` finds
        nothing, so callers can always iterate the result.
    """
    execution_date = pendulum.now('Asia/Shanghai')
    # Fill the placeholder; passing the value as a second print() argument
    # left the literal "{}" in the output.
    print("the execution_date is {}".format(execution_date))
    cube_names = XCom.get_one(dag_id='auto_rebuild_cube', key='cubeNames',
                              execution_date=execution_date,
                              include_prior_dates=True)
    print("cubeNames is {}".format(cube_names))
    # No matching XCom (e.g. the producer DAG has not run yet) yields None;
    # normalise that to an empty list so iteration at DAG-parse time is safe.
    return cube_names or []


def multitasking_task(data):
    """Create the HTTP operator that issues a rebuild request for one cube.

    Args:
        data: Cube name; it is interpolated into both the task id and the
            request endpoint.

    Returns:
        A SimpleHttpOperator that sends ``PUT kylin/api/cubes/<data>/rebuild``
        with a JSON body of ``{"buildType": "BUILD"}``.
    """
    rebuild_task_id = "rebuild_cube_{}".format(data)
    rebuild_endpoint = 'kylin/api/cubes/{}/rebuild'.format(data)
    request_headers = {"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="}
    request_body = json.dumps({"buildType": "BUILD"})

    rebuild_operator = SimpleHttpOperator(
        task_id=rebuild_task_id,
        endpoint=rebuild_endpoint,
        headers=request_headers,
        data=request_body,
        method='PUT',
        http_conn_id=common.global_kylin_http_id,
    )
    return rebuild_operator


# Fixed entry and exit points; every dynamically created task sits between them.
start = DummyOperator(
    task_id="start",
    dag=dag
)

end = DummyOperator(
    task_id="end",
    dag=dag
)

# One rebuild task per cube name, evaluated each time this file is parsed.
# `or []` keeps the file importable when get_data() has nothing to return
# (e.g. no XCom has been written yet); iterating None would raise TypeError
# at parse time.
for data in get_data() or []:
    start >> multitasking_task(data) >> end

相关文章

网友评论

      本文标题:airflow 动态创建task

      本文链接:https://www.haomeiwen.com/subject/zpeafltx.html