Airflow Tutorial

Author: 焉知非鱼 | Published 2017-07-06 17:36

    Install Airflow (newer releases are published on PyPI as apache-airflow; the command below uses the package name current when this post was written):

    sudo pip install airflow --upgrade --ignore-installed
    mkdir -p /Users/ohmycloud/airflow/dags
    

    Put the following file into the dags directory:

    # -*- coding:utf-8 -*-
    # airflowPysparkDagTest.py
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    sparkSubmit = '/Users/ohmycloud/opt/spark-2.1.0-bin-hadoop2.6/bin/spark-submit'
    
    ## Define the DAG object
    default_args = {
        'owner': '焉知非鱼',
        'depends_on_past': False,
        'start_date': datetime(2017, 12, 27),
        'retries': 5,
        'retry_delay': timedelta(minutes=1),
    }
    dag = DAG('PysparkTest', default_args=default_args, schedule_interval=timedelta(1))
    
    numUniqueAuthors = BashOperator(
        task_id='unique-event',
        bash_command=sparkSubmit + ' ' + '/Users/ohmycloud/scripts/Python/spark-json/test.py',
        dag=dag)
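Here `schedule_interval=timedelta(1)` means one run per day starting from `start_date`. As a quick pure-Python sketch (standard library only, with the dates copied from the DAG definition above), these are the execution dates a daily schedule would produce:

```python
from datetime import datetime, timedelta

# Values copied from default_args / schedule_interval in the DAG above.
start_date = datetime(2017, 12, 27)
schedule_interval = timedelta(1)  # timedelta(1) is one day, i.e. a daily schedule

# The first three execution dates Airflow would schedule or backfill:
run_dates = [start_date + i * schedule_interval for i in range(3)]
print(run_dates[0], run_dates[-1])
```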
    

    Write the following into test.py:

    # -*- coding:utf-8 -*-
    from pyspark.sql import SparkSession
    
    # Read data from the JSON source
    def read_from_json(spark):
        df = spark.read.json("/Users/ohmycloud/scripts/Python/ald_py/etl.json")
        event_logs_df = df.filter(df.ev == "event") \
                          .select(
                              df.ak.alias("app_key"),
                              df.at.alias("access_token"),
                              df.tp.alias("ev")
                          )
        event_logs_df.show()
    
    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .appName("aldstat") \
            .config("spark.sql.shuffle.partitions", 48) \
            .master("local") \
            .getOrCreate()
        read_from_json(spark)
        spark.stop()
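What the Spark job does can be checked with a pure-Python sketch over records of the same shape. The sample dicts below are made up for illustration; only the field names (ev, ak, at, tp) come from the Spark code above:

```python
# Pure-Python sketch of the filter/rename the Spark job performs:
# keep records whose ev field is "event", then rename ak/at/tp.
records = [
    {"ev": "event", "ak": "wx123", "at": "token-1", "tp": "click"},
    {"ev": "page",  "ak": "wx456", "at": "token-2", "tp": "view"},
]
event_logs = [
    {"app_key": r["ak"], "access_token": r["at"], "ev": r["tp"]}
    for r in records
    if r["ev"] == "event"
]
print(event_logs)
```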
    

    Run it once to test:

    airflow list_dags
    airflow backfill PysparkTest -s 2017-12-27
    

    In the Airflow UI you will see one successful run, and the printed result also shows up in the task log:

    (Screenshot WX20171227-154224@2x.png: the successful run in the Airflow UI)

Source: https://www.haomeiwen.com/subject/lsxkhxtx.html