I. Getting Started
1. Create the working environment with a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("a name") \
    .config("config", "value") \
    .getOrCreate()
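As a purely illustrative sketch, the placeholders above might be filled in like this; the master URL, app name, and config key are assumptions, and spark.stop() releases the session when the work is done:
from pyspark.sql import SparkSession

# run locally on all cores; the config key/value shown is just one common example
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("quickstart") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# ... do work with spark ...
spark.stop()  # release resources when finished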
2. Create a DataFrame
df = spark.read.json("/path/to/json_file")
# display the DataFrame
df.show()
3. DataFrame operations
# print the schema of df
df.printSchema()
# select a single column
df.select("name").show()
# select multiple columns (including a computed expression)
df.select(df["name"], df["age"]+1).show()
# filter: keep only the rows that satisfy the condition
df.filter(df["age"] > 21).show()
#groupby + count
df.groupBy("name").count().show()
4. Run SQL queries to produce new DataFrames
# create a temporary view
df.createOrReplaceTempView("people")  # register df as a view named "people"
sqlDF = spark.sql("select * from people")
sqlDF.show()
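A temporary view is visible only in the SparkSession that created it. If it needs to be shared across sessions of the same application, a global temporary view can be used instead; a brief sketch (the view name is arbitrary):
# global temp views are tied to the application, not a single session,
# and live in the reserved database global_temp
df.createGlobalTempView("people_global")
spark.sql("select * from global_temp.people_global").show()
spark.newSession().sql("select * from global_temp.people_global").show()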
5. Convert an RDD into a DataFrame
(1) Infer the schema using reflection
from pyspark.sql import Row
# create an RDD
sc = spark.sparkContext
lines = sc.textFile("path/to/infile")
parts = lines.map(lambda x:x.split(","))
people = parts.map(lambda x:Row(name=x[0],age=int(x[1])))
# convert to a DataFrame and register it as a view
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# query
teens = spark.sql("select * from people where age >=13 and age <=19")
#teens.show()
teenNames = teens.rdd.map(lambda p:"Name:" + p.name).collect()
for name in teenNames:
    print(name)
(2) Define the schema programmatically
from pyspark.sql.types import *
sc = spark.sparkContext
lines = sc.textFile("path/to/infile")
parts = lines.map(lambda x:x.split(","))
people = parts.map(lambda x:(x[0],x[1].strip()))
schemastring = "name age"
fields = [StructField(field_name,StringType(),True) for field_name in schemastring.split(" ")]
schema = StructType(fields)
schemaPeople = spark.createDataFrame(people,schema)
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("select * from people")
results.show()
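In the snippet above every field is declared StringType. If the columns have different types, the schema can mix them, as in this hedged sketch (reusing the parts RDD from above; casting age to int is the key difference):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# same name,age input, but age carries a proper integer type
typed_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
typed_people = parts.map(lambda x: (x[0], int(x[1].strip())))
typed_df = spark.createDataFrame(typed_people, typed_schema)
typed_df.printSchema()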
II. Data Sources
1. Reading and writing various file formats
# Parquet files (the default data source)
df = spark.read.load("path")
df.select("name","colNanme").write.save("fileName.parquet")
#csv
df = spark.read.load("path",format="csv",sep=":",inferSchema="true",header="true")
#orc
df = spark.read.orc("path")
df.write.format("orc") \
.option("orc.bloom.filter.columns","favorite_color") \
.option("orc.dictionary.key.threshold","1.0") \
.save("users_with_option.orc")
# run SQL directly on a file
df = spark.sql("select * from parquet.`path`")
# save as a persistent table (e.g. on HDFS)
df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")
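Output can also be partitioned by a column when saving to a path; a short sketch in which the column name favorite_color and the output path are assumed examples:
# one sub-directory is created per distinct value of the partition column
df.write.partitionBy("favorite_color") \
    .format("parquet") \
    .save("namesPartByColor.parquet")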
(1) Parquet files
peopleDF = spark.read.json("path/to/json")
peopleDF.write.parquet("people.parquet")
parquetFile = spark.read.parquet("people.parquet")
parquetFile.createOrReplaceTempView("parquetFile")
teens = spark.sql("select name from parquetFile where age >=13 and age <=19")
teens.show()
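Parquet readers can also merge compatible schemas that evolved across files; a hedged sketch, assuming path/to/parquet_dirs holds several Parquet outputs with overlapping schemas:
# mergeSchema is off by default because merging is relatively expensive
merged = spark.read.option("mergeSchema", "true").parquet("path/to/parquet_dirs")
merged.printSchema()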
(2) JSON
peopleDF = spark.read.json("path")
peopleDF.createOrReplaceTempView("people")
teens = spark.sql("select * from people where age between 13 and 19")
teens.show()
jsonstring = ['{"name":"Yin","address":{"city":"columbus","state":"ohino"}}']
sc = spark.sparkContext
peopleRDD = sc.parallelize(jsonStrings)
otherpeople = spark.read.json(peopleRDD)
otherpeople.show()
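Writing JSON is symmetric to reading it; a minimal sketch (output path assumed):
# each output file contains newline-delimited JSON records
otherpeople.write.mode("overwrite").json("path/to/output_json")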
(3) Hive tables
from os.path import abspath
from pyspark.sql import SparkSession, Row

warehouse_location = abspath('spark-warehouse')
spark = SparkSession.builder \
.appName("a name") \
.config("spark.sql.warehouse.dir",warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
spark.sql("create table if not exists src (key int, value string) USING hive")
spark.sql("load data local inpath 'path/file' into table src")
spark.sql("select * from src").show()
sqlDF = spark.sql("select key,value from src where key < 10 order by value")
stringDs = sqlDF.rdd.map(lambda x:"key: %d, value:%s" % (x.key,x.value))
for record in stringDs.collect():
    print(record)
record = Row("key","value")
recordDF = spark.createDataFrame([record(i,"val_"+str(i)) for i in range(1,101)])
recordDF.createOrReplaceTempView("records")
spark.sql("select * from records r join src s on r.key = s.key").show()
III. Performance Tuning
1. Cache data in memory
spark.catalog.cacheTable("tablename")
dataFrame.cache()
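When the data is no longer needed in memory, the cache can be released again; a minimal sketch mirroring the two calls above:
# drop the in-memory copies
spark.catalog.uncacheTable("tablename")
dataFrame.unpersist()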
2. Configuration options
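A couple of commonly tuned Spark SQL settings can be changed at runtime through spark.conf; a hedged sketch in which the values are illustrative, not recommendations:
# number of partitions used for shuffles in joins and aggregations
spark.conf.set("spark.sql.shuffle.partitions", "200")
# tables smaller than this size (bytes) are broadcast to all executors in a join; -1 disables it
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))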
3. Broadcast joins
Used for table joins; the smaller table is usually the one that gets broadcast.
from pyspark.sql.functions import broadcast
broadcast(spark.table("src")).join(spark.table("records"),"key").show()
IV. Converting to and from pandas
import numpy as np
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled","true")
pdf = pd.DataFrame(np.random.rand(100,3))
df = spark.createDataFrame(pdf)
result_df = df.select("*").toPandas()
Pandas user-defined functions (UDFs)
import pandas as pd
from pyspark.sql.functions import col,pandas_udf
from pyspark.sql.types import LongType
def multiply_func(a, b):
    return a * b
multiply = pandas_udf(multiply_func,returnType=LongType())
x = pd.Series([1,2,3])
print (multiply_func(x,x))
# build a Spark DataFrame with a column named "x" so the UDF can be applied to it
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
df.select(multiply(col("x"), col("x"))).show()
Grouped Map
from pyspark.sql.functions import pandas_udf,PandasUDFType
df = spark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],("id","v"))
@pandas_udf("id long, v double",PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())
df.groupby("id").apply(substract_mean).show()
Grouped Aggregate
from pyspark.sql.functions import pandas_udf,PandasUDFType
from pyspark.sql import Window
df = spark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],("id","v"))
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()
df.groupby("id").agg(mean_udf(df["v"])).show()
w = Window.partitionBy("id") \
.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df.withColumn("mean_v",mean_udf(df["v"]).over(w)).show()