MLLIB 包是基于RDD操作
ML包是基于DataFrame操作的
ML包主要分成三个部分:
- 转换器
- 评估器
- 管道
一 转换器
将一个新列附加到dataframe上来转换数据
当从转换器的抽象类派生时,每个新的转换器类需要实现 .transform 方法 该方法要求传递一个要被转换的DataFrame
在spark.ml.feature 中提供了许多转换器
- Binarizer 根据指定的阈值将连续变量转换为对应的二进制值
- Bucketizer 与Binarizer类似,该方法根据阈值列表将连续变量转换成多项值
- ChisqSelector 对于分类目标变量 允许选择预定义数量的特征 使用分为两步 fit(df) .transform()
- CountVectorizer 标记文本
- DCT 离散余弦变换取实数值向量 并返回相同长度的向量
- ElementwiseProduct 该方法返回一个向量 其中的元素是传入该方法的向量和另一个传入参数scalingVec的向量的乘积
- HashingTF 哈希转换器 输入为标记文本的列表 返回一个带有计数的有预定长度的向量
- MaxAbsScaler 将数据调整到【-1,1】 范围内 中心不会移动
- MinMaxScaler 将数据缩放到【0,1】 范围内
- StandardScaler 标准化列 使其拥有零均值和等于1的标准差
- NGram 输入为标记文本的列表 返回结果包含一系列n-gram 以两个词 三个词或更多词形成一个n-gram 例如输入【“good”,“morning”,“robin”,"williams"】 则会输出 【"good moring ", "moring robin", "robin williams"】
- Normalizer 该方法使用p范数将数据缩放为单位范数 默认是L2
- OneHotEncoder 独热编码
- PCA 主成分进行数据降维
- PolynomialExpansion 执行向量的多项式展开 例如输出【x,y,z】 输出则为 【x,xx,y,xy,yy,z,xz,yz,zz】
- QuantileDiscretizer 与Bucketizer 方法类似 但不是传递分割参数 而是numBuckets 通过计算数据的近似分位数来决定分割点
- RegexTokenizer 使用正则表达式的字符串分词器
- RFormular R语言中的formular
- SQLTransformer 可是转换SQL语句
- StopWordsRemover 从标记文本中删除停用词
- Tokenizer 将字符串转换成小写 按照空格分隔
- VectorAssembler 将多个数字或向量列合并成一列向量
- word2vec 将一个句子作为输入 并将其转换为{string ,vector} 格式的映射
二 评估器
评估器就是模型 对观测对象进行预测或分类
分类模型
- LogisticRegression 分类的基准模型
- DecisionTreeClssifier 决策树 maxDepth minInstancePerNode 每个节点的最小数量 maxBins 连续变量被分割的bin的最大数量 而impurity 指定用于测量并计算来自分割的信息的度量
- GBTClassifier 梯度提升决策树 支持二进制标签 连续和分类特征
- RandomForestClassifier 支持二元标签和多项标签
- NaiveBayes
- MultilayerPerceptronClassifier 多层感知机分类器 输入和隐层都是sigmoid 输出层激活函数是softmax
- OneVsRest 将多分类问题简化为二分类问题 多标签的分类问题可以转化为多个二元逻辑回归
回归模型
- AFTSurivalRegression 适合加速失效时间回归模型 假设其中一个特征的边际效应加速或减速了预期寿命(或过程失败) 适合有明确阶段的过程
- DecisionTreeRegressor
- GBTRegressor
- GeneralizedLinearRegression 支持不同的误差分布 如gaussian binomial gamma poisson
- IsotonicRegression 拟合一个形式自由 非递减的行列数据中 对于拟合有序的和递增的观测数据集是有用的
- LinearRegression 线性回归
- RandomForestRegressor 与DecisionTreeRegressor或GBTRegressor类似 RandomForestRegressor 适合连续的标签 而不是离散的标签
聚类模型
- BisectingKMeans 二分K均值算法 结合了K均值聚类算法和层次聚类算法 最初将所有点作为一个簇 然后将数据迭代的分解为k个簇
- KMeans
- GaussianMixture 高斯混合模型 使用具有未知参数的K个高斯分布来剖析数据集 使用期望最大化算法 通过最大似然函数找到高斯参数
- LDA 自然语言处理中的主题生成
三 管道
管道是用来表示从转换到评估的过程
demo实际使用
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.ml.feature as ft
spark = SparkSession.builder \
.appName("testname") \
.getOrCreate()
# read data
labels = [
('INFANT_ALIVE_AT_REPORT', IntegerType()),
('BIRTH_PLACE', StringType()),
('MOTHER_AGE_YEARS',IntegerType()),
('FATHER_COMBINED_AGE',IntegerType()),
('CIG_BEFORE',IntegerType()),
('CIG_1_TRI',IntegerType()),
('CIG_2_TRI',IntegerType()),
('CIG_3_TRI',IntegerType()),
('MOTHER_HEIGHT_IN',IntegerType()),
('MOTHER_PRE_WEIGHT',IntegerType()),
('MOTHER_DELIVERY_WEIGHT',IntegerType()),
('MOTHER_WEIGHT_GAIN',IntegerType()),
('DIABETES_PRE',IntegerType()),
('DIABETES_GEST',IntegerType()),
('HYP_TENS_PRE',IntegerType()),
('HYP_TENS_GEST',IntegerType()),
('PREV_BIRTH_PRETERM',StringType())
]
schema = StructType([StructField(e[0],e[1],False) for e in labels])
births = spark.read.csv("file:///home/njliu/prc/pyspark/05/births_train.csv.gz",header=True,schema=schema)
# transformation
# trans stringType into IntegerType
births = births.withColumn('BIRTH_PLACE_INT',births["BIRTH_PLACE"].cast(IntegerType()))
#Onehotencoder for BIRTH_PLACE_INT
encoder = ft.OneHotEncoder(inputCol="BIRTH_PLACE_INT",outputCol="BIRHT_PLACE_VEC")
#create a single col which merge all feas
featuresCreator = ft.VectorAssembler(inputCols = [col[0] for col in labels[2:]] + [encoder.getOutputCol()],outputCols="features")
#create a model
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxiter=10,regParam=0.01,labelCol="INFANT_ALIVE_AT_REPORT")
#create a pipline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[encoder,featuresCreator,logistic])
#fit the model
births_train,births_test = births.randomSplit([0.7,0.3],seed=666)
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
#evaluate the performance
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol="INFANT_ALIVE_AT_REPORT")
print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(test_model,{evaluator.metricName:"areaUnderPR"}))
#save the pipeline
pipelinePath = "./lr_pipline"
pipeline.write().overwrite().save(pipelinePath)
#load the pipeline and predict
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(births_train).transform(births_test)
#save the trained model use PiplineModel
from pyspark.ml import PipelineModel
modelPath = "./lr_PipelineModel"
model.write().overwrite().save(modelPath)
loadedPipelineModel = PipelineModel.load(modelPath)
test_reloadedModel = loadedPipelineModel.transform(births_test)
#parameter optimize
#grid search
import pyspark.ml.tuning as tune
logistic = cl.LogisticRegression(labelCol = "INFANT_ALIVE_AT_REPORT")
grid = tune.ParamGridBuilder() \
.addGrid(logistic.maxIter,[2,10,50]) \
.addGrid(logistic.regParam,[0.01,0.05,0.3]) \
.build()
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol="probability",labelCol="INFANT_ALIVE_AT_REPORT")
cv = tune.CrossValidator(estimator = logistic,estimatorParamMaps = grid, evaluator = evaluator)
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)
cvModel = cv.fit(data_transformer.transform(births_train))
data_test = data_transformer.transform(births_test)
results = cvModel.transform(data_test)
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))
#extract the best model
results = [([{key.name:paramValue} for key,paramValue in zip(params.keys(),params.values())],metric) for params,metric in zip(cvModel.getEstimatorParamMaps(),cvModel.avgMetrics)]
sorted(results,key=lambda el:el[1],reverse=True)[0]
#train-validation splitting split datasets into train and test sets
#select five features
selector = ft.ChiSqSelector(numTopFeatures=5, featuresCol=featuresCreator.getOutputCol(),outputCol="selectedFeatures", labelCol = "INFANT_ALIVE_AT_REPORT")
logistic = cl.LogisticRegression(labelCol="INFANT_ALIVE_AT_REPORT",featuresCol="selectedFeatures")
pipeline = Pipeline(stages=[encoder,featureCreator,selector])
data_transformer = pipeline.fit(births_train)
tvs = tune.TrainValidationSplit(estimator=logistic,estimatorParamMaps=grid,evaluator=evaluator)
tvsModel = tvs.fit(data_transformer.transform(births_train))
data_test = data_transformer.transform(births_test)
results = tvsModel.transform(data_test)
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderROC"}))
print (evaluator.evaluate(results,{evaluator.metricName:"areaUnderPR"}))
网友评论