第7节我们详细介绍了PySpark中机器学习ML模块的各种函数的用法,本节将进行实际的案例分析。
# # 1 加载数据
# In[18]:
from pyspark.sql import types

# Column layout of the births CSV: the label, the raw birth-place code
# (a string), and fifteen numeric predictors, in file order.
_column_names = [
    'INFANT_ALIVE_AT_REPORT',
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS',
    'FATHER_COMBINED_AGE',
    'CIG_BEFORE',
    'CIG_1_TRI',
    'CIG_2_TRI',
    'CIG_3_TRI',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'MOTHER_DELIVERY_WEIGHT',
    'MOTHER_WEIGHT_GAIN',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM',
]

# Every column except BIRTH_PLACE is read as a double.
labels = [
    (name, types.StringType() if name == 'BIRTH_PLACE' else types.DoubleType())
    for name in _column_names
]

# NOTE(review): the nullable=False flag appears to be ignored by the CSV
# reader -- printSchema() on the loaded frame still reports nullable = true.
schema = types.StructType(
    [types.StructField(name, dtype, False) for name, dtype in labels]
)

births = spark.read.csv(
    'file:///opt/spark-2.2.1-bin-hadoop2.7/test/births_transformed.csv',
    header=True,
    schema=schema,
)
# In[19]:
# List the column names produced by the schema above.
births.columns
['INFANT_ALIVE_AT_REPORT',
'BIRTH_PLACE',
'MOTHER_AGE_YEARS',
'FATHER_COMBINED_AGE',
'CIG_BEFORE',
'CIG_1_TRI',
'CIG_2_TRI',
'CIG_3_TRI',
'MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT',
'MOTHER_DELIVERY_WEIGHT',
'MOTHER_WEIGHT_GAIN',
'DIABETES_PRE',
'DIABETES_GEST',
'HYP_TENS_PRE',
'HYP_TENS_GEST',
'PREV_BIRTH_PRETERM']
# In[20]:
# Show the loaded schema; note the output reports nullable = true for every
# column even though the StructFields were declared non-nullable -- the CSV
# reader overrides the flag.
births.printSchema()
root
|-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
|-- BIRTH_PLACE: string (nullable = true)
|-- MOTHER_AGE_YEARS: double (nullable = true)
|-- FATHER_COMBINED_AGE: double (nullable = true)
|-- CIG_BEFORE: double (nullable = true)
|-- CIG_1_TRI: double (nullable = true)
|-- CIG_2_TRI: double (nullable = true)
|-- CIG_3_TRI: double (nullable = true)
|-- MOTHER_HEIGHT_IN: double (nullable = true)
|-- MOTHER_PRE_WEIGHT: double (nullable = true)
|-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
|-- MOTHER_WEIGHT_GAIN: double (nullable = true)
|-- DIABETES_PRE: double (nullable = true)
|-- DIABETES_GEST: double (nullable = true)
|-- HYP_TENS_PRE: double (nullable = true)
|-- HYP_TENS_GEST: double (nullable = true)
|-- PREV_BIRTH_PRETERM: double (nullable = true)
# In[21]:
# Peek at the raw birth-place codes (stored as strings at this point).
births.select('BIRTH_PLACE').show(3)
+-----------+
|BIRTH_PLACE|
+-----------+
| 1|
| 1|
| 1|
+-----------+
only showing top 3 rows
# # 2 创建转换器
# 在建模前,由于统计模型只能对数值数据做操作,必须对BIRTH_PLACE变量进行编码
#
# 使用OneHotEncoder方法来对BIRTH_PLACE列进行编码,该方法不接受StringType(),只能处理数字类型,首先将该列转换为IntegerType()
# In[22]:
births = births.withColumn('BIRTH_PALCE_INT',births['BIRTH_PLACE'].cast(types.IntegerType()))
births.select(births['BIRTH_PLACE']).show(3)
# In[23]:
# Confirm the new BIRTH_PALCE_INT integer column was appended.
births.printSchema()
root
|-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
|-- BIRTH_PLACE: string (nullable = true)
|-- MOTHER_AGE_YEARS: double (nullable = true)
|-- FATHER_COMBINED_AGE: double (nullable = true)
|-- CIG_BEFORE: double (nullable = true)
|-- CIG_1_TRI: double (nullable = true)
|-- CIG_2_TRI: double (nullable = true)
|-- CIG_3_TRI: double (nullable = true)
|-- MOTHER_HEIGHT_IN: double (nullable = true)
|-- MOTHER_PRE_WEIGHT: double (nullable = true)
|-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
|-- MOTHER_WEIGHT_GAIN: double (nullable = true)
|-- DIABETES_PRE: double (nullable = true)
|-- DIABETES_GEST: double (nullable = true)
|-- HYP_TENS_PRE: double (nullable = true)
|-- HYP_TENS_GEST: double (nullable = true)
|-- PREV_BIRTH_PRETERM: double (nullable = true)
|-- BIRTH_PALCE_INT: integer (nullable = true)
# ## 2.1 First transformer
# In[25]:
from pyspark.ml import feature

# One-hot encode the integer birth-place code into a sparse indicator vector.
# (The misspelled 'PALCE' in both column names is preserved on purpose --
# later cells reference these exact names.)
encoder = feature.OneHotEncoder(
    inputCol='BIRTH_PALCE_INT',
    outputCol='BIRTH_PALCE_VEC',
)
# ## 2.2 创建第二个转换器
# 创建一个单一的列,它将所有特征整合在一起,使用VectorAssembler()转换器
#
# 传递给VectorAssembler对象的inputCols参数是一个列表,该列表包含所有要合并在一起以组成outputCol——features的列
# In[27]:
# Assemble all numeric predictors plus the encoded birth-place vector into a
# single 'features' column.  labels[2:] skips the label column and the raw
# string BIRTH_PLACE column.
numeric_inputs = [name for name, _ in labels[2:]]
featureCreator = feature.VectorAssembler(
    inputCols=numeric_inputs + [encoder.getOutputCol()],
    outputCol='features',
)
# 使用编码器对象的输出(调用.getOutputCol()方法),在任何时候更改了编码器对象中输出列的名称,不必更改此参数的值
# In[29]:
# Output column name of the encoder stage.
encoder.getOutputCol()
'BIRTH_PALCE_VEC'
# In[30]:
# Output column name of the assembler stage.
featureCreator.getOutputCol()
'features'
# # 3 Create the estimator
# Start the analysis with a logistic-regression model.
# In[32]:
from pyspark.ml import classification

# Regularised logistic regression capped at 10 iterations for a first pass.
logistic = classification.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT',
)
# # 4 Build the pipeline
# In[34]:
from pyspark.ml import Pipeline

# Stages run in order: encode birth place -> assemble features -> fit logistic.
pipeline = Pipeline(stages=[encoder, featureCreator, logistic])
# # 5 Fit the model (slow)
# In[36]:
# 70/30 train/test split; the fixed seed makes the split reproducible.
births_train,births_test = births.randomSplit([0.7,0.3],seed = 666)
model = pipeline.fit(births_train)
# # 6 Predict
# In[37]:
# transform() appends rawPrediction, probability and prediction columns;
# the result is still a pyspark DataFrame.
test_model = model.transform(births_test)
type(test_model)
pyspark.sql.dataframe.DataFrame
# In[39]:
# Inspect one scored row.
test_model.take(1)
[Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]
# rawPrediction是特征和系数的线性组合值;
#
# probability是为每个类别计算出的概率;
#
# prediction是最终的类分配
# # 7 Evaluate model performance
# In[40]:
from pyspark.ml import evaluation

# In[44]:
def _make_binary_evaluator(score_col):
    # rawPredictionCol may point at either the rawPrediction scores or the
    # probability column produced by the estimator.
    return evaluation.BinaryClassificationEvaluator(
        rawPredictionCol=score_col,
        labelCol='INFANT_ALIVE_AT_REPORT',
    )

evaluator1 = _make_binary_evaluator('probability')
evaluator2 = _make_binary_evaluator('rawPrediction')
# Final scores; areaUnderROC and areaUnderPR are the two supported metrics.
# Both evaluators rank rows identically, so the numbers match.
# In[48]:
for _metric in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator1.evaluate(test_model, {evaluator1.metricName: _metric}))
print("====================================================================")
for _metric in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator2.evaluate(test_model, {evaluator2.metricName: _metric}))
0.7364549402822995
0.7082584952026181
====================================================================
0.7364549402822995
0.7082584952026181
# # 8 Persist artefacts
# Pipelines, transformers and estimators can all be saved for later reuse.
# ## 8.1 Save the (unfitted) pipeline definition
# In[49]:
pipelinePath = './encoder_featureCreator_logistic_Pipeline'
_pipeline_writer = pipeline.write().overwrite()
_pipeline_writer.save(pipelinePath)
# ## 8.2 Save the fitted model
# In[50]:
from pyspark.ml import PipelineModel

modelPath = './logistic_PipelineModel'
_model_writer = model.write().overwrite()
_model_writer.save(modelPath)
# ## 8.3 Reload the saved pipeline later, refit it, and predict.
# In[51]:
loadedPipeline = Pipeline.load(pipelinePath)
_refit_model = loadedPipeline.fit(births_train)
_refit_model.transform(births_test).take(1)
[Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]
# In[52]:
# A PipelineModel is already fitted, so it can transform directly -- no fit().
loadedPipelineModel = PipelineModel.load(modelPath)
_predictions = loadedPipelineModel.transform(births_test)
_predictions.take(1)
[Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]
# # 9 超参调优(耗时)
# 第一个模型几乎不可能是我们想要的最佳模型,超参调优是寻找最佳模型的科学方法
#
# 超参调优:找到模型的最佳参数,例如找到逻辑回归模型所需要的最大迭代次数、决策树的最大深度等等。
#
# grid search和train-validation splitting是超参调优的常用方法
# ## 9.1 网格搜索法
# 根据给定评估指标,循环遍历定义的参数值列表,估计单独的模型,从而选择一个最佳的模型;
#
# 当然如果定义优化的参数较多或者参数的值太多,需要耗费大量的时间才能选择出最佳模型。
#
# 例如:两个参数,每个参数有两个值,则需要拟合4个模型(指数级增长)
# In[53]:
from pyspark.ml import tuning

# Model to tune plus the parameter values to sweep: 3 x 3 = 9 candidate fits.
logistic = classification.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')
_builder = tuning.ParamGridBuilder()
_builder = _builder.addGrid(logistic.maxIter, [2, 10, 50])
_builder = _builder.addGrid(logistic.regParam, [0.01, 0.05, 0.3])
grid = _builder.build()
# In[54]:
# CrossValidator needs an estimator, the parameter grid, and an evaluator;
# it fits every grid combination and ranks them with the evaluator.
evaluator = evaluation.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)
cv = tuning.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)
# In[55]:
# Feature-engineering-only pipeline: the classifier is supplied to the
# CrossValidator, so only the transform stages go in here.
pipeline = Pipeline(stages=[encoder, featureCreator])
data_transformer = pipeline.fit(births_train)
# In[56]:
# Search the grid for the best parameter combination; cv.fit returns a model
# wrapping the best estimator found.  (Slow: fits every combination.)
cvModel = cv.fit(data_transformer.transform(births_train))
# In[58]:
# After tuning, performance improves slightly over the untuned model.
transformed_test = data_transformer.transform(births_test)
results = cvModel.transform(transformed_test)
for _metric in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: _metric}))
0.7375994284895242
0.7102487377506715
# In[60]:
# Pull out the parameter map and mean metric for every grid point, then show
# the best-scoring combination.
results = [
    ([{param.name: value} for param, value in params.items()], metric)
    for params, metric in zip(cvModel.getEstimatorParamMaps(),
                              cvModel.avgMetrics)
]
max(results, key=lambda el: el[1])
([{'regParam': 0.01}, {'maxIter': 50}], 0.7393782132646929)
# ## 9.2 Train-validation划分
# 为了选择最佳模型,TrainValidationSplit模型对输入的数据集(训练集)执行随机划分,划分成两个子集:较小的训练集和测试集。划分仅执行一次。
# **使用`ChiSqSelector`只选出前5个特征,以此来限制模型的复杂度**
# In[62]:
# ChiSqSelector keeps only the 5 most predictive assembled features, capping
# model complexity.
selector = feature.ChiSqSelector(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol=featureCreator.getOutputCol(),
    numTopFeatures=5,
    outputCol='selectedFeatures',
)
# The logistic model now trains on the reduced feature column.
logistic = classification.LogisticRegression(
    featuresCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT',
)
pipeline = Pipeline(stages=[encoder, featureCreator, selector])
data_transformer = pipeline.fit(births_train)
# In[63]:
# TrainValidationSplit scores each grid point on a single random
# train/validation split (performed once -- faster than full CV).
tvs = tuning.TrainValidationSplit(
    estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)
# In[64]:
# The 5-feature model scores slightly below the full model, but not by much.
tvsModel = tvs.fit(data_transformer.transform(births_train))
transformed_test = data_transformer.transform(births_test)
results = tvsModel.transform(transformed_test)
for _metric in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: _metric}))
0.7271377736609268
0.6992424489743029
# # 10 Random-forest model
# The label column must be a DoubleType for the classifier.
# In[66]:
from pyspark.sql import functions

births = births.withColumn(
    'INFANT_ALIVE_AT_REPORT',
    functions.col('INFANT_ALIVE_AT_REPORT').cast(types.DoubleType()),
)
# Re-split with the same seed so train/test stay comparable to earlier runs.
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)
# In[67]:
# Small random forest: 5 trees of depth 5, same feature pipeline as before.
classifier = classification.RandomForestClassifier(
    numTrees=5,
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT',
)
pipeline = Pipeline(stages=[encoder, featureCreator, classifier])
model = pipeline.fit(births_train)
test = model.transform(births_test)
# In[68]:
# Score the forest on the held-out set.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')
for _metric in ("areaUnderROC", "areaUnderPR"):
    print(evaluator.evaluate(test, {evaluator.metricName: _metric}))
0.7598074990441518
0.7440993755919022
# In[69]:
# For comparison: a single decision tree of the same depth.
classifier = classification.DecisionTreeClassifier(
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT',
)
pipeline = Pipeline(stages=[encoder, featureCreator, classifier])
model = pipeline.fit(births_train)
test = model.transform(births_test)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')
for _metric in ("areaUnderROC", "areaUnderPR"):
    print(evaluator.evaluate(test, {evaluator.metricName: _metric}))
0.7521769220006015
0.7595866540712273
网友评论