Getting Started with Spark SQL

Author: 枫隐_5f5f | Published 2019-04-20 23:11

    一 Getting Started

    The pyspark.sql module provides the functions used below; see its API documentation for the full list.

    1. Create the working session with SparkSession
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
            .appName("a name") \
            .config("config","value") \
            .getOrCreate()
    
    
    2. Create a DataFrame
    df = spark.read.json("/path/to/json_file")
    # display the DataFrame
    df.show()
    
    3. DataFrame operations
    # print the schema of df
    df.printSchema()
    
    # select a single column
    df.select("name").show()
    
    # select multiple columns / expressions
    df.select(df["name"], df["age"]+1).show()
    
    # filter: keep only the rows that satisfy the condition
    df.filter(df["age"] > 21).show()
    
    # groupBy + count
    df.groupBy("name").count().show()
    
    
    4. Run SQL queries over a temporary view
    df.createOrReplaceTempView("people")   # register the DataFrame as a temporary view named "people"
    sqlDF = spark.sql("select * from people")
    sqlDF.show()
    
    5. Convert an RDD into a DataFrame

    (1) Infer the schema using reflection

    from pyspark.sql import Row
    
    # create an RDD of Row objects
    sc = spark.sparkContext
    lines = sc.textFile("path/to/infile")
    parts = lines.map(lambda x:x.split(","))
    people = parts.map(lambda x:Row(name=x[0],age=int(x[1])))
    
    # convert the RDD to a DataFrame and register it as a view
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")
    
    # query with SQL
    teens = spark.sql("select * from people where age >=13 and age <=19")
    #teens.show()
    
    teenNames = teens.rdd.map(lambda p:"Name:" + p.name).collect()
    for name in teenNames:
        print (name)
    
    

    (2) Define the schema programmatically

    from pyspark.sql.types import *
    
    sc = spark.sparkContext
    lines = sc.textFile("path/to/infile")
    parts = lines.map(lambda x:x.split(","))
    people = parts.map(lambda x:(x[0],x[1].strip()))
    
    schemastring = "name age"
    fields = [StructField(field_name,StringType(),True) for field_name in schemastring.split(" ")]
    schema = StructType(fields)
    
    schemaPeople = spark.createDataFrame(people,schema)
    schemaPeople.createOrReplaceTempView("people")
    
    results = spark.sql("select * from people")
    results.show()
    
    

    二 Data Sources

    1. Reading and writing various file formats
    # Parquet (the default data source)
    df = spark.read.load("path")
    df.select("name","colName").write.save("fileName.parquet")
    
    # CSV
    df = spark.read.load("path",format="csv",sep=":",inferSchema="true",header="true")
    
    # ORC
    df = spark.read.orc("path")
    df.write.format("orc") \
        .option("orc.bloom.filter.columns","favorite_color") \
        .option("orc.dictionary.key.threshold","1.0") \
        .save("users_with_option.orc")
    
    # run SQL directly on a file
    df = spark.sql("select * from parquet.`path`")
    
    # save as a bucketed, sorted persistent table (stored on HDFS)
    df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")
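    When writing out files, the data can also be partitioned by a column; a minimal sketch, assuming df has a favorite_color column (illustrative, not from the original post):
    
    # write one subdirectory per distinct value of favorite_color
    df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")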
    

    (1) Parquet files

    peopleDF = spark.read.json("path/to/json")
    peopleDF.write.parquet("people.parquet")
    
    parquetFile = spark.read.parquet("people.parquet")
    parquetFile.createOrReplaceTempView("parquetFile")
    teens = spark.sql("select name from parquetFile where age >=13 and age <=19")
    teens.show()
    
    

    (2) JSON

    peopleDF = spark.read.json("path")
    peopleDF.createOrReplaceTempView("people")
    teens = spark.sql("select * from people where age between 13 and 19")
    teens.show()
    
    jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
    sc = spark.sparkContext
    peopleRDD = sc.parallelize(jsonStrings)
    otherpeople = spark.read.json(peopleRDD)
    otherpeople.show()
    

    (3) Hive Table

    from os.path import abspath
    from pyspark.sql import SparkSession,Row
    
    warehouse_location = abspath('spark-warehouse')
    spark = SparkSession.builder \
            .appName("a name") \
            .config("spark.sql.warehouse.dir",warehouse_location) \
            .enableHiveSupport() \
            .getOrCreate()
    
    spark.sql("create table if not exists src (key int, value string) USING hive")
    spark.sql("load data local inpath 'path/file' into table src")
    spark.sql("select * from src").show()
    sqlDF = spark.sql("select key,value from src where key < 10 order by value")
    
    
    stringDs = sqlDF.rdd.map(lambda x:"key: %d, value:%s" % (x.key,x.value))
    for record in stringDs.collect():
        print (record)
    
    
    
    record = Row("key","value")
    recordDF = spark.createDataFrame([record(i,"val_"+str(i)) for i in range(1,101)])
    recordDF.createOrReplaceTempView("records")
    
    spark.sql("select * from records r join src s on r.key = s.key").show()
    
    

    三 Performance Tuning

    1. Cache data in memory
      spark.catalog.cacheTable("tablename")
      dataFrame.cache()
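    To release that memory afterwards (a minimal sketch, using the same table / DataFrame names as above):
      # remove the cached table and DataFrame from memory when no longer needed
      spark.catalog.uncacheTable("tablename")
      dataFrame.unpersist()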
    
    2. Configuration options
    (The configuration table was an image in the original post and is not reproduced here.)
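    Such options can be set at runtime through spark.conf; a minimal sketch with two commonly tuned settings (the values shown are Spark's defaults, given only as illustration):
    
    # adjust common Spark SQL tuning options at runtime
    spark.conf.set("spark.sql.shuffle.partitions", "200")              # partitions used when shuffling for joins/aggregations
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)   # max table size in bytes to broadcast in a join; -1 disables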
    3. Broadcast joins

    Used for table joins; usually the smaller table is broadcast to the executors.

    from pyspark.sql.functions import broadcast
    broadcast(spark.table("src")).join(spark.table("records"),"key").show()
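    The same broadcast can also be requested with a SQL join hint; a minimal sketch, reusing the src and records views registered in the Hive example above:
    
    spark.sql("select /*+ BROADCAST(s) */ r.key, s.value from records r join src s on r.key = s.key").show()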
    
    

    四 Converting to and from pandas

    import numpy as np
    import pandas as pd
    
    spark.conf.set("spark.sql.execution.arrow.enabled","true")   # use Apache Arrow to speed up the conversion
    pdf = pd.DataFrame(np.random.rand(100,3))
    df = spark.createDataFrame(pdf)
    result_df = df.select("*").toPandas()
    
    

    Pandas UDFs (user-defined functions)

    import pandas as pd
    from pyspark.sql.functions import col,pandas_udf
    from pyspark.sql.types import LongType
    
    def multiply_func(a,b):
        return a * b
    
    multiply = pandas_udf(multiply_func,returnType=LongType())
    
    # run the function locally on plain pandas Series
    x = pd.Series([1,2,3])
    print (multiply_func(x,x))
    
    # run it as a vectorized Spark UDF (df needs a column named "x")
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    df.select(multiply(col("x"),col("x"))).show()
    
    

    Grouped Map

    from pyspark.sql.functions import pandas_udf,PandasUDFType
    
    df = spark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],("id","v"))
    
    @pandas_udf("id long, v double",PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas DataFrame containing all rows of one group
        v = pdf.v
        return pdf.assign(v = v - v.mean())
    
    df.groupby("id").apply(subtract_mean).show()
    
    

    Grouped Aggregate

    from pyspark.sql.functions import pandas_udf,PandasUDFType
    from pyspark.sql import Window
    
    df = spark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],("id","v"))
    @pandas_udf("double",PandasUDFType.GROUPED_AGG)
    def mean_udf(v):
        return v.mean()
    
    df.groupby("id").agg(mean_udf(df["v"])).show()
    
    w = Window.partitionBy("id") \
            .rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
    
    df.withColumn("mean_v",mean_udf(df["v"]).over(w)).show()
    
    
