pyspark整理

作者: 26f30aca5431 | 来源:发表于2018-08-26 14:15 被阅读106次

    pyspark入门资料

    公众号回复:pyspark (会有pyspark资料大礼包:Learning PySpark.pdf,PySpark_SQL_Cheat_Sheet_Python.pdf
    如果你有python基础,只是工程上需要spark提升计算效率,那么pyspark是你最好的工具。经过一系列踩坑,发现很多pyspark大多数的坑是pyspark中的DataFrame和我们熟悉的pandas中的DataFrame有很多api不同,由于存储方式不同(分布式VS单机),很多操作也需要更新我们的观念,最基本的查看DataFrame的前10行,表达也确实不同,不过有了python的基础,学习曲线会十分友好。

    工作中发现,先了解下官方api比自己遇到问题再去查效率要更高,这里给大家推荐一些最近找到的pyspark学习资源。
    查找pyspark的函数接口

    pyspark Search

    pyspark sql 的函数接口

    pyspark.sql module api

    • DataFrame
    • SparkSession

    一些优秀的博客推荐

    Spark-SQL之DataFrame操作大全
    pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换
    Spark与Pandas中DataFrame的详细对比----DataFrame操作的重要文档
    Spark Python API 官方文档中文版讲解
    pyspark之数据处理学习--很多代码值得学习

    设置spark

    这一部分是当性能上遇到问题,需要对底层spark参数进行配置时可以了解的资源。

    - spark.conf.set("spark.sql.shuffle.partitions", 6)
    spark.conf.set("spark.executor.memory", "2g")
    //get all settings
    val configMap:Map[String, String] = spark.conf.getAll()
    

    cheatsheet

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(“python spark sql basic example").enableHiveSupport().getOrCreate()
    
    #get dataframe from hive table
    Df = spark.sql(“select * from ft_dev.ybr_b limit 10”)
    Df.show()
    Df.select(df[‘jdpin’],df[‘user_aging’]/12).show()
    Df = df.withColumn(‘user_aging’,df[‘user_aging’].cast(FloatType())
    Df = df.withColumnRenamed(‘user_aging’,’keling’)
    #there are very rich apis for data process,like filter,sort and so on . For more information,see https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html
    
    image.png

    一个完整例子

    Load data
    births = spark.read.csv('births_transformed.csv.gz', header=True, 
    schema=schema) 
    
    Creating transformers
    import pyspark.ml.feature as ft 
    births = births .withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'] 
    .cast(typ.IntegerType())) 
    
    encoder=ft.OneHotEncoder( inputCol='BIRTH_PLACE_INT',outputCol='BIRTH_PLACE_VEC') 
    
    featuresCreator = ft.VectorAssembler( inputCols=[ col[0] for col 
    in labels[2:]] + [encoder.getOutputCol()], outputCol='features' )
    
    Creating an estimator
    import pyspark.ml.classification as cl 
    
    logistic = cl.LogisticRegression( maxIter=100, regParam=0.01, 
    labelCol='INFANT_ALIVE_AT_REPORT')
    
    Creating a pipeline
    from pyspark.ml import Pipeline 
    
    pipeline = Pipeline(stages=[encoder, featuresCreator, logistic ])  
    
    
    Fitting the model
    births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)
    
    model = pipeline.fit(births_train) 
    test_model = model.transform(births_test) 
    
    Evaluate the model
    import pyspark.ml.evaluation as ev 
    
    evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')  
    
    print(evaluator.evaluate(test_model, 
    {evaluator.metricName: 'areaUnderROC'})) 
    print(evaluator.evaluate(test_model, 
    {evaluator.metricName: 'areaUnderPR'})) 
    
    Saving and load  model
    pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline' 
    pipeline.write().overwrite().save(pipelinePath) 
    
    loadedPipeline = Pipeline.load(pipelinePath) 
    Result = loadedPipeline.fit(births_train).transform(births_test)
    

    运行方式
    Shell

    Easy to explore,debug and test

    Spark-submit

    spark-submit etl.py --executor-memory 4g --executor-num 20 --total-executor-cores 80 --master yarn

    pyspark问题解决

    • 在pyspark对稍微大一点的spatk.dataframe进行toPandas转化时就会报错。
    • java.lang.OutOfMemoryError in pyspark
    • stackoverflow 对toPandas内存溢出的处理方法
    • Spark java.lang.OutOfMemoryError: Java heap space
    • lit:因为采用 $ 来包裹一个常量,会让Spark错以为这是一个Column。这时,需要定义在org.apache.spark.sql.functions中的 lit 函数来帮助.即lit(var)是告诉spark传入的var是常量。

    查看spark资源

    yarn applications list | grep tangyaping
    

    相关文章

      网友评论

        本文标题:pyspark整理

        本文链接:https://www.haomeiwen.com/subject/zwroiftx.html