美文网首页
Spark使用Airflow提交Job实例Dag

Spark使用Airflow提交Job实例Dag

作者: john瀚 | 来源:发表于2020-03-30 16:41 被阅读0次
# encoding: utf-8
from airflow import DAG
from datetime import datetime
from datetime import timedelta
from airflow.operators.bash_operator import BashOperator

# Jinja templates that read the per-run parameters from dag_run.conf.
# They are plain strings here and are only meaningful once rendered inside a
# templated operator field.
# For a fixed environment, flag can be hard-coded instead (e.g. "dev").
brand, city, site, dt, flag = (
    "{{ dag_run.conf['%s'] }}" % conf_key
    for conf_key in ('brand', 'city', 'site', 'dt', 'flag')
)

# Resource pool used by the task instances of this DAG.
# NOTE(review): the original note named default_date_warehouse_etl_pool as the
# default pool, which differs from the value configured here - confirm which
# one is intended.
pool = 'aibee_common_bi_default_analysis'

# Basic DAG parameters, passed to the DAG as default_args.
args = dict(
    owner='xxx',
    email=['xxx'],
    phone=['xx', 'x', 'x', 'x'],
    # Notification switches on failure.
    tel_on_failure=False,
    sms_on_failure=True,
    email_on_failure=True,
    # Notification switches on retry.
    tel_on_retry=False,
    sms_on_retry=False,
    email_on_retry=False,
    retries=2,
    retry_delay=timedelta(seconds=60),
    # Start time: the DAG creation date minus one day.
    start_date="2020-02-10 00:00:00",
    pool=pool,
)

# DAG definition.
# schedule_interval is None, so the scheduler does not create runs on its own;
# the per-run parameters are read from dag_run.conf.
dag = DAG(
    dag_id='zz_aibee_mall_lite_v11_common_bi_pipeline',
    schedule_interval=None,
    default_args=args,
    description=' Mall-v02 Common DAG',
)

# submit spark task
def build_spark_submit_command(params):
    """Build the ``spark-submit`` shell command line for one Spark job.

    The command always starts with
    ``spark-submit --master yarn --deploy-mode cluster``. Apart from the
    Kerberos options, which are always emitted, an option is added only when
    its key is present in ``params``.

    Args:
        params: dict of job settings. Recognised keys, in output order:
            conf: dict of Spark properties; each entry becomes
                ``--conf key=value``.
            queue, jars, num_executors, total_executor_cores,
            executor_cores, executor_memory, driver_memory, name:
                emitted as the option of the same name.
            java_class: emitted as ``--class``.
            verbose: the mere presence of the key adds ``--verbose``
                (its value is not inspected).
            spark_yarn_keytab: keytab path for ``--keytab``; defaults to
                ``/tmp/bigdata.keytab``.
            spark_yarn_principal: principal for ``--principal``; defaults
                to ``bigdata@AIBEE.CN``.
            application: the application jar/file.
            application_args: iterable of arguments passed to the
                application.

    Returns:
        The command as one string, tokens joined by single spaces. The same
        string is also printed.
    """
    cmd = ["spark-submit --master yarn --deploy-mode cluster"]

    spark_conf = params.get('conf') or {}
    for key in spark_conf:
        cmd += ["--conf", "{}={}".format(key, str(spark_conf[key]))]

    # (params key, spark-submit option) pairs, in the order they are emitted.
    value_options = (
        ('queue', '--queue'),
        ('jars', '--jars'),
        ('num_executors', '--num-executors'),
        ('total_executor_cores', '--total-executor-cores'),
        ('executor_cores', '--executor-cores'),
        ('executor_memory', '--executor-memory'),
        ('driver_memory', '--driver-memory'),
        ('name', '--name'),
        ('java_class', '--class'),
    )
    for key, option in value_options:
        if key in params:
            cmd += [option, str(params[key])]

    if 'verbose' in params:
        cmd.append("--verbose")

    # Kerberos authentication; defaults to the bigdata user's keytab and
    # principal. Each value is appended as ONE token behind its option
    # (``list += "text"`` would extend the list character by character), and
    # before the application, because everything after the application is
    # handed to the application itself.
    cmd += ["--keytab", str(params.get('spark_yarn_keytab', "/tmp/bigdata.keytab"))]
    cmd += ["--principal", str(params.get('spark_yarn_principal', "bigdata@AIBEE.CN"))]

    if 'application' in params:
        cmd.append(str(params['application']))
    if 'application_args' in params:
        # str() so that non-string arguments (e.g. ints) cannot break the join.
        cmd += [str(arg) for arg in params['application_args']]

    # NOTE(review): tokens are joined without shell quoting. Callers in this
    # file pass Jinja templates that are rendered later, so quoting here would
    # mangle them - but it also means rendered values reach the shell verbatim.
    command = ' '.join(cmd)
    print("Spark-Submit cmd: %s" % command)
    return command

# Stage 3 is a single task name (a plain string, not a list).
task_names_stage3 = 'BaseDataEtlTask'

# Stage 4 task names; one operator is created per entry.
task_names_stage4 = [
    'AreaPersonTimesAndCountTask',
    'ArriveTimesStatisticTask',
    'CustomerDetailTask',
    'FloorPersonTimesAndCountTask',
    'MallPersonTimesAndCountTask',
    'MallTimeSliceStatisticTask',
]

# YARN queue the jobs are submitted to.
yarn_queue = 'data-platform'

# Package prefix joined with a task name to form the --class value.
class_dir = 'com.aibee.mall.tasks'

# Application jar passed to spark-submit.
jar_dir = 'hdfs:///prod/data_platform/date_warehouse/project/bi/common_bi_mall_lite_v11/jars/latest/MallLiteBI-latest.jar'


# BI statistics, stage 3: the single base ETL task, submitted through
# spark-submit by a BashOperator.
submit_task_stage3 = BashOperator(
    dag=dag,
    task_id='task.{}'.format(task_names_stage3),
    bash_command=build_spark_submit_command({
        'name': task_names_stage3,
        'java_class': '{}.{}'.format(class_dir, task_names_stage3),
        'application': jar_dir,
        # Per-run parameters, rendered from dag_run.conf when the task runs.
        'application_args': [brand, city, site, dt, flag],
        'queue': yarn_queue,
        'driver_memory': '4g',
        'executor_memory': '4g',
        'num_executors': 1,
        'executor_cores': 1,
    }),
)


# Date on which this file is parsed, formatted as YYYY-MM-DD.
# NOTE(review): not referenced anywhere below; kept so the module-level name
# stays available - confirm before removing.
trigger_date = str(datetime.now().strftime('%Y-%m-%d'))

# Original note: for StoreCustomerStickTask (customer stickiness) the date is
# always the date of execution, while the other tasks use the date passed in
# as a parameter.
# NOTE(review): StoreCustomerStickTask is not in task_names_stage4, and the
# former per-task conditional built the same operator in both branches, so
# every stage-4 task is now created by the single definition below.
for task_name in task_names_stage4:
    submit_task_stage4 = BashOperator(
        task_id='task.%s' % task_name,
        bash_command=build_spark_submit_command(dict(
            name=task_name,
            driver_memory='8g',
            num_executors=1,
            executor_cores=1,
            executor_memory='4g',
            queue=yarn_queue,
            conf={'spark.driver.maxResultSize': '8g'},
            java_class='%s.%s' % (class_dir, task_name),
            application=jar_dir,
            application_args=[brand, city, site, dt, flag]
        )),
        dag=dag
    )
    # Every stage-4 task runs downstream of the stage-3 task.
    submit_task_stage3 >> submit_task_stage4

相关文章

网友评论

      本文标题:Spark使用Airflow提交Job实例Dag

      本文链接:https://www.haomeiwen.com/subject/nebvuhtx.html