美文网首页
AirFlow_本机课程v 1.0

AirFlow_本机课程v 1.0

作者: 山猪打不过家猪 | 来源:发表于2024-02-08 03:11 被阅读0次

    1.认识airflow

    • airflow的组成


      image.png
    • operator的分类


      image.png

    2. 常用命令

    1.docker 查看当前运行的images,并且找到airflow

    docker ps
    
    image.png
    1. 进入到aiflow的bash命令行里
    docker exec -it 1aa536546bb6 /bin/bash
    

    3.查看目前所有的dags

    airflow dags list
    

    4.查看指定的dag信息

    airflow dags list-runs -d dag_python_v6 
    
    1. 触发之前未运行的dag,airflow dags backfill -s 起始日期 -e 结束日期 Dag_id
    airflow dags backfill -s 2024-02-06 -e 2024-02-08 dag_python_v6
    

    6.查看指定dag的tasksairflow tasks list dag_id

    airflow tasks list dag_python_v6
    

    7.测试指定dag在某天的运行情况airflow tasks test dag_id task_id 日期

    airflow tasks test forex_v1 is_forex_available 2024-02-01
    

    3.第一个DAG

    流程

    image.png

    什么是DAG

    简单理解就是pipeline

    最简单的dag的格式

    from airflow import DAG
    from datetime import datetime,timedelta
    
    #不是dag的参数,而是task任务的参数
    default_args = {
        'owner':'lg',
        'retries':1,
        'retry_delay':timedelta(minutes=2)
    }
    
    
    with DAG(
        dag_id= 'forex_v1',
        default_args=default_args,
        start_date=datetime(2024,2,9),
        schedule_interval="@daily",
        catchup=False #True会运行设定start_date之前的所有任务
    ) as dag:
        pass
    

    catchup = False:True会运行设定start_date之前的所有任务,例如设置的start_date2024-2-1且是@daily执行的方式,如果今天是2024-2-9那么他会执行这8天的所有任务

    什么是operator

    简单理解就是task
    有三种类型的operator:
    1.action: 例如执行Python,bash,sql都有对应的Operator
    2.transfer: 迁移转换,实现source到sink的迁移
    3.sensor:sensor allow you to verify if a condition is mer or not before moving forward.实现了等待某以任务执行后,在执行另一个,例如你等待所有文件到了指定文件夹后,在执行另外一个。

    1.创建一个api的sensor来check api available

    • api的网址是https://gist.github.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b
    1. 创建dag
    from airflow import DAG
    from datetime import datetime,timedelta
    from airflow.providers.http.sensors.http import HttpSensor
    
    #不是dag的参数,而是task任务的参数
    default_args = {
        'owner':'lg',
        'retries':1,
        'retry_delay':timedelta(minutes=2)
    }
    
    
    with DAG(
        dag_id= 'forex_v1',
        default_args=default_args,
        start_date=datetime(2024,2,9),
        schedule_interval="@daily",
        catchup=False #True会运行设定start_date之前的所有任务
    ) as dag:
        is_forex_available = HttpSensor(
            task_id ='is_forex_available' , 
            http_conn_id='forex_api',  #必须和页面里connection的设置一样
            endpoint= 'marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b',
            response_check=lambda response: 'rate' in response.text,
            poke_interval = 5,
            timeout = 10
        )
    
    
        is_forex_available
    
    

    2.去客户端设置connections


    image.png

    3.测试

    airflow@1aa536546bb6:/opt/airflow$ airflow tasks test forex_v1 is_forex_available 2024-02-08
    

    2.使用file sensor检测文件是否存在

    1.添加文件检测的task

        #检测文件是否存在的sensor
        is_forex_csv_available = FileSensor(
            task_id = 'is_forex_csv_available',
            fs_conn_id='forex_path',
            filepath='forex_currencies.csv',
            poke_interval = 5, #检测链接是否可以用5s一次
            timeout = 10  #链接超时时间
        )
    
    

    3.查看flies的位置

    airflow@1aa536546bb6:/opt/airflow$ ls
    

    4.设置好connecions


    image.png

    3.使用python operator执行下载csv的程序

    官方文档地址
    1.添加python的下载程序

    def download_rates():
      pass
    

    2.添加Python operator

        #python operator用来执行python程序
        down_load_csv = PythonOperator(
            task_id = 'down_load_csv',
            python_callable= download_rates, #python function的name;
            ## op_kwargs={'age':20} #用来传递参数,这里无参,不用传递
        )
    
    1. 执行测试,测试成功后,文件将会被下载
    airflow tasks test forex_v1 down_load_csv 2024-02-01
    
    image.png

    4.完整代码

    import csv
    import requests
    import json
    from datetime import datetime,timedelta
    
    from airflow import DAG
    from airflow.providers.http.sensors.http import HttpSensor #http sensor
    from airflow.sensors.filesystem import FileSensor #file sensor
    from airflow.operators.python import PythonOperator #引入python operator
    
    #不是dag的参数,而是task任务的参数
    default_args = {
        'owner':'lg',
        'retries':1,
        'retry_delay':timedelta(minutes=2)
    }
    
    def download_rates():
        BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
        ENDPOINTS = {
            'USD': 'api_forex_exchange_usd.json',
            'EUR': 'api_forex_exchange_eur.json'
        }
        with open('/opt/airflow/dags/__pycache__/dags/files/forex_currencies.csv') as forex_currencies:
            reader = csv.DictReader(forex_currencies, delimiter=';')
            for idx, row in enumerate(reader):
                base = row['base']
                with_pairs = row['with_pairs'].split(' ')
                indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
                outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
                for pair in with_pairs:
                    outdata['rates'][pair] = indata['rates'][pair]
                with open('/opt/airflow/dags/__pycache__/dags/files/forex_rates.json', 'a') as outfile:
                    json.dump(outdata, outfile)
                    outfile.write('\n')
    
    with DAG(
        dag_id= 'forex_v1',
        default_args=default_args,
        start_date=datetime(2024,2,9),
        schedule_interval="@daily",
        catchup=False #True会运行设定start_date之前的所有任务
    ) as dag:
        #检测api是否可用的sensor
        is_forex_api_available = HttpSensor(
            task_id ='is_forex_api_available' , #id
            http_conn_id='forex_api',  #设置链接ID
            endpoint= 'marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b', #域名具体地址
            response_check=lambda response: 'rate' in response.text, #检测是否存在rate字段
            poke_interval = 5, #检测链接是否可以用5s一次
            timeout = 10  #链接超时时间
        )
    
        #检测文件是否存在的sensor
        is_forex_csv_available = FileSensor(
            task_id = 'is_forex_csv_available',
            fs_conn_id='forex_path',
            filepath='forex_currencies.csv',
            poke_interval = 5, #检测链接是否可以用5s一次
            timeout = 10  #链接超时时间
        )
    
        #python operator用来执行python程序
        down_load_csv = PythonOperator(
            task_id = 'down_load_csv',
            python_callable= download_rates, #python function的name;
            ## op_kwargs={'age':20} #用来传递参数,这里无参,不用传递
        )
    
    1. 改进,使用好的项目结构


      image.png

    4.使用bash operator将文件存储到hdfs里

    一般的数据流的文件很大, 我们应该将数据存储去datalake里,将文件

    from airflow.operators.bash import BashOperator
    
        #使用bash operator将json文件存储到hdfs里
        saving_rates_to_hdfs = BashOperator(
            task_id = 'saving_rates_to_hdfs',
            bash_command="""
                hdfs dfs -mkdir /forex && \
                hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
            """
        )
    

    5.将数据存放区hive的表里

    pass

    6. 用spark处理数据

    airflow只是一个调度工具,所有的大数据处理都应该用相应的工具完成,所以这里我们需要直接spark处理,
    原始版本的airflow是没有处理spark的包的需要我们自己安装
    1.新建Dockerfile

    FROM apache/airflow:2.8.1
    COPY requirements.txt requirements.txt
    RUN pip install --user --upgrade pip
    RUN pip install apache-airflow-providers-apache-spark
    RUN pip install -r requirements.txt
    
    1. 查看当前的images
    docker images
    
    1. build 新的images,不能和上面的重名
    docker build . --tag extending_airflow:latest
    

    4.安装完成后,引入operator,添加task

        #将数据用spark处理
        forex_processing= SparkSubmitOperator(
            task_id = "forex_processing",
            application="/opt/airflow/dags/source/processing_by_spark.py",#必须是.py文件
            conn_id="spark_conn",
            verbose=False
        )
    

    5.然后编写spark处理的文件

    from os.path import expanduser, join, abspath
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json
    
    warehouse_location = abspath('spark-warehouse')
    
    # Initialize Spark Session
    spark = SparkSession \
        .appName("Forex processing") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Read the file forex_rates.json from the HDFS
    df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')
    
    # Drop the duplicated rows based on the base and last_update columns
    forex_rates = df.select('base', 'last_update', 'rates.eur', 'rates.usd', 'rates.cad', 'rates.gbp', 'rates.jpy', 'rates.nzd') \
        .dropDuplicates(['base', 'last_update']) \
        .fillna(0, subset=['EUR', 'USD', 'JPY', 'CAD', 'GBP', 'NZD'])
    
    # Export the dataframe into the Hive table forex_rates
    forex_rates.write.mode("append").insertInto("forex_rates")
    

    7.任务失败发送邮件

    • 需要配置airflow.cfg文件,但是docker里是没有的,我们需要将docker环境下的airflow.cfg重定向到本地
    /opt/airflow$ airflow config list --defaults >  ${AIRFLOW_PROJ_DIR:-.}/config/airflow.cfg
    
    • 设置airflow.cfg
      image.png
    • 添加email的Operator
        #添加email operator
        send_email = EmailOperator(
            task_id = "send_email",
            to = "airflow@gmail.com",
            subject= "forex_pipeline_status",
            html_content="<h>forex_pipeline_fail</h>"
        )
    

    8.设置slack提醒

    pass

    4.DAG的设置

    4.1DAG的时间设置

    夏令时,如果你想让dag在UTC早上5点执行的话,需要设置到4点,其实际运行时间是,NZ表显的UTC 5点中

    4.2 Dag之间的依赖关系

    如果一个DAG有3个任务,第一天运行正常,第二天运行task2失败了,导致之后的task都没运行,第三天,第四天又成功了,第五天因为task2任务失败,又运行成功,这样会导致程序非常不稳定,所以我们可以设置
    depends_on_past = True的话,如果任何一天的task2失败了,之后的所有日期都不会执行,直到task2的问题解决。

    4.3 管理Dag文件夹

    将需要运行的dag和需要处理的scripts脚本文件一起打包,注意scripts和my_dags.py都是在同一个目录下,scripts文件夹里应该有__init__文件

    zip -rm package_dag.zip my_dag.py scripts/
    

    5. Dag出错处理

    添加超时

    • 比如一个dag平时运行的时间是15s,组多不会超过25s,我们就可以设置dag的超时时间dagrun_timeout = timedelta(seconds = 30)
    with DAG(
        dag_id = 'our_first_dag_lg_v1',
        default_args= _default_args,
        start_date = datetime(2024,2,3,1),
        schedule_interval = '@daily',
        dagrun_timeout= timedelta(seconds=30) #添加timeout根据dag的平均运行时间
    ) as dag:
    
    

    添加失败后的回调函数

    def _on_success_callback(dict):
        print("on_success_callback")
        print(dict)
    
    def _on_failure_callback(dict):
        print("on_success_callback")
        print(dict)
    
    with DAG(
        dag_id = 'our_first_dag_lg_v1',
        default_args= _default_args,
        start_date = datetime(2024,2,3,1),
        schedule_interval = '@daily',
        dagrun_timeout= timedelta(seconds=30), #添加timeout根据dag的平均运行时间
        on_success_callback=_on_success_callback, #成功后的回调函数
        on_failure_callback=_on_failure_callback  #失败后的回调函数
        
    ) as dag:
    

    添加失败后的retry

    _default_args= {
        'owner':'lg',
        'retries':'2',  #retry次数
        'retry_delay':timedelta(minutes=3) #retry的delay
    }
    

    添加邮件发送

    • 首先需要早airflow.cfg里面配置smtp
    _default_args= {
        'owner':'lg',
        'retries':'2',  #retry次数
        'retry_delay':timedelta(minutes=3), #retry的delay
        'emails':['pjj@gmail.com'],
        'email_on_failure':True,
        'email_on_retry':False
    }
    

    测试DAGs

    • 使用Pytest来进行dags的单元测试
      1.validation test:用来测试,是否有拼写错误,参数设置错误,等常规错误
      2.pipeline definition test:测试DAGs的数据,依赖关系是否正确
      3.unit test: 检查程序逻辑
      4.integration test: 测试task是否意义获取正确的数据,外部资源是否正确
      5.end to end piplien test:


      image.png

      生产环境流程


      image.png

    pytest学习

    6. 变量

    variable

    1.在页面里添加变量


    image.png

    2.获取变量

    from airflow.models import Variable
    
    support_email = Variable.get("support_email"),
    

    xcom 传递dag之间的数据

    上传xcom的2种方式
    • 使用return直接push
    def _training_model():
        accuracy = uniform(0.1, 10.0)
        print(f"model's accuracy: {accuracy}")
        #push xcom
        return accuracy
    
    image.png
    • 自定义xcom的名称
    def _training_model(ti):
        accuracy = uniform(0.1, 10.0)
        print(f"model's accuracy: {accuracy}")
        ti.xcom_push(key='model_accuracy',value =accuracy)
    
    image.png
    • bash的operator会自动上传一个空的xcom,我们需要关闭他
        downloading_data = BashOperator(
            task_id='downloading_data',
            bash_command='sleep 3',
            do_xcom_push = False
        )
    
    image.png
    拉去xcom的值
    def _choose_best_model(ti):
        print('choose best model')
        #一个id用task_id,多个id用task_ids
        all_accuracies = ti.xcom_pull(key='model_accuracy',task_ids =['training_model_A','training_model_B','training_model_C'])
        print(all_accuracies)
    

    7.发送邮件

    • 配置airflow.cfg
      image.png
    • 添加emailoperator
    from airflow.operators.email import EmailOperator
    
        send_email = EmailOperator(
            task_id = 'send_email',
            to = 'qq394967886@163.com',
            subject='dag error',
            html_content="""
                <h3>Dag has error</h3>
                    """
        )
    

    8. airflow dependency

    1.使用dataset建立denpendency

    使用dataset可以直观的通过数据集来建立dag之间的依赖关系。
    例如:现在有2个dag, dag_A的功能是更新两个s3的数据集,dag_B的任务需要dag_A两个数据集都更新后才可以执行,如果使用传统的方式会比较麻烦,所以我们使用新的方式
    producer.py 用来处理2个数据集的dag

    from airflow.decorators import dag, task
    from datetime import datetime
    
    data_a = Dataset("s3://bucket_a/data_a") #数据集1
    data_b = Dataset("s3://bucket_b/data_b") #数据集2
    
    @dag(start_date=datetime(2023, 1 ,1), schedule='@daily', catchup=False)
    def producer():
    
        @task(outlets=[data_a])
        def update_a():
            print("update A")
    
        @task(outlets=[data_b])
        def update_b():
            print("update B")
    
        update_a() >> update_b()
    
    producer()
    

    costomer.py使用处理好的2个数据集,进行下一步任务的dag

    from airflow.decorators import dag, task
    from datetime import datetime
    
    data_a = Dataset("s3://bucket_a/data_a")
    data_b = Dataset("s3://bucket_b/data_b")
    
    @dag(start_date=datetime(2023, 1 ,1), schedule=[data_a, data_b], catchup=False)
    def consumer():
    
        @task
        def run():
            print('run')
    
        run()
    
    consumer()
    

    这里的schedule=[data_a, data_b]就是必须等待2个数据集outlets输出后,才能执行这个

    2.TriggerDagRunOperator

    假设:有一个dag_A和dag_B,其中dag_A里面有3个task分别是task1获取API得数据,task2 trigger dag_B,和保存数据 ,dag_ B则是一个down_load数据的task里面也有众多的task,可能执行5分钟,也可能执行十几分钟,只想完dag_B之后,需要在dag_A里,清洗数据

    • 我们通过设置TriggerDagRunOperator来设置两个dag的依赖
        #一般情况下,下面的参数都需要
        trigger_download = TriggerDagRunOperator(
            task_id='trigger_target',
            trigger_dag_id='down_loading_v1',
            execution_date='{{ ds }}', #只有2个dag在相同日期的时候才能执行
            reset_dag_run=True, #允许同一个日期多次执行
            wait_for_completion=True, #等待down_loading_v1完成后,执行后面的dag,如果不加的话后面的storagin等三个任务会一直执行
            poke_interval= 10  #检测down_loading_v1是否完成10s一次
        )
    

    down_load_data.py:用来下载的程序,里面也可以多个task

    from airflow import DAG
    from airflow.decorators import dag,task
    from airflow.operators.python import PythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    import time
    from datetime import datetime,timedelta
    
    _default_args= {
        'owner':'lg',
        'retries':'2',  #retry次数
        'retry_delay':timedelta(minutes=3), #retry的delay
    }
    
    @dag(_default_args)
    def down_loading_v1():
        @task
        def down_load_ali_data():
            time.sleep(10)
            print('down_loading_ali_data successed')
    
        down_load_ali_data()
    
    down_loading_v1()
    

    pl_ali_data.py:data pipeline 数据下载,迁移,存储,清洗的dag

    from airflow import DAG
    from airflow.decorators import dag,task
    from airflow.operators.python import PythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    from datetime import datetime,timedelta
    
    _default_args= {
        'owner':'lg',
        'retries':'2',  #retry次数
        'retry_delay':timedelta(minutes=3), #retry的delay
    }
    
    def _get_api():
        print('getting down_load data api')
    
    def _storaging():
        print('Storaging data from down_loading_dag ')
    
    def _cleaning():
        print('Clearning data')
    
    
    with DAG(
        dag_id='pl_ali_data_v1',
        start_date=datetime(2024,2,12),
        schedule='@daily',
        catchup=False,
        default_args= _default_args
    ) as dag:
    
        get_api_task = PythonOperator(
            task_id= 'get_api_task',
            python_callable=_get_api,
        )
    
        storaging_task = PythonOperator(
            task_id= 'storaging_task',
            python_callable=_storaging,
        )
    
        cleaning_task = PythonOperator(
            task_id= 'cleaning_task',
            python_callable=_cleaning,
        )
    
        #一般情况下,下面的参数都需要
        trigger_download = TriggerDagRunOperator(
            task_id='trigger_target',
            trigger_dag_id='down_loading_v1',
            execution_date='{{ ds }}', #只有2个dag在相同日期的时候才能执行
            reset_dag_run=True, #允许同一个日期多次执行
            wait_for_completion=True, #等待down_loading_v1完成后,执行后面的dag,如果不加的话后面的storagin等三个任务会一直执行
            poke_interval= 10  #检测down_loading_v1是否完成10s一次
        )
    
        get_api_task >> trigger_download >> storaging_task >> cleaning_task
    

    注意:①装饰器的写法和普通的写法不能混用;

    3. depend_on_past = True

    • 设置task2的depend_on_past = True有且仅当task2出错的时候,之后剩下的schedule都会在task2时不执行,当解决了一个出错的task2之后,会自动补全所有的schedule
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        'start_date': datetime(2024, 1, 1),
        'owner': 'lg'
    }
    
    def second_task():
        print('Hello from second_task')
        # raise ValueError('This will turns the python task in failed state')
    
    def third_task():
        print('Hello from third_task')
        # raise ValueError('This will turns the python task in failed state')
    
    with DAG(dag_id='depends_task', schedule_interval="@daily", default_args=default_args,catchup=False) as dag:
        
        # Task 1
        bash_task_1 = BashOperator(task_id='bash_task_1', bash_command="echo 'first task'")
        
        # Task 2
        python_task_2 = PythonOperator(task_id='python_task_2', python_callable=second_task,depends_on_past = True)
    
        # Task 3
        python_task_3 = PythonOperator(task_id='python_task_3', python_callable=third_task)
    
        bash_task_1 >> python_task_2 >> python_task_3
    

    4. wait_for_downstream = True

    设置task2的wait_for_downstream = True,如果任何一个task出现了问题,将不会在执行包括task2之后的所有task,但是task1会执行

        python_task_2 = PythonOperator(task_id='python_task_2', python_callable=second_task,wait_for_downstream= True)
    

    5. trigger rules

    在Apache Airflow中,trigger_rule参数用于指定任务的触发规则,决定了任务何时被触发。下面是Airflow中常用的触发规则(trigger_rule):

    1. all_success: 所有父任务都成功完成时触发。
    2. all_failed: 所有父任务都失败时触发。
    3. all_done: 所有父任务都完成时触发(无论成功或失败)。
    4. one_success: 至少有一个父任务成功完成时触发。
    5. one_failed: 至少有一个父任务失败时触发。
    6. dummy: 永远不触发,这在构建DAG时很有用,以防止不希望被触发的任务被错误地执行。
    • 例如:现有一个从2个api 下载数据的pipeline,当两个现在成功,则发从成功的邮件,任务失败则发送失败邮件,流程如下


      image.png
    • 示例代码:
    import airflow
    from airflow.models import DAG
    from airflow.operators.python import BranchPythonOperator, PythonOperator
    
    default_args = {
        'owner': 'Airflow',
        'start_date': airflow.utils.dates.days_ago(1),
    }
    
    def download_website_a():
        # print("download_website_a")
        raise ValueError("error")
    
    def download_website_b():
        print("download_website_b")
        #raise ValueError("error")
    
    def download_failed():
        print("download_failed")
        #raise ValueError("error")
    
    def download_status():
        print('This is download status')
    
    def download_succeed():
        print("download_succeed")
        #raise ValueError("error")
    
    def process():
        print("process data")
        #raise ValueError("error")
    
    def notif_a():
        print("all_success")
        #raise ValueError("error")
    
    def notif_b():
        print("one_failed")
        #raise ValueError("error")
    
    with DAG(dag_id='trigger_rules', 
        default_args=default_args, 
        schedule_interval="@daily") as dag:
    
        download_status_task = PythonOperator(
            task_id='download_status',
            python_callable=download_status,
            trigger_rule = 'all_success'
        )
    
        download_website_a_task = PythonOperator(
            task_id='download_website_a',
            python_callable=download_website_a,
        )
    
        download_website_b_task = PythonOperator(
            task_id='download_website_b',
            python_callable=download_website_b,
        )
    
        process_task = PythonOperator(
            task_id='process',
            python_callable=process,
            trigger_rule = 'one_success'
        )
    
        notif_all_success_task = PythonOperator(
            task_id='notif_all_success_task',
            python_callable=notif_a,
            trigger_rule = 'all_success'
        )
    
        notif_one_failed_task = PythonOperator(
            task_id='notif_one_failed_task',
            python_callable=notif_b,
            trigger_rule = 'one_failed'
        )
    
        [download_website_a_task,download_website_b_task] >> download_status_task >> process_task>>[notif_all_success_task,notif_one_failed_task]
        
    

    6. ExternalTaskSensor

    -假设有两个DAG,DAG A和DAG B。DAG A的某个任务(task)需要等待DAG B中的某个任务(task)完成后才能执行,那么就可以在DAG A中使用ExternalTaskSensor来等待DAG B的特定任务完成。task和task之间的依赖,所以并不需要等待另一个dag完全执行成功,只需要里面的一个task完成即可。TriggerRunOperator则是dag之间的依赖

    image.png
    import airflow.utils.dates
    from airflow import DAG
    from airflow.sensors.external_task import ExternalTaskSensor
    from airflow.operators.dummy_operator import DummyOperator
    from datetime import datetime, timedelta
    
    default_args = {
            "owner": "airflow", 
            "start_date": airflow.utils.dates.days_ago(1)
        }
    
    with DAG(dag_id="externaltasksensor_dag", default_args=default_args, schedule_interval="@daily") as dag:
        sensor = ExternalTaskSensor(
            task_id='sensor',  
            external_dag_id='sleep_dag',
            external_task_id='t2'    
        )
    
        last_task = DummyOperator(task_id="last_task")
    
        sensor >> last_task
    

    相关文章

      网友评论

          本文标题:AirFlow_本机课程v 1.0

          本文链接:https://www.haomeiwen.com/subject/lwkkadtx.html