美文网首页
pyspark基础入门demo

pyspark基础入门demo

作者: 欧呆哈哈哈 | 来源:发表于2020-11-25 14:54 被阅读0次

0. 前言

  • spark python提供丰富的库函数,比较容易学习。但是对于新手来说,如何完成一个完整的数据查询和处理的spark,存在一些迷惑
  • 因此本文将详细的对一个入门demo讲述各个部分的作用

1. 基础操作

  • 启动spark任务
#python脚本里
spark = SparkSession.builder.appName(job_name).getOrCreate()
- spark-submit 设置运行参数
#spark安装地址
spark_home="xxx/spark-2.3/"
spark_submit=${spark_home}/bin/spark-submit
#要执行的Python脚本
py_file=$1

${spark_submit} \
--master yarn \
--queue xxxxx \
--num-executors 250 \
--executor-cores 4 \  #executor的核数,每个核可运行一个进程,核越多说明可并行程度越高
--executor-memory 16G \ #executor所占内存
--files adapter.py \
--conf spark.sql.catalogImplementation=hive \
--conf spark.dynamicAllocation.enable=false \
--conf spark.yarn.priority=NORMAL \
--conf spark.default.parallelism=1200 \
$py_file 
  • 读取文件
    • sql 读取
#平常执行的sql语句
sql_str = "" 
#执行sql语句
spark.sql(sql_str)
- 读取文本
#定义文本的schema 表示文本的结构
midlog_schema = T.StructType([
T.StructField("q_stra", T.StringType(), True),
T.StructField("query", T.StringType(), True),
T.StructField("qfreq", T.StringType(), True),
T.StructField("date", T.StringType(), True),
])
#读取文本为DataFrame对象
midlog_data = spark.read.csv(text_path_str, sep='\001', schema=midlog_schema, header=None, inferSchema=False, mode='FAILFAST')
  • 处理数据
    • 一般为处理特征,过滤无效值。包括很多函数,涉及到udf。udf是用户自定义的函数,灵活性相比spark提供的函数更高,但是pyspark的udf性能较低(dataframe自带的函数可以绕过python对象->java对象->spark底层通信,但是udf避免不了,因此会存在多次文件的序列化,性能不高)
    • 声明udf
#udf(函数,返回类型)
disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
- 调用udf
# 由于spark内部不支持一次性传入多个参数,使用struct 可以传入多个参数
data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
- udf 定义
def get_disp_info(self, disp_args):
  #F.struct在函数中是元组,根据元组方式获取对应的参数
disp_result = disp_args[0]
day = disp_args[1]
disp_info = []
if disp_result is None:
    return disp_info
#处理其他步骤
  • 查询对应的数据
    • 无固定的方式,根据自己的目的。
    • 分组求和,获取某个条件的所有数据等等
  • demo 代码

# pyspark 相关的库
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SQLContext

reload(sys)
sys.setdefaultencoding('utf-8')
job_name = "%s_monitor_data_%s" % (user_name, day)
#启动spark任务,可在该语句增加spark任务配置(executor memory,executor个数等)
#具体配置参数可查找spark文档
spark = SparkSession.builder.appName(job_name).getOrCreate()
#定义sql语句
#读取文件有两种方式:sql读表;读取文本(见1.2)
sql_str = "select event_day, search_id, " \
"disp_result " \
"from data_table " \
"where event_day = %s " \
"and is_spam != '1' " \
"and page_no = '1' "  % day
#执行sql语句
data = spark.sql(sql_str).cache()
#定义udf
disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
#调用udf
data = data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
data\
    .groupBy(['event_day'])\
    .agg(
        F.countDistinct('search_id').alias('pv')
    ).coalesce(1)\
    .write.csv("/user/%s/tmp_table/search_pv/%s' % (user_name, day), sep='\t', mode='overwrite')

相关文章

  • pyspark基础入门demo

    0. 前言 spark python提供丰富的库函数,比较容易学习。但是对于新手来说,如何完成一个完整的数据查询和...

  • pyspark整理

    pyspark入门资料 公众号回复:pyspark (会有pyspark资料大礼包:Learning PySpar...

  • iOS - Swift 浅析

    Swift开发入门-基础知识,附demo - 简书 Swift开发入门-进阶知识(一),附demo - 简书 Sw...

  • Android Studio 2.2 NDK入门(二) 增量更新

    基础回顾 Android Studio 2.2 NDK入门(一) 官方DEMO解析 http://www.jian...

  • 入门Leaflet之小Demo

    入门 Leaflet 之小 Demo 写在前面 ---- WebGIS 开发基础之 LeafletGIS 基本概念...

  • 手把手带你入门PySpark!

    PySpark数据科学入门 PySpark是一种很好的语言,可以大规模地进行探索性数据分析、构建机器学习管道以及为...

  • 一个 Demo 入门 Flutter

    一个 Demo 入门 Flutter 一个 Demo 入门 Flutter

  • SpringBoot(入门demo)

    以下包含SpringBoot的入门Demo和 SpringBoot+Mybatis的整合入门demo(使用IDE是...

  • pySpark DataFrame入门

    DataFrame是一种不可变的分布式数据集,这种数据被组织成指定的列,类似于关系数据库中的表。Spark Dat...

  • PySpark DataFrame 入门

    1 创建数据 2 Distinct 去重 3 聚合Group by 4 Filter/ Where 按条件删选 5...

网友评论

      本文标题:pyspark基础入门demo

      本文链接:https://www.haomeiwen.com/subject/jfdniktx.html