美文网首页数据科学机器学习
8 基于PySpark的机器学习实战

8 基于PySpark的机器学习实战

作者: 7125messi | 来源:发表于2018-01-15 21:48 被阅读490次

第7节我们详细介绍了PySpark中机器学习ML模块的各种函数的用法,本节将进行实际的案例分析。

# # 1 加载数据

# In[18]:

from pyspark.sql import types

# (column name, Spark SQL type) pairs describing the births CSV layout:
# a double label, a string birth-place category, and numeric predictors.
labels = [
    ('INFANT_ALIVE_AT_REPORT', types.DoubleType()),
    ('BIRTH_PLACE', types.StringType()),
    ('MOTHER_AGE_YEARS', types.DoubleType()),
    ('FATHER_COMBINED_AGE', types.DoubleType()),
    ('CIG_BEFORE', types.DoubleType()),
    ('CIG_1_TRI', types.DoubleType()),
    ('CIG_2_TRI', types.DoubleType()),
    ('CIG_3_TRI', types.DoubleType()),
    ('MOTHER_HEIGHT_IN', types.DoubleType()),
    ('MOTHER_PRE_WEIGHT', types.DoubleType()),
    ('MOTHER_DELIVERY_WEIGHT', types.DoubleType()),
    ('MOTHER_WEIGHT_GAIN', types.DoubleType()),
    ('DIABETES_PRE', types.DoubleType()),
    ('DIABETES_GEST', types.DoubleType()),
    ('HYP_TENS_PRE', types.DoubleType()),
    ('HYP_TENS_GEST', types.DoubleType()),
    ('PREV_BIRTH_PRETERM', types.DoubleType())
]

# Explicit schema so the reader does not have to infer types.
# NOTE(review): nullable=False is requested, but printSchema later reports
# nullable=true — the CSV data source does not enforce the flag.
schema = types.StructType([
    types.StructField(e[0], e[1], False) for e in labels
])

# NOTE(review): `spark` (a SparkSession) is assumed to already exist,
# e.g. created by the pyspark shell — confirm before running as a script.
births = spark.read.csv('file:///opt/spark-2.2.1-bin-hadoop2.7/test/births_transformed.csv',header = True,schema = schema)
# In[19]:

# Inspect the column names that were loaded (notebook display expression).
births.columns
['INFANT_ALIVE_AT_REPORT',
 'BIRTH_PLACE',
 'MOTHER_AGE_YEARS',
 'FATHER_COMBINED_AGE',
 'CIG_BEFORE',
 'CIG_1_TRI',
 'CIG_2_TRI',
 'CIG_3_TRI',
 'MOTHER_HEIGHT_IN',
 'MOTHER_PRE_WEIGHT',
 'MOTHER_DELIVERY_WEIGHT',
 'MOTHER_WEIGHT_GAIN',
 'DIABETES_PRE',
 'DIABETES_GEST',
 'HYP_TENS_PRE',
 'HYP_TENS_GEST',
 'PREV_BIRTH_PRETERM']
# In[20]:

# Verify the types the reader actually applied.
births.printSchema()
root
 |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
 |-- BIRTH_PLACE: string (nullable = true)
 |-- MOTHER_AGE_YEARS: double (nullable = true)
 |-- FATHER_COMBINED_AGE: double (nullable = true)
 |-- CIG_BEFORE: double (nullable = true)
 |-- CIG_1_TRI: double (nullable = true)
 |-- CIG_2_TRI: double (nullable = true)
 |-- CIG_3_TRI: double (nullable = true)
 |-- MOTHER_HEIGHT_IN: double (nullable = true)
 |-- MOTHER_PRE_WEIGHT: double (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
 |-- DIABETES_PRE: double (nullable = true)
 |-- DIABETES_GEST: double (nullable = true)
 |-- HYP_TENS_PRE: double (nullable = true)
 |-- HYP_TENS_GEST: double (nullable = true)
 |-- PREV_BIRTH_PRETERM: double (nullable = true)
# In[21]:

# Peek at the raw BIRTH_PLACE values (stored as strings like '1').
births.select(births['BIRTH_PLACE']).show(3)
+-----------+
|BIRTH_PLACE|
+-----------+
|          1|
|          1|
|          1|
+-----------+
only showing top 3 rows
# # 2 创建转换器

# 在建模前,由于统计模型只能对数值数据做操作,必须对BIRTH_PLACE变量进行编码
# 
# 使用OneHotEncoder方法来对BIRTH_PLACE列进行编码,该方法不接受StringType(),只能处理数字类型,首先将该列转换为IntegerType()

# In[22]:

# OneHotEncoder cannot consume StringType, so first cast BIRTH_PLACE to an
# integer column.  NOTE(review): the 'PALCE' misspelling is kept on purpose —
# every downstream stage in this file references that exact column name.
births = births.withColumn('BIRTH_PALCE_INT',births['BIRTH_PLACE'].cast(types.IntegerType()))
# Show the original and the newly cast column side by side.  (The original
# line displayed only the untouched string column, which did not actually
# demonstrate the cast.)
births.select(births['BIRTH_PLACE'], births['BIRTH_PALCE_INT']).show(3)


# In[23]:

# Confirm the new BIRTH_PALCE_INT column is integer-typed.
births.printSchema()
root
 |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
 |-- BIRTH_PLACE: string (nullable = true)
 |-- MOTHER_AGE_YEARS: double (nullable = true)
 |-- FATHER_COMBINED_AGE: double (nullable = true)
 |-- CIG_BEFORE: double (nullable = true)
 |-- CIG_1_TRI: double (nullable = true)
 |-- CIG_2_TRI: double (nullable = true)
 |-- CIG_3_TRI: double (nullable = true)
 |-- MOTHER_HEIGHT_IN: double (nullable = true)
 |-- MOTHER_PRE_WEIGHT: double (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
 |-- DIABETES_PRE: double (nullable = true)
 |-- DIABETES_GEST: double (nullable = true)
 |-- HYP_TENS_PRE: double (nullable = true)
 |-- HYP_TENS_GEST: double (nullable = true)
 |-- PREV_BIRTH_PRETERM: double (nullable = true)
 |-- BIRTH_PALCE_INT: integer (nullable = true)
# ## 2.1 创建第一个转换器

# In[25]:

from pyspark.ml import feature
# One-hot encode the integer birth-place category into a sparse vector.
# NOTE(review): this inputCol/outputCol OneHotEncoder is the Spark 2.x API
# (deprecated in 2.3, removed in 3.0) — fine for the Spark 2.2.1 install
# referenced by the CSV path above.
encoder = feature.OneHotEncoder(inputCol='BIRTH_PALCE_INT',outputCol='BIRTH_PALCE_VEC')
# ## 2.2 创建第二个转换器

# 创建一个单一的列,它将所有特征整合在一起,使用VectorAssembler()转换器
# 
# 传递给VectorAssembler对象的inputCols参数是一个列表,该列表包含所有要合并在一起以组成outputCol——features的列

# In[27]:

# Merge all numeric predictors (labels[2:] skips the label and the raw
# string BIRTH_PLACE) plus the one-hot vector into a single 'features' column.
featureCreator = feature.VectorAssembler(inputCols = [col[0] for col in labels[2:]] + [encoder.getOutputCol()],
                                        outputCol = 'features')


# Using encoder.getOutputCol() above means a later rename of the encoder's
# output column requires no change to the assembler's inputCols.

# In[29]:

encoder.getOutputCol()
'BIRTH_PALCE_VEC'

# In[30]:

# Confirm the assembler's output column name.
featureCreator.getOutputCol()
'features'
# # 3 创建评估器

# 先以逻辑回归模型作为分析

# In[32]:

from pyspark.ml import classification
# Logistic regression estimator; the default featuresCol ('features')
# matches the VectorAssembler output, and the label column is set explicitly.
logistic = classification.LogisticRegression(maxIter=10,regParam=0.01,labelCol='INFANT_ALIVE_AT_REPORT')
# # 4 创建一个管道

# In[34]:

from pyspark.ml import Pipeline
# Chain the stages: one-hot encode -> assemble features -> fit logistic.
pipeline = Pipeline(stages=[
        encoder,
        featureCreator,
        logistic
    ])
# # 5 拟合模型(耗时)

# In[36]:

# 70/30 train/test split; the fixed seed makes the split reproducible.
births_train,births_test = births.randomSplit([0.7,0.3],seed = 666)
model = pipeline.fit(births_train)
# # 6 预测

# In[37]:

# Run the fitted pipeline on held-out data; the result is a DataFrame with
# rawPrediction / probability / prediction columns appended.
test_model = model.transform(births_test)
type(test_model)
pyspark.sql.dataframe.DataFrame

# In[39]:

# Peek at one scored row.
test_model.take(1)
[Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]

# rawPrediction是特征和系数的线性组合值;
# 
# probability是为每个类别计算出的概率;
# 
# prediction是最终的类分配
# # 7 评估模型的性能

# In[40]:

from pyspark.ml import evaluation


# BinaryClassificationEvaluator measures model performance; its
# rawPredictionCol may point either at the rawPrediction column or at the
# probability column produced by the estimator.

# In[44]:

evaluator1 = evaluation.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')
evaluator2 = evaluation.BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='INFANT_ALIVE_AT_REPORT')


# 查看最后表现性能,关于ROC和PR相关概念,需要读者自行查阅,后续作者会推出一些理论知识作为补充

# In[48]:

# Report AUC-ROC and AUC-PR for both evaluator configurations.  Both use
# the same label column, so the printed numbers can be compared directly.
for position, current in enumerate((evaluator1, evaluator2)):
    if position:
        print("====================================================================")
    print(current.evaluate(test_model,{current.metricName:'areaUnderROC'}))
    print(current.evaluate(test_model,{current.metricName:'areaUnderPR'}))
0.7364549402822995
0.7082584952026181
====================================================================
0.7364549402822995
0.7082584952026181
# # 8 保存模型

# 可以保存管道、转换器和评估器以备再用
# ## 8.1 保存管道

# In[49]:

# Persist the (unfitted) pipeline definition for later reuse.
pipelinePath = './encoder_featureCreator_logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
# ## 8.2 保存模型

# In[50]:

from pyspark.ml import PipelineModel
# Persist the fitted model (transformers plus the trained logistic stage).
modelPath = './logistic_PipelineModel'
model.write().overwrite().save(modelPath)
# ## 8.3 以后如果想要管道和模型,可以直接加载并.fit()预测

# In[51]:

# Reload the pipeline definition; it must be re-fitted before transforming.
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(births_train).transform(births_test).take(1)
[Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]

# In[52]:

# Reload the fitted model; it can transform new data directly, no fit needed.
loadedPipelineModel = PipelineModel.load(modelPath)
loadedPipelineModel.transform(births_test).take(1)
[Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]
# # 9 超参调优(耗时)

# 第一个模型几乎不可能是我们想要的最佳模型,超参调优是寻找最佳模型的科学方法
# 
# 超参调优:找到模型的最佳参数,例如找到逻辑回归模型所需要的最大迭代次数、决策树的最大深度等等。
# 
# grid search和train-validation splitting是超参调优的常用方法
# ## 9.1 网格搜索法

# 根据给定评估指标,循环遍历定义的参数值列表,估计单独的模型,从而选择一个最佳的模型;
# 
# 当然如果定义优化的参数较多或者参数的值太多,需要耗费大量的时间才能选择出最佳模型。
# 
# 例如:两个参数,每个参数有两个值,则需要拟合4个模型(指数级增长)

# In[53]:

from pyspark.ml import tuning

# Specify the model and the grid of parameter values to sweep:
# 3 maxIter values x 3 regParam values = 9 candidate models.
logistic = classification.LogisticRegression(labelCol = 'INFANT_ALIVE_AT_REPORT')

grid = tuning.ParamGridBuilder().addGrid(logistic.maxIter,[2,10,50]).addGrid(logistic.regParam,[0.01,0.05,0.3]).build()


# In[54]:

# CrossValidator needs an estimator, an estimatorParamMaps grid and an
# evaluator; it loops over the grid, fits each candidate model and compares
# their performance using the evaluator.
evaluator = evaluation.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')
cv = tuning.CrossValidator(estimator=logistic,estimatorParamMaps=grid,evaluator=evaluator)


# In[55]:

# Transform-only pipeline (no estimator stage): CrossValidator fits the
# logistic stage itself, so only feature preparation happens here.
pipeline = Pipeline(stages=[encoder,featureCreator])
data_transformer = pipeline.fit(births_train)


# In[56]:

# Search the grid for the best parameter combination; cvModel holds the
# best estimated model.  (This is the slow step.)
cvModel = cv.fit(data_transformer.transform(births_train))


# In[58]:

# After tuning, performance improves slightly over the hand-picked params.
# NOTE(review): the variable is named data_train but it holds transformed
# *test* data — consider renaming for clarity.
data_train = data_transformer.transform(births_test)
results = cvModel.transform(data_train)

print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderPR'}))
0.7375994284895242
0.7102487377506715

# In[60]:

# Extract the parameters of the best model.
# Pair each grid point's [{param name: value}, ...] settings with its
# average cross-validation metric.  (The original spelled params.items()
# as zip(params.keys(), params.values()), which is redundant.)
results = [
    ([{param.name: value} for param, value in params.items()], metric)
    for params, metric in zip(
        cvModel.getEstimatorParamMaps(),
        cvModel.avgMetrics
    )
]

# max() with a key finds the best combination in one O(n) pass instead of
# sorting the whole list; ties resolve to the first occurrence either way.
max(results, key=lambda el: el[1])
([{'regParam': 0.01}, {'maxIter': 50}], 0.7393782132646929)
# ## 9.2 Train-validation划分

# 为了选择最佳模型,TrainValidationSplit模型对输入的数据集(训练集)执行随机划分,划分成两个子集:较小的训练集和测试集。划分仅执行一次。

# **使用`ChiSqSelector`只选出前5个特征,以此来限制模型的复杂度**

# In[62]:

# ChiSqSelector keeps only the 5 most predictive features (chi-squared test
# against the label), limiting model complexity.
selector = feature.ChiSqSelector(
    numTopFeatures=5, 
    featuresCol=featureCreator.getOutputCol(), 
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

# The estimator now reads from the reduced 'selectedFeatures' column.
logistic = classification.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures'
)

# Transform-only pipeline again: encode, assemble, then select features.
pipeline = Pipeline(stages=[encoder,featureCreator,selector])
data_transformer = pipeline.fit(births_train)


# In[63]:

# TrainValidationSplit performs a single random train/validation split of
# the input (presumably the default 75/25 ratio — confirm) instead of the
# k repeated fits of CrossValidator, so it is cheaper but noisier.
tvs = tuning.TrainValidationSplit(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)


# In[64]:

# The 5-feature model performs slightly worse than the full model, but the
# difference is small.
tvsModel = tvs.fit(data_transformer.transform(births_train))

# NOTE(review): again named data_train but holding transformed test data.
data_train = data_transformer.transform(births_test)

results = tvsModel.transform(data_train)

print(evaluator.evaluate(results,{evaluator.metricName: 'areaUnderROC'}))

print(evaluator.evaluate(results,{evaluator.metricName: 'areaUnderPR'}))
0.7271377736609268
0.6992424489743029
# # 10 使用随机森林算法建模

# **需要将label特征转化为DoubleType**

# In[66]:

from pyspark.sql import functions
# Cast the label to DoubleType as the classifier expects.
# NOTE(review): the schema above already declares this column DoubleType,
# so the cast looks like a no-op here — harmless, kept for safety.
births = births.withColumn('INFANT_ALIVE_AT_REPORT', functions.col('INFANT_ALIVE_AT_REPORT').cast(types.DoubleType()))
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)


# In[67]:

# Random forest with 5 trees of depth 5, reusing the same feature pipeline.
classifier = classification.RandomForestClassifier(
    numTrees=5, 
    maxDepth=5, 
    labelCol='INFANT_ALIVE_AT_REPORT')

pipeline = Pipeline(
    stages=[
        encoder,
        featureCreator, 
        classifier])

model = pipeline.fit(births_train)
test = model.transform(births_test)


# In[68]:

# Evaluate the forest; per the printed output it beats the tuned logistic
# regression on both AUC metrics.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderPR"}))
0.7598074990441518
0.7440993755919022

# In[69]:
# Performance of a single decision tree, for comparison with the forest.
classifier = classification.DecisionTreeClassifier(
    maxDepth=5, 
    labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(stages=[
        encoder,
        featureCreator, 
        classifier]
)

model = pipeline.fit(births_train)
test = model.transform(births_test)

evaluator = evaluation.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderPR"}))
0.7521769220006015
0.7595866540712273

相关文章

网友评论

本文标题:8 基于PySpark的机器学习实战

本文链接:https://www.haomeiwen.com/subject/lufjoxtx.html