
Submitting Spark Jobs with Airflow: an Example DAG

Author: john瀚 | Published 2020-03-30 16:41
    # encoding: utf-8
    from airflow import DAG
    from datetime import datetime
    from datetime import timedelta
    from airflow.operators.bash_operator import BashOperator
    
    brand = "{{ dag_run.conf['brand'] }}"
    city = "{{ dag_run.conf['city'] }}"
    site = "{{ dag_run.conf['site'] }}"
    dt = "{{ dag_run.conf['dt'] }}"
    # flag = "dev"
    flag = "{{ dag_run.conf['flag'] }}"
    
    # Resource pool for the task instances in this DAG; default_date_warehouse_etl_pool is used by default
    pool = 'aibee_common_bi_default_analysis'
    
    # Basic DAG default arguments
    args = {
        'owner': 'xxx',
        'email': ['xxx'],
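        # The phone/sms/tel fields below are custom notification settings, not standard Airflow arguments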
        'phone': ['xx','x','x','x'],
        'tel_on_failure': False,
        'sms_on_failure': True,
        'email_on_failure': True,
        'tel_on_retry': False,
        'sms_on_retry': False,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(seconds=60),
        'start_date': datetime(2020, 2, 10),            # Start date: one day before the DAG was created
        'pool': pool
    }
    
    # DAG definition
    dag = DAG(
        dag_id='zz_aibee_mall_lite_v11_common_bi_pipeline',
        description=' Mall-v02 Common DAG',
        default_args=args,
        schedule_interval=None
    )
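
    # This DAG has no schedule (schedule_interval=None), so it is triggered manually with
    # a conf payload that fills in the dag_run.conf templates above. An illustrative example
    # (the values are placeholders, not from the original pipeline):
    #   airflow trigger_dag zz_aibee_mall_lite_v11_common_bi_pipeline \
    #       -c '{"brand": "some_brand", "city": "some_city", "site": "some_site", "dt": "2020-03-01", "flag": "prod"}'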
    
    # Build the spark-submit command line for one task
    def build_spark_submit_command(params):
        cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster"]
        if 'conf' in params:
            for key in params['conf']:
                cmd += ["--conf", "{}={}".format(key, str(params['conf'][key]))]
        if 'queue' in params:
            cmd += ["--queue", params['queue']]
        if 'jars' in params:
            cmd += ["--jars", params['jars']]
        if 'num_executors' in params:
            cmd += ["--num-executors", str(params['num_executors'])]
        if 'total_executor_cores' in params:
            cmd += ["--total-executor-cores", str(params['total_executor_cores'])]
        if 'executor_cores' in params:
            cmd += ["--executor-cores", str(params['executor_cores'])]
        if 'executor_memory' in params:
            cmd += ["--executor-memory", params['executor_memory']]
        if 'driver_memory' in params:
            cmd += ["--driver-memory", params['driver_memory']]
        if 'name' in params:
            cmd += ["--name", params['name']]
        if 'java_class' in params:
            cmd += ["--class", params['java_class']]
        if 'verbose' in params:
            cmd += ["--verbose"]
        ## Kerberos authentication; the bigdata user's keytab and principal are used by default.
        ## spark-submit options must come before the application jar, so they are added here.
        if 'spark_yarn_keytab' in params:
            cmd += ["--keytab", params['spark_yarn_keytab']]
        else:
            cmd += ["--keytab", "/tmp/bigdata.keytab"]
        if 'spark_yarn_principal' in params:
            cmd += ["--principal", params['spark_yarn_principal']]
        else:
            cmd += ["--principal", "bigdata@AIBEE.CN"]
        if 'application' in params:
            cmd += [params['application']]
        ## Everything after the application jar is passed through as application arguments
        if 'application_args' in params:
            cmd += params['application_args']
    
        print("Spark-Submit cmd: %s" % ' '.join(cmd))
        return ' '.join(cmd)
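
    # A minimal sketch of the command the helper produces; the task name and jar path
    # below are illustrative placeholders, not values from the real pipeline:
    #   build_spark_submit_command(dict(
    #       name='ExampleTask', queue='data-platform', executor_memory='4g',
    #       java_class='com.aibee.mall.tasks.ExampleTask',
    #       application='hdfs:///path/to/ExampleTask.jar',
    #       application_args=['brand', 'city', 'site', '2020-03-01', 'prod']))
    #   -> spark-submit --master yarn --deploy-mode cluster --queue data-platform
    #      --executor-memory 4g --name ExampleTask --class com.aibee.mall.tasks.ExampleTask
    #      --keytab /tmp/bigdata.keytab --principal bigdata@AIBEE.CN
    #      hdfs:///path/to/ExampleTask.jar brand city site 2020-03-01 prod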
    
    task_names_stage3 = 'BaseDataEtlTask'
    task_names_stage4 = ['AreaPersonTimesAndCountTask', 'ArriveTimesStatisticTask', 'CustomerDetailTask',
                         'FloorPersonTimesAndCountTask', 'MallPersonTimesAndCountTask', 'MallTimeSliceStatisticTask']
    yarn_queue = 'data-platform'
    class_dir = 'com.aibee.mall.tasks'
    jar_dir = 'hdfs:///prod/data_platform/date_warehouse/project/bi/common_bi_mall_lite_v11/jars/latest/MallLiteBI-latest.jar'
    
    
    # Stage 3: base data ETL (runs before the stage-4 BI statistic tasks)
    submit_task_stage3 = BashOperator(
        task_id='task.%s' % task_names_stage3,
        bash_command=build_spark_submit_command(dict(
            name=task_names_stage3,
            driver_memory='4g',
            num_executors=1,
            executor_cores=1,
            executor_memory='4g',
            queue=yarn_queue,
            java_class='%s.%s' % (class_dir,task_names_stage3),
            application=jar_dir,
            application_args=[brand, city, site, dt, flag]
        )),
        dag=dag
    )
    
    
    trigger_date = datetime.now().strftime('%Y-%m-%d')  # evaluated at DAG-parse time; currently unused
    
    # Note: for StoreCustomerStickTask the customer-stickiness period is always the run date; the other tasks use the execution date passed in as a parameter (dt)
    for task_name in task_names_stage4:
        submit_task_stage4 = BashOperator(
            task_id='task.%s' % task_name,
            bash_command=build_spark_submit_command(dict(
                name=task_name,
                driver_memory='8g',
                num_executors=1,
                executor_cores=1,
                executor_memory='4g',
                queue=yarn_queue,
                conf={'spark.driver.maxResultSize': '8g'},
                java_class='%s.%s' % (class_dir, task_name),
                application=jar_dir,
                application_args=[brand, city, site, dt, flag]
            )),
            dag=dag
        )
        submit_task_stage3 >> submit_task_stage4
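
    # Resulting DAG shape: task.BaseDataEtlTask runs first and fans out to each of the six stage-4 statistic tasks.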
    
