airflow经常会调度shell文件,同时需要传递参数到shell文件。调度任务通常需要和执行时间相关,采用宏来生成参数并把参数传递到shell文件。
python的dag脚本如下:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': '******',
'depends_on_past': False,
'start_date': datetime(2019, 6, 1),
'email': ['**@**.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
#每小时跑一次,catchup=False不回补以前历史任务,只跑当前时间开始的任务。
dag = DAG('sqoop_hourly', default_args=default_args, schedule_interval='14 * * * *', catchup=False)
# 使用宏构造参数,传递宏的参数到shell脚本,跳转到执行文件路径执行。
bash_cmd = '''
cd /usr/local/airflow/script/***/base_report
echo {{ds}}
echo {{macros.ds_format(ds,"%Y-%m-%d", "%Y%m")}}
sh ./sqoop_hourly.sh {{macros.ds_format(ds,"%Y-%m-%d", "%Y%m")}}
'''
print (bash_cmd)
# t1, t2 and t3 are examples of tasks created by instantiating operators
# ${ym}=yyyyMM
#
t1 = BashOperator(
task_id='sqoop_hourly',
bash_command=bash_cmd,
dag=dag)
sqoop_hourly.sh的shell文件如下:
#小时级更新报表
ym=$1
echo ${ym}
网友评论