SparkSQL、RDD和DataFrame基本操作

作者: 路人乙yh | 来源:发表于2019-03-23 18:29 被阅读0次

    1 三者比较

    易操作程度 SparkSQL > DataFrame > RDD

    2 创建RDD、DataFrame和SparkSQL

    2.1 创建RDD

    rawUserData = sc.textFile("file:/home/badou/Downloads/u.user")
    rawUserData.count()`

    输出:943,说明有943条数据。

    userRDD = rawUserData.map(lambda line: line.split("|"))这是因为每条数据都是由|分隔的。

    2.2 创建DataFrame

    1. 创建sqlContext
      sqlContext = SparkSession.builder.getOrCreate()

    2. 定义schema

    user_Rows = userRDD.map(lambda p:
        Row(
            userid = int(p[0]),
            age = int(p[1]),
            gender = p[2],
            occupation = p[3],
            zipcode = p[4]
        )
    )
    user_Rows.take(5)
    

    out:[Row(age=24, gender=u'M', occupation=u'technician', userid=1, zipcode=u'85711'),]

    1. 创建DataFrame
      user_df = sqlContext.createDataFrame(user_Rows)
      user_df.printSchema() #打印出这个dataframe的schema

    2. 为DataFrame创建别名
      df = user_df.alias("df")
      df.show(3)

    |age|gender|occupation|userid|zipcode|
    | 24| M|technician| 1| 85711|
    | 53| F| other| 2| 94043|
    | 23| M| writer| 3| 32067|

    2.3 开始使用SparkSQL

    1. 登录临时表
      user_df.registerTempTable("user_table")
    2. 查看项数
      sqlContext.sql("select count(*)counts from user_table").show()
    3. 多行输入SQL语句
      sqlContext.sql("""sql语句""")
    4. 查看数据
      sqlContext.sql("select * from user_table").show()
    5. 使用limit
      不加limit会在hadoop集群上进行map-reduce计算,如果只要看前几行会运行很久。
      sqlContext.sql("select * from user_table limit 3").show()

    3 select显示部分字段

    3.1 使用RDD显示部分字段

    userRDDnew = userRDD.map(lambda x:(x[0],x[2]))
    userRDDnew.take(3)

    3.2 使用DataFrame显示部分字段

    user_df.select(user_df.userid,user_df.age).show()

    3.3 使用SparkSQL显示部分字段

    sqlContext.sql("select userid, age from user_table limit 5").show()

    4 增加计算字段

    4.1 使用RDD增加计算字段

    最后一个字段是计算出生年份
    userRDDnew = userRDD.map(lambda x:(x[0],x[3],x[2],x[1],2016-int(x[1])))
    userRDDnew.take(3)

    4.2 使用DataFrame增加计算字段

    user_df.select(user_df.userid,2016-user_df.age).show(5)
    增加计算字段并命名
    user_df.select(user_df.userid,(2016-user_df.age).alias("new column")).show()

    4.3 使用SparkSQL增加计算字段

    sqlContext.sql("select userid,2016-age new_column from user_table limit 5").show()

    5 筛选数据

    5.1 使用RDD筛选数据

    在RDD中使用 filter 方法筛选每一项数据,如筛选出所有的年龄为24,职业为technician的男性。
    userRDD.filter(lambda r:r[1]=='24' and r[2]=='M' and r[3]=='technician').take(3)

    5.2 使用DataFrame筛选数据

    1. 使用多个filter
      user_df.filter("occupation='technician'").filter("gender='M'").filter("age=24").show(3)
    2. 使用单个filter
      注意:使用 & 而不能用 and,使用 == 而不能用 = 。
      user_df.filter((user_df.occupation=='technician')&(user_df.age==24)&(user_df.gender=='M')).show(3)

    5.3 使用SparkSQL筛选数据

    sqlContext.sql("""select *
    from user_table
    where occupation='technician' and gender='M' and age=24""").show(3)

    6 按单个字段给数据排序

    6.1 使用RDD按单个字段排序

    使用takeOrdered(num,key=None)方法进行排序
    num :要显示的行数
    key :排序的字段
    userRDD.takeOrdered(5,key=lambda x:int(x[1])) #升序排列
    userRDD.takeOrdered(5,key=lambda x:-1*int(x[1])) #降序排列

    6.2 使用DataFrame按单个字段排序

    使用.orderBy("age")或者.orderBy(user_df.age),默认是升序。
    user_df.select("userid","occupation","gender","age").orderBy("age").show(3)
    使用.orderBy("age",ascending=0)降序排列
    user_df.select("userid","occupation","gender","age").orderBy("age",ascending=0).show(3)

    6.3 使用SparkSQL按单个字段排序

    使用SQL语句中的 order by 关键字加字段名指定要排序的字段,默认升序。
    sqlContext.sql("""select *
    from user_table
    order by age""").show(3)
    降序需要在SQL语句最后加上 DESC。
    sqlContext.sql("""select *
    from user_table
    order by age DESC""").show(3)

    7 按多个字段给数据排序

    7.1 使用RDD按多个字段排序

    相比于按单个字段排序,key 变为 lambda x:(-int(x[1]),x[2]),表示按x[1]降序排列,并以x[2]设置按性别升序排列
    userRDD.takeOrdered(5,key=lambda x:(-int(x[1]),x[2]))

    7.2 使用DataFrame按多个字段排序

    使用.orderBy(["age","gender"],ascending=[0,1]),表示按age和gender这两个字段排序,[0,1]分别表示age和gender按什么序排,0表示降序,1表示升序。
    user_df.orderBy(["age","gender"],ascending=[0,1]).show(5)

    7.3 使用SparkSQL按多个字段排序

    sqlContext.sql("""select userid,age,gender,occupation,zipcode
    from user_table
    order by age DESC,gender""").show(5)

    8 显示不重复数据

    8.1 使用RDD显示不重复数据

    1. 显示userRDD性别字段不重复的数据:
      userRDD.map(lambda x:x[2]).distinct().collect()
      .distinct()是筛选出不重复的数据,.collect()是转化为List。
    2. 显示性别+年龄不重复数据
      userRDD.map(lambda x:(x[1],x[2])).distinct().take(5)

    8.2 使用DataFrame显示不重复数据

    1. 显示性别字段不重复数据
      user_df.select("gender").distinct().show()
    2. 显示性别+年龄不重复数据
      user_df.select("gender","age").distinct().show(5)

    8.3 使用SparkSQL显示不重复数据

    1. 显示性别字段不重复数据
      sqlContext.sql("select distinct gender from user_table").show(5)
    2. 显示性别+年龄不重复数据
      sqlContext.sql("select distinct gender,age from user_table").show(5)

    9 分组统计数据

    9.1 使用RDD分组统计

    1. 按性别统计
      userRDD.map(lambda x:(x[2],1)).reduceByKey(lambda x,y:x+y).collect()
      .map(lambda x:(x[2],1)) 会将每一个人的性别 M 转化为一个元组('M',1);
      .reduceByKey(lambda x,y:x+y) 将数据集按照性别总和来进行统计。
    2. 按性别、职业来统计
      userRDD.map(lambda x:((x[2],x[3]),1)).reduceByKey(lambda x,y:x+y).collect()

    9.2 使用DataFrame分组统计

    1. 按性别统计
      user_df.select("gender").groupBy("gender").count().orderBy("gender").show(10)
    2. 按性别、职业来统计
      user_df.select("gender","occupation").groupBy("gender","occupation").count().orderBy("gender","occupation").show(10)
    3. 以crosstab按照性别、职业统计数据
      user_df.stat.crosstab("occupation","gender").show(10)

    9.3 使用SparkSQL分组统计

    1. 按性别统计
      sqlContext.sql("""
      select gender,count(*) counts
      from user_table group by gender""").show()
    2. 按性别、职业来统计
      sqlContext.sql("""
      select gender,occupation,count(*) counts
      from user_table group by gender,occupation""").show()

    10 join连接数据

    10.1 下载数据表

    在上面的=得到的表中,最后一列zipcode是地理位置标识,如果想知道是哪个州就需要下载ZipCode数据表,进行联接。
    wget http://federalgovernmentzipcodes.us/free-zipcode-database-Primary.csv

    10.2 上传到hdfs目录

    hdfs dfs -copyFromLocal free-zipcode-database-Primary.csv /input

    10.3 读取rawDataRDD

    rawDataWithHeader = sc.textFile("hdfs://master:9000/input/free-zipcode-database-Primary.csv")
    rawDataWithHeader.take(2)

    10.4 去除表头

    header = rawDataWithHeader.first()
    rawData = rawDataWithHeader.filter(lambda x:x!=header)
    rawData.first()

    10.5 删除 " 符号

    rData = rawData.map(lambda x:x.replace(""",""))
    rData.first()

    10.6 获取每个字段

    ZipRDD = rData.map(lambda x:x.split(","))
    ZipRDD.first()

    10.7 创建zip_tab

    • 创建ZipCode Row的Schema
      from pyspark.sql import Row
      zipcode_data = ZipRDD.map(lambda p:
      Row(
      zipcode=int(p[0]),
      zipcodeType=p[1],
      city=p[2],
      state=p[3])
      )
      zipcode_data.take(5)

    • 创建DataFrame
      zipcode_df=sqlContext.createDataFrame(zipcode_data)
      zipcode_df.printSchema()

    • 创建临时登录表
      zipcode_df.registerTempTable("zipcode_table")
      zipcode_df.show(10)

    10.8 用SparkSQL联接两个表

    为了方便引用,在下面的代码中给这两个表创建了别名。
    将两个表联接并选出纽约州的用户数据。

    sqlContext.sql("""
    select u.*,z.city,z.state
    from user_table u
    left join zipcode_table z
    on u.zipcode = z.zipcode
    where z.state='NY'
    """).show(10)

    相关文章

      网友评论

        本文标题:SparkSQL、RDD和DataFrame基本操作

        本文链接:https://www.haomeiwen.com/subject/ajasvqtx.html