美文网首页
Airflow 动态指定DAG运行队列

Airflow 动态指定DAG运行队列

作者: 默默无痕 | 来源:发表于2021-03-25 18:57 被阅读0次

    Airflow版本 2.0.1
    根据官网提示,airflow 支持hook函数,可以在DAG运行开始之前 根据我们触发的参数 指定队列
    https://github.com/apache/airflow/discussions/14987
    新建 airflow_local_settings.py 文件, 放在${AIRFLOW_HOME}/config/airflow_local_settings.py
    无需重启scheduler 即可加载。

    def task_instance_mutation_hook(task_instance):
        dag_run = task_instance.get_dagrun()
        conf = dag_run.conf
        # print(conf) 
        # conf为 trigger_dag 的 --conf  参数
        if conf['key4']:
            if 'change_queue' in conf:
                return
            else:
                task_instance.queue = conf['key4']
                conf["change_queue"] = 'true'
    

    上面代码表示,在DAG 刚刚要运行的时候,会执行上面的hook函数 拿到conf 根据conf中的 key4 重新执行DAG的queue。

    DAG触发参数为

     airflow dags trigger 'dag_test_bash' -r `uuidgen` --conf '{"key4":"test_param"}'
    

    传入的参数 key4 的value 会赋值到 task_instance.queue 也就是在DAG运行前 为其动态分配了队列,实际生产中 可以用该种方式去路由DAG到指定ip的机器去运行,我们可以在每台机器起自己IP+业务前缀的worker,然后在这里指定,如指定queue为 prod_10.0.0.1的queue。

    相关文章

      网友评论

          本文标题:Airflow 动态指定DAG运行队列

          本文链接:https://www.haomeiwen.com/subject/liyrhltx.html