# encoding: utf-8
from airflow import DAG
from datetime import datetime
from datetime import timedelta
from airflow.operators.bash_operator import BashOperator
# Run-time parameters, rendered by Airflow's template engine from the
# payload supplied when the DAG is triggered (dag_run.conf).  They are only
# usable inside templated operator fields such as BashOperator.bash_command.
brand = "{{ dag_run.conf['brand'] }}"
city = "{{ dag_run.conf['city'] }}"
site = "{{ dag_run.conf['site'] }}"
dt = "{{ dag_run.conf['dt'] }}"
# flag = "dev"
flag = "{{ dag_run.conf['flag'] }}"
# Resource pool for the DAG's task instances (instead of the usual
# default_date_warehouse_etl_pool default).
pool = 'aibee_common_bi_default_analysis'
# Default arguments shared by every task instance in this DAG.
args = {
    'owner': 'xxx',
    'email': ['xxx'],
    'phone': ['xx','x','x','x'],
    'tel_on_failure': False,
    'sms_on_failure': True,
    'email_on_failure': True,
    'tel_on_retry': False,
    'sms_on_retry': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(seconds=60),
    'start_date': "2020-02-10 00:00:00", # start date: DAG creation date minus one day
    'pool': pool
}
# dag定义
# DAG definition: no schedule (schedule_interval=None), i.e. runs only when
# triggered manually/externally, with parameters passed via dag_run.conf.
dag = DAG(
    dag_id='zz_aibee_mall_lite_v11_common_bi_pipeline',
    description=' Mall-v02 Common DAG',
    default_args=args,
    schedule_interval=None
)
# submit spark task
def build_spark_submit_command(params):
    """Build a ``spark-submit`` command line (yarn / cluster mode).

    :param params: dict of optional spark-submit settings.  Recognised keys:
        ``conf`` (dict of spark conf key/values), ``queue``, ``jars``,
        ``num_executors``, ``total_executor_cores``, ``executor_cores``,
        ``executor_memory``, ``driver_memory``, ``name``, ``java_class``,
        ``verbose``, ``application`` (jar path), ``application_args`` (list),
        ``spark_yarn_keytab``, ``spark_yarn_principal``.
    :return: the full command as a single space-joined string.
    """
    cmd = ["spark-submit --master yarn --deploy-mode cluster"]
    if 'conf' in params:
        for key in params['conf']:
            cmd += ["--conf", "{}={}".format(key, str(params['conf'][key]))]
    if 'queue' in params:
        cmd += ["--queue", params['queue']]
    if 'jars' in params:
        cmd += ["--jars", params['jars']]
    if 'num_executors' in params:
        cmd += ["--num-executors", str(params['num_executors'])]
    if 'total_executor_cores' in params:
        cmd += ["--total-executor-cores", str(params['total_executor_cores'])]
    if 'executor_cores' in params:
        cmd += ["--executor-cores", str(params['executor_cores'])]
    if 'executor_memory' in params:
        cmd += ["--executor-memory", params['executor_memory']]
    if 'driver_memory' in params:
        cmd += ["--driver-memory", params['driver_memory']]
    if 'name' in params:
        cmd += ["--name", params['name']]
    if 'java_class' in params:
        cmd += ["--class", params['java_class']]
    if 'verbose' in params:
        cmd += ["--verbose"]
    # Kerberos authentication; defaults to the shared bigdata keytab/principal.
    # BUGFIX: the original did ``cmd += "<string>"`` which extends the list
    # with the string's individual *characters*, and appended the values with
    # no flag *after* the application jar (where spark-submit would treat them
    # as application arguments).  Emit proper --conf options before the jar.
    keytab = params.get('spark_yarn_keytab', '/tmp/bigdata.keytab')
    principal = params.get('spark_yarn_principal', 'bigdata@AIBEE.CN')
    cmd += ["--conf", "spark.yarn.keytab={}".format(keytab)]
    cmd += ["--conf", "spark.yarn.principal={}".format(principal)]
    # The application jar and its arguments must come last.
    if 'application' in params:
        cmd += [params['application']]
    if 'application_args' in params:
        cmd += params['application_args']
    print("Spark-Submit cmd: %s" % ' '.join(cmd))
    return ' '.join(cmd)
# Spark job (Scala class) names: stage 3 is the base ETL; the stage-4
# statistic tasks all run downstream of it.
task_names_stage3 = 'BaseDataEtlTask'
task_names_stage4 = ['AreaPersonTimesAndCountTask', 'ArriveTimesStatisticTask', 'CustomerDetailTask',
                     'FloorPersonTimesAndCountTask', 'MallPersonTimesAndCountTask','MallTimeSliceStatisticTask'
                     ]
# YARN queue, Java package prefix, and HDFS location of the application jar.
yarn_queue = 'data-platform'
class_dir = 'com.aibee.mall.tasks'
jar_dir = 'hdfs:///prod/data_platform/date_warehouse/project/bi/common_bi_mall_lite_v11/jars/latest/MallLiteBI-latest.jar'
# Stage 3: the base-data ETL spark job that all BI statistic tasks build on.
stage3_spark_params = dict(
    name=task_names_stage3,
    driver_memory='4g',
    num_executors=1,
    executor_cores=1,
    executor_memory='4g',
    queue=yarn_queue,
    java_class='%s.%s' % (class_dir, task_names_stage3),
    application=jar_dir,
    application_args=[brand, city, site, dt, flag],
)
submit_task_stage3 = BashOperator(
    task_id='task.%s' % task_names_stage3,
    bash_command=build_spark_submit_command(stage3_spark_params),
    dag=dag,
)
# Calendar date of the moment the DAG file is parsed.
# NOTE(review): trigger_date is currently unused in this file — presumably
# meant for a StoreCustomerStickTask (see note below) that is not in
# task_names_stage4; confirm before deleting.
trigger_date = str(datetime.now().strftime('%Y-%m-%d'))
# NOTE: StoreCustomerStickTask always uses the execution-day date; all other
# tasks use the execution date passed in via dag_run.conf['dt'].
for task_name in task_names_stage4:
    # One spark-submit task per stage-4 statistic job.
    # CLEANUP: the original wrapped this in a conditional expression keyed on
    # task_name == 'ArriveTimesStatisticTask' whose two branches were
    # byte-identical BashOperators; collapsed to a single construction.
    submit_task_stage4 = BashOperator(
        task_id='task.%s' % task_name,
        bash_command=build_spark_submit_command(dict(
            name=task_name,
            driver_memory='8g',
            num_executors=1,
            executor_cores=1,
            executor_memory='4g',
            queue=yarn_queue,
            conf={'spark.driver.maxResultSize': '8g'},
            java_class='%s.%s' % (class_dir, task_name),
            application=jar_dir,
            application_args=[brand, city, site, dt, flag]
        )),
        dag=dag
    )
    # BUGFIX: the dependency edge was set *outside* the loop, so only the
    # last stage-4 task waited for the base ETL.  Every stage-4 task must
    # run downstream of stage 3, so wire the edge inside the loop.
    submit_task_stage3 >> submit_task_stage4
# NOTE(review): removed stray scraped footer text ("网友评论" / "reader
# comments") — it was not valid Python and broke the module at import time.