PySpark beginner resources
Reply "pyspark" on the official WeChat account to get a PySpark resource bundle: Learning PySpark.pdf, PySpark_SQL_Cheat_Sheet_Python.pdf.
If you have a Python background and just need Spark to speed up computation in your engineering work, PySpark is the best tool for you. After stepping into a series of pits, I found that most PySpark pitfalls come from the many API differences between PySpark's DataFrame and the pandas DataFrame we are familiar with. Because the storage model is different (distributed vs. single machine), many operations also require us to update our mental model; even something as basic as viewing the first 10 rows of a DataFrame is expressed differently. Still, with a Python foundation, the learning curve is very friendly.
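For example, a minimal sketch of that "first 10 rows" difference, assuming a pandas DataFrame pdf and a Spark DataFrame df holding the same data (the names here are illustrative):

# pandas: head() returns the first 10 rows as a new DataFrame
pdf.head(10)

# PySpark: show() prints the first 10 rows and returns nothing
df.show(10)
# or pull them to the driver as a list of Row objects
df.limit(10).collect()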
In practice I have found that skimming the official API docs first is more efficient than only looking things up after running into problems, so here are some PySpark learning resources I found recently.
Finding PySpark function APIs
The pyspark.sql API
- DataFrame
- SparkSession
Some recommended blog posts
A complete guide to DataFrame operations in Spark SQL
Converting between pyspark.sql.DataFrame and pandas.DataFrame
A detailed comparison of DataFrames in Spark and pandas: an important reference for DataFrame operations
A walkthrough of the official Spark Python API docs (Chinese translation)
Data processing with PySpark: lots of code worth studying
Configuring Spark
This part covers resources worth knowing when you hit performance problems and need to tune the underlying Spark parameters.
spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")
# get all settings (PySpark equivalent of the Scala spark.conf.getAll)
conf_map = dict(spark.sparkContext.getConf().getAll())
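Note that options like executor memory generally have to be set before the executors start, i.e. when the SparkSession is created or on the spark-submit command line. A minimal sketch, assuming you build the session yourself (the app name and values are illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("tuning example")                       # hypothetical app name
         .config("spark.sql.shuffle.partitions", "6")
         .config("spark.executor.memory", "2g")
         .getOrCreate())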
Cheat sheet
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType

spark = SparkSession.builder.appName("python spark sql basic example").enableHiveSupport().getOrCreate()

# get a DataFrame from a Hive table
df = spark.sql("select * from ft_dev.ybr_b limit 10")
df.show()

# select columns and derive a new one
df.select(df['jdpin'], df['user_aging'] / 12).show()

# cast a column to float, then rename it
df = df.withColumn('user_aging', df['user_aging'].cast(FloatType()))
df = df.withColumnRenamed('user_aging', 'keling')

# There are very rich APIs for data processing, like filter, sort and so on.
# For more information, see https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html
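For instance, a quick sketch of the filter and sort calls mentioned in that comment, continuing with the renamed column from the snippet above (the threshold is illustrative):

# keep rows above a threshold and sort them in descending order
df.filter(df['keling'] > 1.0).orderBy(df['keling'].desc()).show()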
A complete example
Loading the data
# 'schema' and 'labels' (used below) are assumed to be defined earlier in the full Learning PySpark example
births = spark.read.csv('births_transformed.csv.gz', header=True, schema=schema)
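The snippet assumes a schema built from a list of (column name, type) pairs; a minimal sketch of what that setup looks like (only the two column names that appear in the code below are taken from the example, the rest are illustrative):

import pyspark.sql.types as typ

# (column name, type) pairs; labels[2:] are treated as the numeric feature columns
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),   # illustrative column
    # ... remaining columns
]
schema = typ.StructType([typ.StructField(name, dtype, True) for name, dtype in labels])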
Creating transformers
import pyspark.sql.types as typ
import pyspark.ml.feature as ft

# encode the birth place as an integer, then one-hot encode it
births = births.withColumn('BIRTH_PLACE_INT',
                           births['BIRTH_PLACE'].cast(typ.IntegerType()))
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
                           outputCol='BIRTH_PLACE_VEC')

# assemble all feature columns into a single vector column
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features')
Creating an estimator
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(maxIter=100, regParam=0.01,
                                 labelCol='INFANT_ALIVE_AT_REPORT')
Creating a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])
Fitting the model
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
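At this point test_model is the test set plus the columns appended by the pipeline; a quick way to eyeball the output (a sketch, the column names follow from the stages above):

# the logistic regression stage adds 'rawPrediction', 'probability' and 'prediction'
test_model.select('INFANT_ALIVE_AT_REPORT', 'probability', 'prediction').show(5)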
Evaluating the model
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',
                                             labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderPR'}))
Saving and loading the model
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
loadedPipeline = Pipeline.load(pipelinePath)
result = loadedPipeline.fit(births_train).transform(births_test)
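Saving the Pipeline only persists the unfitted stage definitions; the fitted model can be saved and reloaded the same way via PipelineModel (the path name here is illustrative):

from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'   # hypothetical path
model.write().overwrite().save(modelPath)

loadedModel = PipelineModel.load(modelPath)
test_reloaded = loadedModel.transform(births_test)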
Ways to run PySpark
Shell
Easy to explore, debug and test.
Spark-submit
spark-submit --master yarn --executor-memory 4g --num-executors 20 --executor-cores 4 etl.py
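A script submitted this way has to build its own SparkSession; a minimal sketch of what etl.py (the file name from the command above) might contain:

# etl.py: a minimal PySpark job skeleton
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('etl').enableHiveSupport().getOrCreate()
    df = spark.sql('select * from ft_dev.ybr_b limit 10')   # table name reused from the cheat sheet above
    df.show()
    spark.stop()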
Troubleshooting PySpark
- Calling toPandas in PySpark on even a slightly large Spark DataFrame throws an error (a common workaround is sketched after this list).
- java.lang.OutOfMemoryError in pyspark
- Stack Overflow answers on handling out-of-memory errors from toPandas
- Spark java.lang.OutOfMemoryError: Java heap space
- lit: wrapping a constant with $ makes Spark mistakenly treat it as a Column. In that case you need the lit function defined in org.apache.spark.sql.functions; lit(var) tells Spark that var is a constant (PySpark usage is sketched below).
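A minimal sketch of both points, assuming the DataFrame df from the cheat sheet above; the row limit and literal value are illustrative:

from pyspark.sql import functions as F

# toPandas pulls the whole DataFrame into driver memory; limit or sample first,
# or raise spark.driver.memory when launching the job
small_pdf = df.limit(10000).toPandas()

# lit marks a Python value as a literal Column so it can be used in expressions
df = df.withColumn('source', F.lit('ybr_b'))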
Checking Spark resources
yarn application -list | grep tangyaping