
Getting Started with Spark SQL

Author: 枫隐_5f5f | Published 2019-04-20 23:11

I. Getting Started

Reference: the list of functions in the pyspark.sql module

1. Create a working environment with SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("a name") \
        .config("config","value") \
        .getOrCreate()

2. Create a DataFrame
df = spark.read.json("/path/to/json_file")
# display the DataFrame
df.show()  
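
A DataFrame can also be built directly from local Python data instead of a file; a minimal sketch (the rows and column names here are made up for illustration):

# build a DataFrame from an in-memory list of tuples (illustrative data)
localDF = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
localDF.show()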

3. DataFrame operations
# print the schema of df
df.printSchema()

# select and show a single column
df.select("name").show()

# select multiple columns, with an expression on one of them
df.select(df["name"], df["age"]+1).show()

# filter: keep only the rows that satisfy the condition
df.filter(df["age"] > 21).show()

# group by a column and count
df.groupBy("name").count().show()

4. Run SQL queries against a temporary view
# create a temporary view
df.createOrReplaceTempView("people")   # register df as a view named "people"
sqlDF = spark.sql("select * from people")
sqlDF.show()
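
Temporary views created this way are tied to the current SparkSession. If a view must be visible across sessions, a global temporary view can be used instead; a minimal sketch (the view name global_people is only an example):

df.createGlobalTempView("global_people")
# global temporary views live in the system database global_temp
spark.sql("select * from global_temp.global_people").show()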

5. Convert RDDs to DataFrames

(1) Infer the schema using reflection

from pyspark.sql import Row

# build an RDD from a text file
sc = spark.sparkContext
lines = sc.textFile("path/to/infile")
parts = lines.map(lambda x:x.split(","))
people = parts.map(lambda x:Row(name=x[0],age=int(x[1])))

# convert the RDD of Rows to a DataFrame and register it as a view
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# query with SQL
teens = spark.sql("select * from people where age >=13 and age <=19")
#teens.show()

teenNames = teens.rdd.map(lambda p:"Name:" + p.name).collect()
for name in teenNames:
    print (name)

(2) Specify the schema programmatically

from pyspark.sql.types import *

sc = spark.sparkContext
lines = sc.textFile("path/to/infile")
parts = lines.map(lambda x:x.split(","))
people = parts.map(lambda x:(x[0],x[1].strip()))

schemastring = "name age"
fields = [StructField(field_name,StringType(),True) for field_name in schemastring.split(" ")]
schema = StructType(fields)

schemaPeople = spark.createDataFrame(people,schema)
schemaPeople.createOrReplaceTempView("people")

results = spark.sql("select * from people")
results.show()


II. Data Sources

1. Reading and writing different file formats
# Parquet (the default data source format)
df = spark.read.load("path")
df.select("name","colName").write.save("fileName.parquet")

#csv
df = spark.read.load("path",format="csv",sep=":",inferSchema="true",header="true")

#orc
df = spark.read.orc("path")
df.write.format("orc") \
    .option("orc.bloom.filter.columns","favorite_color") \
    .option("orc.dictionary.key.threshold","1.0") \
    .save("users_with_option.orc")

# run SQL directly on a file
df = spark.sql("select * from parquet.`path`")

# save as a bucketed, sorted persistent table (stored in the warehouse, e.g. on HDFS)
df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")
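
File-based output can also be partitioned by a column, so that each distinct value is written to its own subdirectory; a minimal sketch (the column and output names are placeholders):

# partition the output by a column; each value becomes its own subdirectory
df.write.partitionBy("name").format("parquet").save("people_partitioned.parquet")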

(1) Parquet files

peopleDF = spark.read.json("path/to/json")
peopleDF.write.parquet("people.parquet")

parquetFile = spark.read.parquet("people.parquet")
parquetFile.createOrReplaceTempView("parquetFile")
teens = spark.sql("select name from parquetFile where age >=13 and age <=19")
teens.show()

(2) JSON

peopleDF = spark.read.json("path")
peopleDF.createOrReplaceTempView("people")
teens = spark.sql("select * from people where age between 13 and 19")
teens.show()

jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
sc = spark.sparkContext
peopleRDD = sc.parallelize(jsonStrings)
otherpeople = spark.read.json(peopleRDD)
otherpeople.show()
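
By default spark.read.json expects one complete JSON object per line. For pretty-printed files whose records span multiple lines, the multiLine option can be turned on; a minimal sketch with a placeholder path:

# read a JSON file whose records span multiple lines
multiLineDF = spark.read.option("multiLine", True).json("path/to/multiline_json")
multiLineDF.show()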

(3) Hive tables

from os.path import abspath
from pyspark.sql import SparkSession, Row

warehouse_location = abspath('spark-warehouse')
spark = SparkSession.builder \
        .appName("a name") \
        .config("spark.sql.warehouse.dir",warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

spark.sql("create table if not exists src (key int, value string) USING hive")
spark.sql("load data local inpath 'path/file' into table src")
spark.sql("select * from src").show()
sqlDF = spark.sql("select key,value from src where key < 10 order by value")


stringDs = sqlDF.rdd.map(lambda x:"key: %d, value:%s" % (x.key,x.value))
for record in stringDs.collect():
    print (record)



record = Row("key","value")
recordDF = spark.createDataFrame([record(i,"val_"+str(i)) for i in range(1,101)])
recordDF.createOrReplaceTempView("records")

spark.sql("select * from records r join src s on r.key = s.key").show()


III. Performance Tuning

1. Cache data in memory
  spark.catalog.cacheTable("tablename")
  dataFrame.cache()
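
The cached data can be released again once it is no longer needed; a minimal sketch:

  # release cached data when it is no longer needed
  spark.catalog.uncacheTable("tablename")
  dataFrame.unpersist()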

2. Configuration options

[Screenshot: table of Spark SQL configuration options]
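
Options like these can also be set at runtime through spark.conf; a minimal sketch using two commonly tuned settings (the values shown are only examples):

# tune the number of shuffle partitions and the broadcast-join size threshold
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))  # 10 MB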

3. Broadcast joins

Used when joining tables: the smaller table is generally the one that is broadcast.

from pyspark.sql.functions import broadcast
broadcast(spark.table("src")).join(spark.table("records"),"key").show()
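
The same effect can be expressed in plain SQL with a broadcast hint (available in Spark 2.2+); a minimal sketch equivalent to the call above:

# broadcast hint in SQL; equivalent to the functions.broadcast call above
spark.sql("select /*+ BROADCAST(s) */ * from records r join src s on r.key = s.key").show()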


IV. Converting to and from pandas

import numpy as np
import pandas as pd

# enable Arrow-based columnar data transfers between Spark and pandas
spark.conf.set("spark.sql.execution.arrow.enabled","true")
pdf = pd.DataFrame(np.random.rand(100,3))
df = spark.createDataFrame(pdf)        # pandas DataFrame -> Spark DataFrame
result_df = df.select("*").toPandas()  # Spark DataFrame -> pandas DataFrame

Pandas user-defined functions (UDFs)

import pandas as pd
from pyspark.sql.functions import col,pandas_udf
from pyspark.sql.types import LongType

def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# test the plain Python function with local pandas Series
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

# apply the vectorized UDF to a Spark DataFrame with a column named "x"
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
df.select(multiply(col("x"), col("x"))).show()

Grouped Map

from pyspark.sql.functions import pandas_udf,PandasUDFType

df = spark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],("id","v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas DataFrame holding all rows of one group
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

Grouped Aggregate

from pyspark.sql.functions import pandas_udf,PandasUDFType
from pyspark.sql import Window

df = spark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],("id","v"))
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

df.groupby("id").agg(mean_udf(df["v"])).show()

w = Window.partitionBy("id") \
        .rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

df.withColumn("mean_v",mean_udf(df["v"]).over(w)).show()
