Spark ML

作者: 枫隐_5f5f | 来源:发表于2019-04-27 10:33 被阅读0次

    MLLIB 包是基于RDD操作
    ML包是基于DataFrame操作的

    ML包主要分成三个部分:

    • 转换器
    • 评估器
    • 管道

    一 转换器

    将一个新列附加到dataframe上来转换数据
    当从转换器的抽象类派生时,每个新的转换器类需要实现 .transform 方法 该方法要求传递一个要被转换的DataFrame

    在spark.ml.feature 中提供了许多转换器

    • Binarizer 根据指定的阈值将连续变量转换为对应的二进制值
    • Bucketizer 与Binarizer类似,该方法根据阈值列表将连续变量转换成多项值
    • ChisqSelector 对于分类目标变量 允许选择预定义数量的特征 使用分为两步 fit(df) .transform()
    • CountVectorizer 标记文本
    • DCT 离散余弦变换取实数值向量 并返回相同长度的向量
    • ElementwiseProduct 该方法返回一个向量 其中的元素是传入该方法的向量和另一个传入参数scalingVec的向量的乘积
    • HashingTF 哈希转换器 输入为标记文本的列表 返回一个带有计数的有预定长度的向量
    • MaxAbsScaler 将数据调整到【-1,1】 范围内 中心不会移动
    • MinMaxScaler 将数据缩放到【0,1】 范围内
    • StandardScaler 标准化列 使其拥有零均值和等于1的标准差
    • NGram 输入为标记文本的列表 返回结果包含一系列n-gram 以两个词 三个词或更多词形成一个n-gram 例如输入【“good”,“morning”,“robin”,"williams"】 则会输出 【"good moring ", "moring robin", "robin williams"】
    • Normalizer 该方法使用p范数将数据缩放为单位范数 默认是L2
    • OneHotEncoder 独热编码
    • PCA 主成分进行数据降维
    • PolynomialExpansion 执行向量的多项式展开 例如输出【x,y,z】 输出则为 【x,xx,y,xy,yy,z,xz,yz,zz】
    • QuantileDiscretizer 与Bucketizer 方法类似 但不是传递分割参数 而是numBuckets 通过计算数据的近似分位数来决定分割点
    • RegexTokenizer 使用正则表达式的字符串分词器
    • RFormular R语言中的formular
    • SQLTransformer 可是转换SQL语句
    • StopWordsRemover 从标记文本中删除停用词
    • Tokenizer 将字符串转换成小写 按照空格分隔
    • VectorAssembler 将多个数字或向量列合并成一列向量
    • word2vec 将一个句子作为输入 并将其转换为{string ,vector} 格式的映射

    二 评估器

      评估器就是模型   对观测对象进行预测或分类
    

    分类模型

    • LogisticRegression 分类的基准模型
    • DecisionTreeClssifier 决策树 maxDepth minInstancePerNode 每个节点的最小数量 maxBins 连续变量被分割的bin的最大数量 而impurity 指定用于测量并计算来自分割的信息的度量
    • GBTClassifier 梯度提升决策树 支持二进制标签 连续和分类特征
    • RandomForestClassifier 支持二元标签和多项标签
    • NaiveBayes
    • MultilayerPerceptronClassifier 多层感知机分类器 输入和隐层都是sigmoid 输出层激活函数是softmax
    • OneVsRest 将多分类问题简化为二分类问题 多标签的分类问题可以转化为多个二元逻辑回归

    回归模型

    • AFTSurivalRegression 适合加速失效时间回归模型 假设其中一个特征的边际效应加速或减速了预期寿命(或过程失败) 适合有明确阶段的过程
    • DecisionTreeRegressor
    • GBTRegressor
    • GeneralizedLinearRegression 支持不同的误差分布 如gaussian binomial gamma poisson
    • IsotonicRegression 拟合一个形式自由 非递减的行列数据中 对于拟合有序的和递增的观测数据集是有用的
    • LinearRegression 线性回归
    • RandomForestRegressor 与DecisionTreeRegressor或GBTRegressor类似 RandomForestRegressor 适合连续的标签 而不是离散的标签

    聚类模型

    • BisectingKMeans 二分K均值算法 结合了K均值聚类算法和层次聚类算法 最初将所有点作为一个簇 然后将数据迭代的分解为k个簇
    • KMeans
    • GaussianMixture 高斯混合模型 使用具有未知参数的K个高斯分布来剖析数据集 使用期望最大化算法 通过最大似然函数找到高斯参数
    • LDA 自然语言处理中的主题生成

    三 管道

    管道是用来表示从转换到评估的过程
    demo实际使用

    from  pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from pyspark.ml.feature as ft
    spark = SparkSession.builder \
        .appName("testname") \
        .getOrCreate()
    
    # read data
    labels = [
        ('INFANT_ALIVE_AT_REPORT', IntegerType()),
        ('BIRTH_PLACE', StringType()),
        ('MOTHER_AGE_YEARS',IntegerType()),
        ('FATHER_COMBINED_AGE',IntegerType()),
        ('CIG_BEFORE',IntegerType()),
        ('CIG_1_TRI',IntegerType()),
        ('CIG_2_TRI',IntegerType()),
        ('CIG_3_TRI',IntegerType()),
        ('MOTHER_HEIGHT_IN',IntegerType()),
        ('MOTHER_PRE_WEIGHT',IntegerType()),
        ('MOTHER_DELIVERY_WEIGHT',IntegerType()),
        ('MOTHER_WEIGHT_GAIN',IntegerType()),
        ('DIABETES_PRE',IntegerType()),
        ('DIABETES_GEST',IntegerType()),
        ('HYP_TENS_PRE',IntegerType()),
        ('HYP_TENS_GEST',IntegerType()),
        ('PREV_BIRTH_PRETERM',StringType())
        ]
    
    schema = StructType([StructField(e[0],e[1],False) for e in labels])
    births = spark.read.csv("file:///home/njliu/prc/pyspark/05/births_train.csv.gz",header=True,schema=schema)
    
    
    # transformation
    # trans stringType into IntegerType
    births = births.withColumn('BIRTH_PLACE_INT',births["BIRTH_PLACE"].cast(IntegerType()))
    
    #Onehotencoder for BIRTH_PLACE_INT
    encoder = ft.OneHotEncoder(inputCol="BIRTH_PLACE_INT",outputCol="BIRHT_PLACE_VEC")
    
    #create a single col which merge all feas
    featuresCreator = ft.VectorAssembler(inputCols = [col[0] for col in labels[2:]] + [encoder.getOutputCol()],outputCols="features")
    
    
    #create a model
    import pyspark.ml.classification as cl
    logistic = cl.LogisticRegression(maxiter=10,regParam=0.01,labelCol="INFANT_ALIVE_AT_REPORT")
    
    #create a pipline
    from pyspark.ml import Pipeline
    pipeline = Pipeline(stages=[encoder,featuresCreator,logistic])
    
    #fit the model
    births_train,births_test = births.randomSplit([0.7,0.3],seed=666)
    
    model = pipeline.fit(births_train)
    test_model = model.transform(births_test)
    
    #evaluate the performance
    import pyspark.ml.evaluation as ev
    evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol="INFANT_ALIVE_AT_REPORT")
    print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderROC"}))
    print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderPR"}))
    
    #save the pipeline
    pipelinePath = "./lr_pipline"
    pipeline.write().overwrite().save(pipelinePath)
    
    #load the pipeline and predict
    loadedPipeline = Pipeline.load(pipelinePath)
    loadedPipeline.fit(births_train).transform(births_test)
    
    #save the trained model  use PiplineModel
    from pyspark.ml import PipelineModel
    modelPath = "./lr_PipelineModel"
    model.write().overwrite().save(modelPath)
    
    loadedPipelineModel = PipelineModel.load(modelPath)
    test_reloadedModel = loadedPipelineModel.transform(births_test)
    
    
    #parameter optimize
    #grid search
    import pyspark.ml.tuning as tune
    logistic = cl.LogisticRegression(labelCol = "INFANT_ALIVE_AT_REPORT")
    grid = tune.ParamGridBuilder() \
        .addGrid(logistic.maxIter,[2,10,50]) \
        .addGrid(logistic.regParam,[0.01,0.05,0.3]) \
        .build()
    
    evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol="probability",labelCol="INFANT_ALIVE_AT_REPORT")
    
    cv = tune.CrossValidator(estimator = logistic,estimatorParamMaps = grid, evaluator = evaluator)
    pipeline = Pipeline(stages=[encoder,featuresCreator])
    data_transformer = pipeline.fit(births_train)
    cvModel = cv.fit(data_transformer.transform(births_train))
    
    data_test = data_transformer.transform(births_test)
    results = cvModel.transform(data_test)
    
    print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
    print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))
    
    #extract the best model
    results = [([{key.name:paramValue} for key,paramValue in zip(params.keys(),params.values())],metric) for params,metric in zip(cvModel.getEstimatorParamMaps(),cvModel.avgMetrics)]
    sorted(results,key=lambda el:el[1],reverse=True)[0]
    
    
    #train-validation splitting   split datasets into train and test sets
    #select five features
    selector = ft.ChiSqSelector(numTopFeatures=5, featuresCol=featuresCreator.getOutputCol(),outputCol="selectedFeatures", labelCol = "INFANT_ALIVE_AT_REPORT")
    
    logistic = cl.LogisticRegression(labelCol="INFANT_ALIVE_AT_REPORT",featuresCol="selectedFeatures")
    pipeline = Pipeline(stages=[encoder,featureCreator,selector])
    data_transformer = pipeline.fit(births_train)
    tvs = tune.TrainValidationSplit(estimator=logistic,estimatorParamMaps=grid,evaluator=evaluator)
    
    tvsModel = tvs.fit(data_transformer.transform(births_train))
    data_test = data_transformer.transform(births_test)
    results = tvsModel.transform(data_test)
    
    print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
    print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))
    
    

    相关文章

      网友评论

          本文标题:Spark ML

          本文链接:https://www.haomeiwen.com/subject/oxvcnqtx.html