美文网首页dibo大数据
pyspark与机器学习

pyspark与机器学习

作者: 巴拉巴拉_9515 | 来源:发表于2019-12-12 14:05 被阅读0次

    借助于spark的分布式特性,机器学习与spark的结合可以解决数据规模大、复杂运算时间久的问题。
    spark提供MLlib组件用于满足机器学习的需求。
    本文将从机器学习数据读取、数据操作、特征处理、模型训练、结果评估、模型保存六个方面展开。

    一、基础操作

    1、sparksession

    (1) 创建SparkSession

    SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能[1]
    任何Spark程序的第一步都是先创建SparkSession。

    From pyspark.sql import SparkSession
    spark=SparkSession.builder.appName('data_processing').getOrCreate()
    
    (2) SparkSession与SparkContext[1]

    在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context(例如对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext)。
    但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext

    至于图中的RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

    这些概念在编码过程中偶尔会出现,对于spark分布式运行的详细架构本文就不深入了,只需对SparkSession、SparkContext有个大致的了解。

    2、数据加载

    (1) 本地数据读取

    使用spark方式读取本地csv。

    df = spark.read.csv('XXX.csv',inferSchema=True,header=True)
    

    使用pandas方式读取本地csv,转换pandas dataframe为spark dataframe。

    import pandas as pd
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    pandas_df = pd.read_csv('/home/logsaas/pyspark/lalafile/movie_ratings_df.csv')
    sc = SparkContext()
    sqlContest = SQLContext(sc)
    df = sqlContest.createDataFrame(pandas_df)
    
    (2) 数据库读取
    # hive数据库读取
    spark.sql('select * from XX')
    
    (3) Spark_DataFrame与Pandas_DataFrame区别[2]

    Spark中DataFrame与Pandas中DataFrame的主要区别是工作方式的不同。

    区别 pandas spark
    工作方式 单机模式,没有并行机制,不支持Hadoop,处理大量数据有瓶颈 分布式并行计算框架,所有的数据和操作自动并行分布在各个集群结点上,支持Hadoop,能处理大量数据
    DataFrame可变性 可变 Spark中RDDs是不可变的,因此DataFrame也是不可变的
    相互转换 从spark_df转换:pandas_df = spark_df.toPandas() 从pandas_df转换:spark_df = SQLContext.createDataFrame(pandas_df)

    3、数据操作

    由于spark dataframe和python dataframe的区别,随之而来的操作差别也比较大。

    操作 pandas spark
    取列信息 df[“name”] df.select(“name”)
    取满足条件的信息 df[df[‘age’]>21] df.filter(df[‘age’]>21)
    groupby df.groupby(“A”).avg(“B”) from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show()
    data合并 append/concat等 df.join()
    去重统计 len(set(df['title'])) df.select('title').distinct().count()
    新增一列 df['newcol']=lists df.withColumn('newcol',lists)
    其他 ··· ···

    常见的spark dataframe操作如下:

    # **基础描述**
    spark_df.count()  # 行数
    spark_df.columns  # 列名称
    spark_df.printSchema()  # 结构及属性显示
    spark_df.show(5,False)  # truncate=False表示不压缩显示
    spark_df.describe().show()  # 均值/最值等描述
    
    # **dataframe操作**
    # 取'age','mobile'两列
    spark_df.select('age','mobile').show(5) 
    # 新增一列:age_after_10_yrs
    spark_df.withColumn("age_after_10_yrs",(spark_df["age"]+10)).show(10,False)
    # 新建一列age_double,将age转换为double属性
    spark_df.withColumn('age_double',spark_df['age'].cast(DoubleType())).show(10,False)
    # 筛选mobile==Vivo的信息
    spark_df.filter(spark_df['mobile']=='Vivo').show()
    spark_df.filter(spark_df['mobile']=='Vivo').select('age','ratings','mobile').show()
    spark_df.filter(spark_df['mobile']=='Vivo').filter(spark_df['experience'] >10).show()
    # 去重统计
    spark_df.select('mobile').distinct().show() 
    # 行去重
    spark_df=spark_df.dropDuplicates()
    # 删除列
    df_new=spark_df.drop('mobile')
    # groupby操作
    spark_df.groupBy('mobile').count().show(5,False)
    spark_df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
    spark_df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)  # 根据mobile分区,计算experience的sum
    # udf 自建sql函数
    from pyspark.sql.functions import udf
    def price_range(brand):
        if brand in ['Samsung','Apple']:
            return 'High Price'
        elif brand =='MI':
            return 'Mid Price'
        else:
            return 'Low Price'
    brand_udf=udf(price_range,StringType())  # create udf using python function # 输出为string格式
    # 新建一列price_range
    spark_df.withColumn('price_range',brand_udf(spark_df['mobile'])).show(10,False)
    # 使用lamba创建udf
    age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())  # using lambda function
    # 新建一列age_group
    spark_df.withColumn("age_group", age_udf(spark_df.age)).show(10,False)
    

    在spark中很少会用for循环去处理一个个特征,一般使用函数/自建UDF,批量处理掉了。

    比如计算Review列每个数据的长度。
    python模式
    review_length = []
    for info in text_df['Review']:
    ···· review_length.apend(length(info))
    text_df['length'] = review_length
    pyspark模式
    from pyspark.sql.functions import length text_df=text_df.withColumn('length',length(text_df['Review']))

    3、特征处理

    以下为pyspark.ml.feature提供的特征处理功能,满足了大部分机器学习的特征处理需求。

    标准化与归一化

    函数 备注
    NGram 正则标准化,不需要fit 直接transform
    Normalizer 归一化函数,使它的范数或者数值范围在一定的范围内
    MaxAbsScaler 归一化函数,将列标准化到[0,1]之间,每一个值都除以本列的绝对值最大的数,先fit然后 transform()
    MinMaxScaler 最大最小归一化,先fit然后 transform()
    StandardScaler 对列进行标准化,先fit再transform

    分箱处理

    函数 备注
    Binarizer 将数值型特征的二值化。将数据框中的某一列按照阈值划分为只包含0,1的列
    Bucketizer 将连续特征按照splits值进行分箱
    QuantileDiscretizer 将连续列进行分箱操作,numBuckets 表示分箱数目

    文本特征处理

    函数 备注
    CountVectorizer 只考虑词汇在文本中出现的频率
    HashingTF 自然语言处理的场景中,hashingTF使用的比较多
    IDF TF-IDF 作为特征提取
    StringIndexer 针对单个类别型特征进行转换。把字符串的列按照出现频率进行排序,出现次数最高的对应的Index为0,依次下排
    IndexToString 索引转字符串
    Tokenizer 将字符串列转换成小写并按空格切分
    RegexTokenizer 基于正则的方式进行文档切分成单词组
    Word2Vec 将words转换成一个vectorSize维的向量
    OneHotEncoderEstimator 独热编码相关
    RFormula 文本类特征处理,先 StringIndexer 再 OneHotEncoderEstimator

    特征操作

    函数 备注
    Imputer 缺失值填补,默认使用均值或中值(“median”)填补,要计算均值所以要先 fit(),然后再transfrom()
    StopWordsRemover 英文停用词移除
    SQLTransformer 使用SQL语句创建新的列,直接transform()
    BucketedRandomProjectionLSH 基于欧几里德距离的空间度量
    MinHashLSH 基于Jaccard距离的空间度量
    ElementwiseProduct 计算inputCol与scaling内积,不需要训练,直接transform
    PolynomialExpansion 特征变换,将特征拓展比如[x,y],如果degree=2,则拓展成[x,xy,y,xx,y*y],所以直接transform即可
    OneHotEncoder 独热编码,特征用一个二进制数字来表示。例如[0, 0, 3]处理后为[ 1., 0., 0., 1., 0., 0., 0., 0., 1.]
    ChiSqSelector 依据卡方检验,计算类别特征与分类标签的关联性。该函数只有先训练才能知道挑选哪些特征值,所以要先fit,应用的时候再transform

    vector 组合

    函数 备注
    VectorIndexer 数据集中的类别特征转换。可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引
    VectorSizeHint 允许用户显式指定列的向量大小
    VectorAssembler 用于将多个列合并为一个向量列,直接transform即可,经常用的
    VectorSlicer 通过对这些索引的值进行筛选得到新的向量集

    压缩降维

    函数 备注
    PCA 对特征进行PCA降维,先fit然后 transform()
    DCT 离散余弦变换(Discrete Cosine Transform),用于将数据或图像的压缩
    FeatureHasher 特征哈希,相当于一种降维技巧
    # 正则标准化,不需要fit 直接transform
    ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
    ngram.transform(df)
    
    # 归一化函数,使它的范数或者数值范围在一定的范围内
    normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
    normalizer.transform(df).head().features
    
    # 归一化函数,将列标准化到[0,1]之间
    maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
    model = maScaler.fit(df)
    model.transform(df).show()
    
    # 最大最小归一化,先fit然后 transform()
    mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
    model = mmScaler.fit(df)
    model.transform(df)
    
    # 对列进行标准化,先fit再transform
    standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
    model = standardScaler.fit(df)
    model.transform(df)
    
    # 按阈值1划分values,结果输出到features,结果为0/1
    binarizer = Binarizer(threshold=1.0, inputCol="values", outputCol="features")
    new_df = binarizer.transform(df)
    
    # 将连续特征按照splits值进行分箱
    bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],inputCol="values", outputCol="buckets")
    bucketed = bucketizer.setHandleInvalid("keep").transform(df)
    
    # 将连续列进行分箱操作,numBuckets 表示分箱数目
    qds = QuantileDiscretizer(numBuckets=2,inputCol="values", outputCol="buckets", relativeError=0.01, handleInvalid="error")
    bucketizer = qds.fit(df)
    qds.setHandleInvalid("keep").fit(df).transform(df)
    
    # 只考虑词汇在文本中出现的频率
    cv = CountVectorizer(inputCol="raw", outputCol="vectors")
    model = cv.fit(df)  # model.vocabulary
    new_df = model.transform(df)
    
    # 词频统计
    hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
    hashingTF.transform(df)
    
    # 文本tf-idf计算
    idf = IDF(minDocFreq=3, inputCol="tf", outputCol="idf")
    model = idf.fit(df)  # model.idf
    model.transform(df)
    
    # 针对单个类别型特征进行转换,把字符串的列按照出现频率进行排序
    stringIndexer = StringIndexer(inputCol="label",outputCol="indexed", handleInvalid="error",stringOrderType="frequencyDesc")
    model = stringIndexer.fit(stringIndDf)
    td = model.transform(stringIndDf)
    
    # 将字符串列转换成小写并按空格切分
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    tokenizer.transform(df)
    
    # 基于正则的方式进行文档切分成单词组
    reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
    reTokenizer.transform(df)
    
    # Word2Vec
    word2Vec = Word2Vec(vectorSize=5, seed=42,inputCol="sentence", outputCol="model")
    model = word2Vec.fit(doc)
    model.transform(doc)
    
    # OneHotEncoderEstimator
    ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
    model = ohe.fit(df)
    model.transform(df)
    
    # RFormula 文本类特征处理
    rf = RFormula(formula="y ~ x + s")
    model = rf.fit(df)
    model.transform(df)
    
    # 缺失值填补,要计算均值所以要先 fit(),然后再transfrom()
    imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
    model = imputer.fit(df)
    model.transform(df).show()
    
    # 移除停顿词
    remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
    remover.transform(df)
    
    # 使用SQL语句创建新的列
    sqlTrans = SQLTransformer(statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
    sqlTrans.transform(df)
    
    # 欧几里德距离的空间度量
    brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
    model = brp.fit(df)
    new_df = model.transform(df)
    
    # 基于Jaccard距离的空间度量
    mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
    model = mh.fit(df)
    model.transform(df)
    
    # 计算inputCol与scaling内积,不需要训练,直接transform
    ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]), inputCol="values", outputCol="eprod")
    new_df = ep.transform(df)
    
    # PolynomialExpansion特征变换
    px = PolynomialExpansion(degree=2, inputCol="dense", outputCol="expanded")
    px.transform(df)
    
    # 独热编码
    encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
    encoder.transform(td)
    
    # 依据卡方检验特征处理
    selector = ChiSqSelector(numTopFeatures=1,outputCol="selectedFeatures")
    model = selector.fit(df)
    new_df = model.transform(df)
    
    # 数据集中的类别特征转换
    indexer = VectorIndexer(maxCategories=2, inputCol="a", outputCol="indexed")
    model = indexer.fit(df)
    model.transform(df)
    
    # 允许用户显式指定列的向量大小
    sizeHint = VectorSizeHint(inputCol="vector", size=3, handleInvalid="skip")  # 先指定大小
    vecAssembler = VectorAssembler(inputCols=["vector", "float"], outputCol="assembled")  # 整合成一列
    pipeline = Pipeline(stages=[sizeHint, vecAssembler])
    pipelineModel = pipeline.fit(df)  # 功能模块合并
    pipelineModel.transform(df)
    
    # 用于将多个列合并为一个向量列,直接transform即可,经常用的
    vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
    vecAssembler.transform(df)
    
    # 通过对这些索引的值进行筛选得到新的向量集
    vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
    vs.transform(df).head().sliced
    
    # PCA
    pca = PCA(k=2, inputCol="features", outputCol="pca_features")
    model = pca.fit(df)
    model.transform(df)
    
    # 离散余弦变换压缩数据
    dct = DCT(inverse=False, inputCol="vec", outputCol="resultVec")
    df2 = dct.transform(df1)
    
    # 特征哈希,相当于一种降维技巧
    hasher = FeatureHasher(inputCols=cols, outputCol="features")
    hasher.transform(df).head().features
    

    4、模型训练

    分类模块

    model 备注
    LinearSVC 线性分类支持向量机
    LogisticRegression 逻辑回归
    DecisionTreeClassifier 决策树分类
    GBTClassifier GBDT梯度提升决策树
    RandomForestClassifier 随机森林
    NaiveBayes 朴素贝叶斯
    MultilayerPerceptronClassifier 多层感知机分类器
    OneVsRest 将多分类问题简化为二分类问题
    # 线性分类支持向量机 
    svm = LinearSVC(maxIter=5, regParam=0.01)  # maxIter最大迭代次数5次,regParam正则化参数
    model = svm.fit(df)
    # 逻辑回归
    blor = LogisticRegression(regParam=0.01, weightCol="weight") 
    blorModel = blor.fit(bdf)
    # 决策树分类
    dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")  # 限定决策树的最大可能深度为2
    model = dt.fit(td)
    # GBDT梯度提升决策树
    gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
    model = gbt.fit(td)
    # 随机森林
    rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
    model = rf.fit(td)
    # 朴素贝叶斯
    nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
    model = nb.fit(df)
    # 多层感知机
    mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[2, 2, 2], blockSize=1, seed=123)
    model = mlp.fit(df)
    # OneVsRest
    lr = LogisticRegression(regParam=0.01)
    ovr = OneVsRest(classifier=lr)
    model = ovr.fit(df)
    

    聚类模块

    model 备注
    BisectingKMeans 二分类KMeans
    KMeans k均值聚类算法
    GaussianMixture 高斯混合模型
    LDA LDA主题聚类
    PowerIterationClustering 幂迭代聚类
    # 二分类KMeans
    bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
    model = bkm.fit(df)
    transformed = model.transform(df)
    # k均值聚类算法
    kmeans = KMeans(k=2, seed=1)
    model = kmeans.fit(df)
    transformed = model.transform(df)
    # 高斯混合模型
    gm = GaussianMixture(k=3, tol=0.0001,maxIter=10, seed=10)
    model = gm.fit(df)
    transformed = model.transform(df)
    # LDA主题聚类
    lda = LDA(k=2, seed=1, optimizer="em")
    model = lda.fit(df)
    # 幂迭代聚类
    pic = PowerIterationClustering(k=2, maxIter=40,weightCol="weight")
    assignments = pic.assignClusters(df)
    

    回归模块

    model 备注
    AFTSurvivalRegression 生存分析的对数线性模型
    DecisionTreeRegressor 决策树回归模型
    GBTRegressor 全称梯度下降树回归模型
    IsotonicRegression 保序回归
    # 生存分析的对数线性模型
    aftsr = AFTSurvivalRegression()
    model = aftsr.fit(df)
    # 决策树回归模型
    dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
    model = dt.fit(df)
    # 全称梯度下降树回归模型
    gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
    model = gbt.fit(df)
    # 保序回归
    ir = IsotonicRegression()
    model = ir.fit(df)
    

    推荐模块

    model 备注
    ALS (Alternatingleastsquares。交替最小二乘法
    # 推荐系统
    als = ALS(rank=10, maxIter=5, seed=0)
    model = als.fit(df)
    

    5、结果评估

    model 备注
    BinaryClassificationEvaluator 二分类评估
    RegressionEvaluator 回归评估
    MulticlassClassificationEvaluator 多分类评估
    ClusteringEvaluator 聚类评估
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw")
    evaluator.evaluate(dataset)
    
    evaluator = RegressionEvaluator(predictionCol="raw")
    evaluator.evaluate(dataset)
    
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    evaluator.evaluate(dataset)
    
    evaluator = ClusteringEvaluator(predictionCol="prediction")
    evaluator.evaluate(dataset)
    

    6、模型保存

    直接使用save保存模型,使用load加载训练结果。

    # rf_classifier为RandomForestClassificationModel训练完的模型
    rf_classifier.save("xxx/RF_model")
    
    # 模型调用
    rf=RandomForestClassificationModel.load("xxx/RF_model")
    model_preditions=rf.transform(test_df)
    

    二、运行案例

    对上面的各个过程的方法进行组装,以随机森林代码为例:

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import VectorAssembler  # 特征处理
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.classification import RandomForestClassificationModel
    
    
    spark=SparkSession.builder.appName('random_forest').getOrCreate()
    
    
    # 数据读入
    pandas_df = pd.read_csv('xxx/affairs.csv')
    sc = SparkContext()  # 初始化;SparkContext则是客户端的核心
    sqlContest = SQLContext(sc)  # SQLContext是Spark SQL进行结构化数据处理的入口
    df = sqlContest.createDataFrame(pandas_df)  # pandas df 转换为 spark df 格式
    
    
    # 数据属性
    print((df.count(),len(df.columns)))
    df.printSchema()
    df.describe().select('summary','rate_marriage','age','yrs_married','children','religious').show()
    df.groupBy('affairs').count().show()
    df.groupBy('rate_marriage').count().show()
    df.groupBy('rate_marriage','affairs').count().orderBy('rate_marriage','affairs','count',ascending=True).show()
    df.groupBy('religious','affairs').count().orderBy('religious','affairs','count',ascending=True).show()
    df.groupBy('children','affairs').count().orderBy('children','affairs','count',ascending=True).show()
    df.groupBy('affairs').mean().show()
    
    
    # 特征处理
    df_assembler = VectorAssembler(inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'], outputCol="features")  # 把特征组装成一个list
    df = df_assembler.transform(df)
    df.printSchema()
    df.show(5,truncate=False)
    
    
    # 数据集划分
    model_df=df.select(['features','affairs'])
    train_df,test_df=model_df.randomSplit([0.75,0.25])
    train_df.count()
    train_df.groupBy('affairs').count().show()
    test_df.groupBy('affairs').count().show()
    
    
    # 模型构建
    rf_classifier=RandomForestClassifier(labelCol='affairs',numTrees=50).fit(train_df)
    rf_predictions=rf_classifier.transform(test_df)
    rf_predictions.show()
    # 结果查看
    rf_classifier.featureImportances  # 各个特征的权重
    
    
    # 模型效果
    rf_predictions.groupBy('prediction').count().show()
    rf_predictions.select(['probability','affairs','prediction']).show(10,False)
    # 多分类模型——准确率
    rf_accuracy=MulticlassClassificationEvaluator(labelCol='affairs',metricName='accuracy').evaluate(rf_predictions)
    print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))
    print(rf_accuracy)
    # 多分类模型——精确率
    rf_precision=MulticlassClassificationEvaluator(labelCol='affairs',metricName='weightedPrecision').evaluate(rf_predictions)
    print('The precision rate on test data is {0:.0%}'.format(rf_precision))
    # AUC
    rf_auc=BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)
    print(rf_auc)
    
    
    # 模型保存
    rf_classifier.save("/home/logsaas/pyspark/lalafile/RF_model")
    
    # 模型调用
    rf=RandomForestClassificationModel.load("/home/logsaas/pyspark/lalafile/RF_model")
    model_preditions=rf.transform(test_df)
    model_preditions.show()
    

    参考资料

    [1] SparkSession与SparkContext:https://blog.csdn.net/qq_35495339/article/details/98119422
    [2] Spark中DataFrame与Pandas中DataFrame的区别:https://blog.csdn.net/u013129944/article/details/80019546
    [3] pyspark 官网解释:http://spark.apache.org/docs/latest/api/python/pyspark.ml.html
    [4] pyspark.ml.feature函数中文简介:https://blog.csdn.net/yw_vine/article/details/80117759
    [5] Spark特征工程:https://blog.csdn.net/u012050154/article/details/60766387
    [6] http://dblab.xmu.edu.cn/blog/1709-2/ 子雨
    [7] Machine Learning with PySpark by Pramod Singh
    [8] https://github.com/Apress/machine-learning-with-pyspark

    相关文章

      网友评论

        本文标题:pyspark与机器学习

        本文链接:https://www.haomeiwen.com/subject/squggctx.html