案例1

作者: 7125messi | 来源:发表于2018-01-12 09:43 被阅读17次
    # Set up the environment so that `pyspark` is importable from this script.
    import glob
    import os
    import sys

    # Point SPARK_HOME at the local Spark installation.
    os.environ['SPARK_HOME'] = "/opt/spark-2.1.1-bin-hadoop2.7/"

    # BUG FIX: the original appended ".../bin/pyspark", which is the shell
    # *launcher script*, not a Python package directory, so the append did
    # nothing for imports. The importable package lives in $SPARK_HOME/python,
    # with the py4j bridge bundled as a zip under python/lib.
    sys.path.append("/opt/spark-2.1.1-bin-hadoop2.7/python")
    sys.path.extend(glob.glob("/opt/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-*-src.zip"))
    
    
    
    # Initialize the SparkSession and grab its SparkContext.
    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    spark = SparkSession.builder.appName("create").getOrCreate()
    sc = spark.sparkContext


    import numpy as np
    import pandas as pd

    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.feature import VectorAssembler
    # NOTE: `sum` here shadows the builtin `sum`; all later aggregation in
    # this script goes through Spark column functions, so that is intended.
    # FIX: deduplicated the imports — `sum`, `col`, `udf` were imported twice
    # and `Row` was imported from both pyspark.sql and pyspark.sql.types.
    from pyspark.sql.functions import array, col, count, mean, sum, udf, when
    from pyspark.sql.types import DoubleType, IntegerType, StringType
    
    
    # Load the season-totals CSV: first row is a header, column types inferred.
    reader = spark.read.option('header','true').option('inferSchema','true')
    df = reader.csv('/home/ydzhao/Book/spark-nba-analytics-master/data/season_totals.csv')

    # First look at the data: sample rows, schema, summary statistics.
    df.show(6)
    df.printSchema()
    df.describe()

    # Top scorers, examined a few different ways.
    by_points = df.orderBy('pts', ascending=False)
    by_points.show(5)
    top_ten = by_points.limit(10)
    top_ten.show()
    top_ten.toPandas()
    top_ten.toPandas()[['yr','player','age','pts']]
    top_ten[['yr','player','age','pts']].show()
    
    
    # DataFrame API version: per-year league totals for minutes, field-goal
    # attempts and 3-point attempts, converted to per-36-minute rates.
    fga_py = (
        df.groupBy('yr')
          .agg({'mp' : 'sum', 'fg3a' : 'sum', 'fga' : 'sum'})
          .select(
              col('yr'),
              (36*col('sum(fga)')/col('sum(mp)')).alias('fga_pm'),
              (36*col('sum(fg3a)')/col('sum(mp)')).alias('fg3a_pm'),
          )
          .orderBy('yr', ascending=False)
    )
    fga_py.show(6)
    
    
    # Spark SQL version of the same per-year 3-point-attempt rate.
    df.createOrReplaceTempView('df')
    query = "SELECT yr,sum(fg3a)/sum(mp)*36 as fg3a_pm FROM df GROUP BY yr ORDER BY yr desc"
    fga_py = spark.sql(query)
    fga_py.show(6)
    
    
    ##############################################################   train the model
    # Pack the single predictor column ('yr') into the 'features' vector that
    # Spark ML estimators expect, and expose fg3a_pm under the default
    # 'label' column name.
    t = VectorAssembler(inputCols=['yr'], outputCol = 'features')
    # FIX: dropped the redundant .withColumn('yr', fga_py.yr) — transform()
    # already carries every input column through to its output.
    training = t.transform(fga_py).withColumn('label', fga_py.fg3a_pm)
    training.show(10)
    training.toPandas()

    # Ordinary least-squares fit of fg3a_pm ~ yr.
    # FIX: removed the bare `model` expression, a no-op outside a REPL.
    lr = LinearRegression(maxIter=10)
    model = lr.fit(training)
    
    ############## 1. Collect the training years and append future seasons
    # FIX: the original comment claimed "1979-80 and 2020-21 season", but the
    # extra seasons scored below are 2017 through 2021. Also removed the
    # `training_y` collect, which was never used afterwards.
    training_yrs = training.select('yr').rdd.map(lambda x: x[0]).collect()

    testing_yrs = [2017, 2018, 2019, 2020, 2021]
    all_yrs = training_yrs + testing_yrs

    ############## 2. Build the testing DataFrame (yr -> 'features' vector)
    test_rdd = sc.parallelize(all_yrs)
    row = Row('yr')
    testing = t.transform(test_rdd.map(row).toDF())
    testing.show()

    ############## 3. Score every year with the fitted linear-regression model
    df_results = model.transform(testing)
    df_results.show()
    
    
    
    # Release cluster resources when done. (spark.stop() also stops the
    # underlying SparkContext, so the explicit sc.stop() is belt-and-braces.)
    sc.stop()
    spark.stop()
    

    相关文章

      网友评论

        本文标题:案例1

        本文链接:https://www.haomeiwen.com/subject/gebxoxtx.html