PySpark SQL常用语法

作者: 真依然很拉风 | 来源:发表于2018-05-14 21:21 被阅读1563次

    许多数据分析师都是用HIVE SQL跑数,这里我建议转向PySpark:

    • PySpark的语法是从左到右串行的,便于阅读、理解和修正;SQL的语法是从内到外嵌套的,不方便维护;
    • PySpark继承Python优美、简洁的语法,同样的效果,代码行数可能只有SQL的十分之一;
    • Spark分转化操作和行动操作,只在行动操作时才真正计算,所以可以减少不必要的计算时间;
    • 相对于SQL层层嵌套的一个整体,PySpark可以拆分成多步,并可以十分方便地把中间结果保存为变量,更有利于调试和修改;
    • PySpark可以与Python中的其他模块结合使用,可以将多种功能有机结合成一个系统
    • PySpark SQL模块许多函数、方法与SQL中关键字一样,可以以比较低的学习成本切换
    • 最重要的,Spark是基于内存计算的,计算速度本身比Hive快很多倍

    PySpark的安装配置

    如果只是在单机上练习下,照着网上的帖子在Linux系统安装一下就可以了;如果想真正在集群上实战,还是找运维搭建吧。

    PySpark SQL语法

    最好的学习资料当然是官方文档,不过官方文档是按函数名排序的,这对于新手不太友好,所以这里整理一下。

    数据拉取

    第一步是拉取数据,与SQL、Pandas、R一样,在SparkSQL中,我们以DataFrame以基本的数据结构(不过要注意,SparkSQL DataFrame与Pandas的DataFrame是两种数据结构,虽然相互转换也很容易)。

    加载包
    from __future__ import print_function
    import pandas as pd
    from pyspark.sql import HiveContext
    from pyspark import SparkContext,SparkConf
    from sqlalchemy import create_engine
    import datetime
    import pyspark.sql.functions as F
    
    conf = SparkConf().setAppName("abc")
    sc = SparkContext(conf=conf)
    hiveCtx = HiveContext(sc)
    

    手工创建一个DataFrame
    d = [{'name': 'Alice', 'age': 1},{'name': 'Bob', 'age': 5}]
    df = sqlContext.createDataFrame(d)
    df.show() 
    
    从集群里运行SQL生成DataFrame

    实际工作中往往是从集群中拉数,然后处理;还是执行SQL(尽管仍是SQL,但是不必写复杂的SQL;用基本的SQL先把源数据拉出来,复杂的处理和计算交给Spark来做),以下是用Hive拉数:

    sql = "" # 拉数SQL
    df  = hiveCtx.sql(sql)
    

    缓存与清除缓存

    Spark每次作行动操作时,都是从最初的转化操作开始计算;如果不想从头开始计算,想保存中间结果表,就应该把数据载入缓存。

    df.cache()
    

    与之相对的,清除缓存为

    sqlContext.clearCache()
    

    数据探索
    展示
    df.show() # 不加参数默认展示前20行
    
    统计行数
    df.count() 
    
    查看schema
    df.printSchema() 
    
    查看字段
    df.columns
    
    查看字段类型
    df.dtypes
    

    数据处理
    查询
    df.select('age','name') # 带show才能看到结果
    
    别名
    df.select(df.age.alias('age_value'),'name')
    
    筛选
    df.filter(df.name=='Alice')
    
    增加列

    增加列有2种方法,一种是基于现在的列计算;一种是用pyspark.sql.functions的lit()增加常数列。

    df.select(df.age+1,'age','name')
    df.select(F.lit(0).alias('id'),'age','name')
    
    增加行
    df.unionAll(df2)
    
    删除重复记录
    df.drop_duplicates()
    
    去重
    df.distinct()
    
    删除列
    df.drop('id')
    
    删除存在缺失值的记录
    df.dropna(subset=['age', 'name'])  # 传入一个list,删除指定字段中存在缺失的记录
    
    填补缺失值
    df.fillna({'age':10,'name':'abc'})  # 传一个dict进去,对指定的字段填充
    
    分组计算
    df.groupby('name').agg(F.max(df['age']))
    
    join
    df.groupby('name').agg(F.max(df['age']))
    

    函数和UDF

    pyspark.sql.functions里有许多常用的函数,可以满足日常绝大多数的数据处理需求;当然也支持自己写的UDF,直接拿来用。

    自带函数

    根据官方文档,以下是部分函数说明:

    'lit': 'Creates a :class:`Column` of literal value.',
    'col': 'Returns a :class:`Column` based on the given column name.',
    'column': 'Returns a :class:`Column` based on the given column name.',
    'asc': 'Returns a sort expression based on the ascending order of the given column name.',
    'desc': 'Returns a sort expression based on the descending order of the given column name.',
    
    'upper': 'Converts a string expression to upper case.',
    'lower': 'Converts a string expression to upper case.',
    'sqrt': 'Computes the square root of the specified float value.',
    'abs': 'Computes the absolutle value.',
    
    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
    'first': 'Aggregate function: returns the first value in a group.',
    'last': 'Aggregate function: returns the last value in a group.',
    'count': 'Aggregate function: returns the number of items in a group.',
    'sum': 'Aggregate function: returns the sum of all values in the expression.',
    'avg': 'Aggregate function: returns the average of the values in a group.',
    'mean': 'Aggregate function: returns the average of the values in a group.',
    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
    
    df.select(F.max(df.age))
    df.select(F.min(df.age))
    df.select(F.avg(df.age)) # 也可以用mean,一样的效果
    df.select(F.countDistinct(df.age)) # 去重后统计
    df.select(F.count(df.age)) # 直接统计,经试验,这个函数会去掉缺失值会再统计
    
    from pyspark.sql import Window
    df.withColumn("row_number", F.row_number().over(Window.partitionBy("a","b","c","d").orderBy("time"))).show() # row_number()函数
    

    日期相关函数参考:pyspark系列--日期函数

    UDF

    统计计算
    描述性统计分析
    df.describe('age').show() # describe()相当于R里的summary()
    

    数据写出

    数据写出有以下几种情况——

    • 写入集群分区表
    all_bike.rdd.map(lambda line: u','.join(map(lambda x:unicode(x),line))).saveAsTextFile('/user/hive/warehouse/bi.db/bikeid_without_3codes_a_d/dt={}'.format(t0_uf)) #转化为RDD写入HDFS路径
    

    还有一种方法,是先把dataframe创建成一个临时表,再用hive sql的语句写入表的分区。

    bike_change_2days.registerTempTable('bike_change_2days')
    sqlContext.sql("insert into bi.bike_changes_2days_a_d partition(dt='%s') select citycode,biketype,detain_bike_flag,bike_tag_onday,bike_tag_yesterday,bike_num from bike_change_2days"%(date))
    
    • 写入集群非分区表
    df_spark.write.mode("append").insertInto('bi.pesudo_bike_white_list') # 直接使用write.mode方法insert到指定的集群表
    
    • 写入数据库
      可以先将PySpark DataFrame转化成Pandas DataFrame,然后用pandas的to_sql方法插入数据库

    • 写出本地

    df.write.csv()
    

    与Pandas DataFrame互相转换

    如果你熟悉Pandas包,并且PySpark处理的中间数据量不是太大,那么可以直接转换成pandas DataFrame,然后转化成常规操作。

    df.toPandas() # PySpark DataFrame转化成Pandas DataFrame
    
    import pandas as pd
    df_p = pd.DataFrame(dict(num=range(3),char=['a','b','c']))
    df_s = sqlContext.createDataFrame(df_p) # pandas dataframe转化成PySpark DataFrame
    type(df_s)
    
    机器学习

    关于机器学习,在以后的文章里再单独讨论。

    相关文章

      网友评论

      本文标题:PySpark SQL常用语法

      本文链接:https://www.haomeiwen.com/subject/wqejdftx.html