美文网首页
pyspark基础入门demo

pyspark基础入门demo

作者: 欧呆哈哈哈 | 来源:发表于2020-11-25 14:54 被阅读0次

    0. 前言

    • spark python提供丰富的库函数,比较容易学习。但是对于新手来说,如何完成一个完整的数据查询和处理的spark,存在一些迷惑
    • 因此本文将详细的对一个入门demo讲述各个部分的作用

    1. 基础操作

    • 启动spark任务
    #python脚本里
    spark = SparkSession.builder.appName(job_name).getOrCreate()
    
    - spark-submit 设置运行参数
    
    #spark安装地址
    spark_home="xxx/spark-2.3/"
    spark_submit=${spark_home}/bin/spark-submit
    #要执行的Python脚本
    py_file=$1
    
    ${spark_submit} \
    --master yarn \
    --queue xxxxx \
    --num-executors 250 \
    --executor-cores 4 \  #executor的核数,每个核可运行一个进程,核越多说明可并行程度越高
    --executor-memory 16G \ #executor所占内存
    --files adapter.py \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.dynamicAllocation.enable=false \
    --conf spark.yarn.priority=NORMAL \
    --conf spark.default.parallelism=1200 \
    $py_file 
    
    • 读取文件
      • sql 读取
    #平常执行的sql语句
    sql_str = "" 
    #执行sql语句
    spark.sql(sql_str)
    
    - 读取文本
    
    #定义文本的schema 表示文本的结构
    midlog_schema = T.StructType([
    T.StructField("q_stra", T.StringType(), True),
    T.StructField("query", T.StringType(), True),
    T.StructField("qfreq", T.StringType(), True),
    T.StructField("date", T.StringType(), True),
    ])
    #读取文本为DataFrame对象
    midlog_data = spark.read.csv(text_path_str, sep='\001', schema=midlog_schema, header=None, inferSchema=False, mode='FAILFAST')
    
    • 处理数据
      • 一般为处理特征,过滤无效值。包括很多函数,涉及到udf。udf是用户自定义的函数,灵活性相比spark提供的函数更高,但是pyspark的udf性能较低(dataframe自带的函数可以绕过python对象->java对象->spark底层通信,但是udf避免不了,因此会存在多次文件的序列化,性能不高)
      • 声明udf
    #udf(函数,返回类型)
    disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
    
    - 调用udf
    
    # 由于spark内部不支持一次性传入多个参数,使用struct 可以传入多个参数
    data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
    
    - udf 定义
    
    def get_disp_info(self, disp_args):
      #F.struct在函数中是元组,根据元组方式获取对应的参数
    disp_result = disp_args[0]
    day = disp_args[1]
    disp_info = []
    if disp_result is None:
        return disp_info
    #处理其他步骤
    
    • 查询对应的数据
      • 无固定的方式,根据自己的目的。
      • 分组求和,获取某个条件的所有数据等等
    • demo 代码
    
    # pyspark 相关的库
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    from pyspark.sql import SQLContext
    
    reload(sys)
    sys.setdefaultencoding('utf-8')
    job_name = "%s_monitor_data_%s" % (user_name, day)
    #启动spark任务,可在该语句增加spark任务配置(executor memory,executor个数等)
    #具体配置参数可查找spark文档
    spark = SparkSession.builder.appName(job_name).getOrCreate()
    #定义sql语句
    #读取文件有两种方式:sql读表;读取文本(见1.2)
    sql_str = "select event_day, search_id, " \
    "disp_result " \
    "from data_table " \
    "where event_day = %s " \
    "and is_spam != '1' " \
    "and page_no = '1' "  % day
    #执行sql语句
    data = spark.sql(sql_str).cache()
    #定义udf
    disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
    #调用udf
    data = data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
    data\
        .groupBy(['event_day'])\
        .agg(
            F.countDistinct('search_id').alias('pv')
        ).coalesce(1)\
        .write.csv("/user/%s/tmp_table/search_pv/%s' % (user_name, day), sep='\t', mode='overwrite')
    

    相关文章

      网友评论

          本文标题:pyspark基础入门demo

          本文链接:https://www.haomeiwen.com/subject/jfdniktx.html