Spark ML

作者: 枫隐_5f5f | 来源:发表于2019-04-27 10:33 被阅读0次

MLLIB 包是基于RDD操作
ML包是基于DataFrame操作的

ML包主要分成三个部分:

  • 转换器
  • 评估器
  • 管道

一 转换器

将一个新列附加到dataframe上来转换数据
当从转换器的抽象类派生时,每个新的转换器类需要实现 .transform 方法 该方法要求传递一个要被转换的DataFrame

在spark.ml.feature 中提供了许多转换器

  • Binarizer 根据指定的阈值将连续变量转换为对应的二进制值
  • Bucketizer 与Binarizer类似,该方法根据阈值列表将连续变量转换成多项值
  • ChisqSelector 对于分类目标变量 允许选择预定义数量的特征 使用分为两步 fit(df) .transform()
  • CountVectorizer 标记文本
  • DCT 离散余弦变换取实数值向量 并返回相同长度的向量
  • ElementwiseProduct 该方法返回一个向量 其中的元素是传入该方法的向量和另一个传入参数scalingVec的向量的乘积
  • HashingTF 哈希转换器 输入为标记文本的列表 返回一个带有计数的有预定长度的向量
  • MaxAbsScaler 将数据调整到【-1,1】 范围内 中心不会移动
  • MinMaxScaler 将数据缩放到【0,1】 范围内
  • StandardScaler 标准化列 使其拥有零均值和等于1的标准差
  • NGram 输入为标记文本的列表 返回结果包含一系列n-gram 以两个词 三个词或更多词形成一个n-gram 例如输入【“good”,“morning”,“robin”,"williams"】 则会输出 【"good moring ", "moring robin", "robin williams"】
  • Normalizer 该方法使用p范数将数据缩放为单位范数 默认是L2
  • OneHotEncoder 独热编码
  • PCA 主成分进行数据降维
  • PolynomialExpansion 执行向量的多项式展开 例如输出【x,y,z】 输出则为 【x,xx,y,xy,yy,z,xz,yz,zz】
  • QuantileDiscretizer 与Bucketizer 方法类似 但不是传递分割参数 而是numBuckets 通过计算数据的近似分位数来决定分割点
  • RegexTokenizer 使用正则表达式的字符串分词器
  • RFormular R语言中的formular
  • SQLTransformer 可是转换SQL语句
  • StopWordsRemover 从标记文本中删除停用词
  • Tokenizer 将字符串转换成小写 按照空格分隔
  • VectorAssembler 将多个数字或向量列合并成一列向量
  • word2vec 将一个句子作为输入 并将其转换为{string ,vector} 格式的映射

二 评估器

  评估器就是模型   对观测对象进行预测或分类

分类模型

  • LogisticRegression 分类的基准模型
  • DecisionTreeClssifier 决策树 maxDepth minInstancePerNode 每个节点的最小数量 maxBins 连续变量被分割的bin的最大数量 而impurity 指定用于测量并计算来自分割的信息的度量
  • GBTClassifier 梯度提升决策树 支持二进制标签 连续和分类特征
  • RandomForestClassifier 支持二元标签和多项标签
  • NaiveBayes
  • MultilayerPerceptronClassifier 多层感知机分类器 输入和隐层都是sigmoid 输出层激活函数是softmax
  • OneVsRest 将多分类问题简化为二分类问题 多标签的分类问题可以转化为多个二元逻辑回归

回归模型

  • AFTSurivalRegression 适合加速失效时间回归模型 假设其中一个特征的边际效应加速或减速了预期寿命(或过程失败) 适合有明确阶段的过程
  • DecisionTreeRegressor
  • GBTRegressor
  • GeneralizedLinearRegression 支持不同的误差分布 如gaussian binomial gamma poisson
  • IsotonicRegression 拟合一个形式自由 非递减的行列数据中 对于拟合有序的和递增的观测数据集是有用的
  • LinearRegression 线性回归
  • RandomForestRegressor 与DecisionTreeRegressor或GBTRegressor类似 RandomForestRegressor 适合连续的标签 而不是离散的标签

聚类模型

  • BisectingKMeans 二分K均值算法 结合了K均值聚类算法和层次聚类算法 最初将所有点作为一个簇 然后将数据迭代的分解为k个簇
  • KMeans
  • GaussianMixture 高斯混合模型 使用具有未知参数的K个高斯分布来剖析数据集 使用期望最大化算法 通过最大似然函数找到高斯参数
  • LDA 自然语言处理中的主题生成

三 管道

管道是用来表示从转换到评估的过程
demo实际使用

from  pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.ml.feature as ft
spark = SparkSession.builder \
    .appName("testname") \
    .getOrCreate()

# read data
labels = [
    ('INFANT_ALIVE_AT_REPORT', IntegerType()),
    ('BIRTH_PLACE', StringType()),
    ('MOTHER_AGE_YEARS',IntegerType()),
    ('FATHER_COMBINED_AGE',IntegerType()),
    ('CIG_BEFORE',IntegerType()),
    ('CIG_1_TRI',IntegerType()),
    ('CIG_2_TRI',IntegerType()),
    ('CIG_3_TRI',IntegerType()),
    ('MOTHER_HEIGHT_IN',IntegerType()),
    ('MOTHER_PRE_WEIGHT',IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT',IntegerType()),
    ('MOTHER_WEIGHT_GAIN',IntegerType()),
    ('DIABETES_PRE',IntegerType()),
    ('DIABETES_GEST',IntegerType()),
    ('HYP_TENS_PRE',IntegerType()),
    ('HYP_TENS_GEST',IntegerType()),
    ('PREV_BIRTH_PRETERM',StringType())
    ]

schema = StructType([StructField(e[0],e[1],False) for e in labels])
births = spark.read.csv("file:///home/njliu/prc/pyspark/05/births_train.csv.gz",header=True,schema=schema)


# transformation
# trans stringType into IntegerType
births = births.withColumn('BIRTH_PLACE_INT',births["BIRTH_PLACE"].cast(IntegerType()))

#Onehotencoder for BIRTH_PLACE_INT
encoder = ft.OneHotEncoder(inputCol="BIRTH_PLACE_INT",outputCol="BIRHT_PLACE_VEC")

#create a single col which merge all feas
featuresCreator = ft.VectorAssembler(inputCols = [col[0] for col in labels[2:]] + [encoder.getOutputCol()],outputCols="features")


#create a model
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxiter=10,regParam=0.01,labelCol="INFANT_ALIVE_AT_REPORT")

#create a pipline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder,featuresCreator,logistic])

#fit the model
births_train,births_test = births.randomSplit([0.7,0.3],seed=666)

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

#evaluate the performance
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol="INFANT_ALIVE_AT_REPORT")
print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderPR"}))

#save the pipeline
pipelinePath = "./lr_pipline"
pipeline.write().overwrite().save(pipelinePath)

#load the pipeline and predict
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(births_train).transform(births_test)

#save the trained model  use PiplineModel
from pyspark.ml import PipelineModel
modelPath = "./lr_PipelineModel"
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(births_test)


#parameter optimize
#grid search
import pyspark.ml.tuning as tune
logistic = cl.LogisticRegression(labelCol = "INFANT_ALIVE_AT_REPORT")
grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,[2,10,50]) \
    .addGrid(logistic.regParam,[0.01,0.05,0.3]) \
    .build()

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol="probability",labelCol="INFANT_ALIVE_AT_REPORT")

cv = tune.CrossValidator(estimator = logistic,estimatorParamMaps = grid, evaluator = evaluator)
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)
cvModel = cv.fit(data_transformer.transform(births_train))

data_test = data_transformer.transform(births_test)
results = cvModel.transform(data_test)

print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))

#extract the best model
results = [([{key.name:paramValue} for key,paramValue in zip(params.keys(),params.values())],metric) for params,metric in zip(cvModel.getEstimatorParamMaps(),cvModel.avgMetrics)]
sorted(results,key=lambda el:el[1],reverse=True)[0]


#train-validation splitting   split datasets into train and test sets
#select five features
selector = ft.ChiSqSelector(numTopFeatures=5, featuresCol=featuresCreator.getOutputCol(),outputCol="selectedFeatures", labelCol = "INFANT_ALIVE_AT_REPORT")

logistic = cl.LogisticRegression(labelCol="INFANT_ALIVE_AT_REPORT",featuresCol="selectedFeatures")
pipeline = Pipeline(stages=[encoder,featureCreator,selector])
data_transformer = pipeline.fit(births_train)
tvs = tune.TrainValidationSplit(estimator=logistic,estimatorParamMaps=grid,evaluator=evaluator)

tvsModel = tvs.fit(data_transformer.transform(births_train))
data_test = data_transformer.transform(births_test)
results = tvsModel.transform(data_test)

print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))

相关文章

网友评论

      本文标题:Spark ML

      本文链接:https://www.haomeiwen.com/subject/oxvcnqtx.html