
Spark MLlib

By 枫隐_5f5f | Published 2019-04-26 23:04

    MLlib covers three core areas of machine learning functionality:

    1. Data preparation

    Feature extraction, transformation, selection, hashing of categorical features, and natural language processing (see the sketch after this list)

    2. Machine learning algorithms

    Regression, classification, and clustering

    3. Utilities

    Statistical methods such as descriptive statistics, the chi-square test, linear algebra, and model evaluation methods
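
    As a quick illustration of item 1 above (data preparation), here is a minimal standalone sketch of hashing a categorical value with HashingTF; the toy token "hospital" and the vector size of 7 are made-up values, not taken from the pipeline below:

    from pyspark.mllib.feature import HashingTF

    # map a list of categorical tokens into a fixed-length (here 7-element) term-frequency vector
    hashing = HashingTF(7)
    print (hashing.transform(["hospital"]).toArray())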

    MLlib's basic data structure for training models is an RDD of LabeledPoint objects; each LabeledPoint consists of two parts, a label and a vector of features.
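
    For reference, a minimal sketch of building a single LabeledPoint by hand (the label and feature values here are made up):

    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint

    # a label of 1.0 paired with a three-element dense feature vector
    lp = LabeledPoint(1.0, Vectors.dense([29.0, 0.0, 63.0]))
    print (lp.label, lp.features)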

    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    import sys
    spark = SparkSession.builder \
        .appName("TestName") \
        .getOrCreate()
    
    labels = [
        ('INFANT_ALIVE_AT_REPORT', StringType()),
        ('BIRTH_YEAR', IntegerType()),
        ('BIRTH_MONTH', IntegerType()),
        ('BIRTH_PLACE', StringType()),
        ('MOTHER_AGE_YEARS', IntegerType()),
        ('MOTHER_RACE_6CODE', StringType()),
        ('MOTHER_EDUCATION', StringType()),
        ('FATHER_COMBINED_AGE', IntegerType()),
        ('FATHER_EDUCATION', StringType()),
        ('MONTH_PRECARE_RECODE', StringType()),
        ('CIG_BEFORE', IntegerType()),
        ('CIG_1_TRI', IntegerType()),
        ('CIG_2_TRI', IntegerType()),
        ('CIG_3_TRI', IntegerType()),
        ('MOTHER_HEIGHT_IN', IntegerType()),
        ('MOTHER_BMI_RECODE', IntegerType()),
        ('MOTHER_PRE_WEIGHT', IntegerType()),
        ('MOTHER_DELIVERY_WEIGHT', IntegerType()),
        ('MOTHER_WEIGHT_GAIN', IntegerType()),
        ('DIABETES_PRE', StringType()),
        ('DIABETES_GEST', StringType()),
        ('HYP_TENS_PRE', StringType()),
        ('HYP_TENS_GEST', StringType()),
        ('PREV_BIRTH_PRETERM', StringType()),
        ('NO_RISK', StringType()),
        ('NO_INFECTIONS_REPORTED', StringType()),
        ('LABOR_IND', StringType()),
        ('LABOR_AUGM', StringType()),
        ('STEROIDS', StringType()),
        ('ANTIBIOTICS', StringType()),
        ('ANESTHESIA', StringType()),
        ('DELIV_METHOD_RECODE_COMB', StringType()),
        ('ATTENDANT_BIRTH', StringType()),
        ('APGAR_5', IntegerType()),
        ('APGAR_5_RECODE', StringType()),
        ('APGAR_10', IntegerType()),
        ('APGAR_10_RECODE', StringType()),
        ('INFANT_SEX', StringType()),
        ('OBSTETRIC_GESTATION_WEEKS', IntegerType()),
        ('INFANT_WEIGHT_GRAMS', IntegerType()),
        ('INFANT_ASSIST_VENTI', StringType()),
        ('INFANT_ASSIST_VENTI_6HRS', StringType()),
        ('INFANT_NICU_ADMISSION', StringType()),
        ('INFANT_SURFACANT', StringType()),
        ('INFANT_ANTIBIOTICS', StringType()),
        ('INFANT_SEIZURES', StringType()),
        ('INFANT_NO_ABNORMALITIES', StringType()),
        ('INFANT_ANCEPHALY', StringType()),
        ('INFANT_MENINGOMYELOCELE', StringType()),
        ('INFANT_LIMB_REDUCTION', StringType()),
        ('INFANT_DOWN_SYNDROME', StringType()),
        ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', StringType()),
        ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', StringType()),
        ('INFANT_BREASTFED', StringType())
    ]
    schema = StructType([StructField(e[0],e[1],False) for e in labels])
    
    births = spark.read.csv("file:///home/njliu/prc/pyspark/05/births_train.csv.gz",header=True,schema=schema)
    
    selected_feas = ["INFANT_ALIVE_AT_REPORT",
            "BIRTH_PLACE",
            "MOTHER_AGE_YEARS",
            "FATHER_COMBINED_AGE",
            "CIG_BEFORE",    
            "CIG_1_TRI",
            "CIG_2_TRI",
            "CIG_3_TRI",
            "MOTHER_HEIGHT_IN",    
            "MOTHER_PRE_WEIGHT",
            "MOTHER_DELIVERY_WEIGHT",
            "MOTHER_WEIGHT_GAIN",
            "DIABETES_PRE",
            "DIABETES_GEST",
            "HYP_TENS_PRE",
            "HYP_TENS_GEST",
            "PREV_BIRTH_PRETERM",
            ]
    
    births_trim = births.select(selected_feas)
    
    recode_dictionary = {"YNU":{"Y":1,"N":0,"U":0}}
    
    import pyspark.sql.functions as func
    
    def recode(col,key):
        return recode_dictionary[key][col]    
    
    def correct_cig(feat):
        return func.when(func.col(feat) != 99, func.col(feat)).otherwise(0)
    
    
    # turn the recode function into a UDF that can be used in Spark DataFrame expressions
    rec_integer = func.udf(recode,IntegerType())    
    
    births_transformed = births_trim.withColumn("CIG_BEFORE",correct_cig("CIG_BEFORE"))  \
        .withColumn("CIG_1_TRI",correct_cig("CIG_1_TRI")) \
        .withColumn("CIG_2_TRI",correct_cig("CIG_2_TRI")) \
        .withColumn("CIG_3_TRI",correct_cig("CIG_3_TRI"))
    
    cols = [(col.name, col.dataType) for col in births_trim.schema]
    YNU_cols = []
    for i,s in enumerate(cols):
        if s[1] == StringType():
            dis = births.select(s[0]).distinct().rdd.map(lambda x:x[0]).collect()
            if "Y" in dis:
                YNU_cols.append(s[0])
    
    exprs_YNU = [rec_integer(x,func.lit("YNU")).alias(x) if x in YNU_cols else x for x in births_transformed.columns]    
    
    births_transformed = births_transformed.select(exprs_YNU)
    #births_transformed.select(YNU_cols[-5:]).show(5)
    
    # summarize the numerical features with colStats
    import pyspark.mllib.stat as st
    import numpy as np
    numerical_cols = ["MOTHER_AGE_YEARS","FATHER_COMBINED_AGE","CIG_BEFORE","CIG_1_TRI","CIG_2_TRI","CIG_3_TRI","MOTHER_HEIGHT_IN","MOTHER_PRE_WEIGHT","MOTHER_DELIVERY_WEIGHT","MOTHER_WEIGHT_GAIN"]
    
    numeric_rdd = births_transformed.select(numerical_cols).rdd.map(lambda row:[e for e in row])
    mllib_stats = st.Statistics.colStats(numeric_rdd)
    for col,m,v in zip(numerical_cols,mllib_stats.mean(),mllib_stats.variance()):
        print ("{0}:\t{1:.2f}\t{2:.2f}".format(col,m,np.sqrt(v)))
    
    # summarize the categorical variables
    categorical_cols = [e for e in births_transformed.columns if e not in numerical_cols]
    categorical_rdd = births_transformed.select(categorical_cols).rdd.map(lambda row:[e for e in row])
    
    #for i,col in enumerate(categorical_cols):
    #    agg = categorical_rdd.groupBy(lambda row:row[i]).map(lambda row:(row[0],len(row[1])))
    #    print (col,sorted(agg.collect(),key=lambda el:el[1],reverse=True))
    
    # correlations between numerical features
    corrs = st.Statistics.corr(numeric_rdd)
    print (corrs)
    for i, row_flags in enumerate(corrs > 0.5):
        correlated = [(numerical_cols[j], corrs[i][j]) for j, flag in enumerate(row_flags) if flag and j != i]
        if len(correlated) > 0:
            for e in correlated:
                print ("{0}-to-{1}: {2:.2f}".format(numerical_cols[i], e[0], e[1]))
    
    features_to_keep = [
        'INFANT_ALIVE_AT_REPORT',
        'BIRTH_PLACE',
        'MOTHER_AGE_YEARS',
        'FATHER_COMBINED_AGE',
        'CIG_1_TRI',
        'MOTHER_HEIGHT_IN',
        'MOTHER_PRE_WEIGHT',
        'DIABETES_PRE',
        'DIABETES_GEST',
        'HYP_TENS_PRE',
        'HYP_TENS_GEST',
        'PREV_BIRTH_PRETERM'
    ]
    
    births_transformed = births_transformed.select([e for e in features_to_keep])
    
    # statistical analysis
    # chi-square test of independence between each categorical feature and INFANT_ALIVE_AT_REPORT
    import pyspark.mllib.linalg as ln
    for cat in categorical_cols[1:]:
        agg = births_transformed.groupby("INFANT_ALIVE_AT_REPORT") \
            .pivot(cat) \
            .count()
        agg_rdd = agg.rdd.map(lambda row:(row[1:])).flatMap(lambda row:[0 if e == None else e for e in row]).collect()
        row_length = len(agg.collect()[0]) - 1
        agg = ln.Matrices.dense(row_length,2,agg_rdd)
        test = st.Statistics.chiSqTest(agg)
        print (cat,round(test.pValue,4))
    
    
    
    
    # create the modeling dataset
    # convert the DataFrame into an RDD of LabeledPoints
    import pyspark.mllib.feature as ft
    import pyspark.mllib.regression as reg
    hashing = ft.HashingTF(7)
    # hash the BIRTH_PLACE code into a 7-element vector and keep the other columns unchanged,
    # then flatten each row and build a LabeledPoint with INFANT_ALIVE_AT_REPORT as the label
    births_hashed = births_transformed.rdd.map(lambda row:[list(hashing.transform(row[1]).toArray()) if col == "BIRTH_PLACE" else row[i] for i,col in enumerate(features_to_keep)]) \
        .map(lambda row:[[e] if type(e) == int else e for e in row]) \
        .map(lambda row:[item for sublist in row for item in sublist]) \
        .map(lambda row:reg.LabeledPoint(row[0],ln.Vectors.dense(row[1:])))
    
    #split train and test dataset
    births_train,births_test = births_hashed.randomSplit([0.6,0.4])
    
    # logistic regression: train and predict
    from pyspark.mllib.classification import LogisticRegressionWithLBFGS as LR
    lr_model = LR.train(births_train,iterations=10)
    
    lr_results = (births_test.map(lambda row:row.label) \
            .zip(lr_model.predict(births_test.map(lambda row:row.features)))).map(lambda row:(row[0],row[1] * 1.0))
    
    #evaluate
    import pyspark.mllib.evaluation as ev
    lr_ev = ev.BinaryClassificationMetrics(lr_results)
    print ("Area under PR:{}".format(lr_ev.areaUnderPR))
    print ("Area under ROC: {}".format(lr_ev.areaUnderROC))
    
    #feature selection with chi-square
    selector = ft.ChiSqSelector(4).fit(births_train)
    topFeatures_train = (
        births_train.map(lambda row:row.label)
        .zip(selector.transform(births_train.map(lambda row:row.features)))
    ).map(lambda row:reg.LabeledPoint(row[0],row[1]))
    
    topFeatures_test = (
        births_test.map(lambda row:row.label)
        .zip(selector.transform(births_test.map(lambda row:row.features)))
    ).map(lambda row:reg.LabeledPoint(row[0],row[1]))
    
    
    #random forest model
    from pyspark.mllib.tree import RandomForest
    rf_model = RandomForest.trainClassifier(data=topFeatures_train,
                    numClasses=2,
                    categoricalFeaturesInfo={},
                    numTrees=6,
                    featureSubsetStrategy="all",seed=666)
    
    rf_results = (topFeatures_test.map(lambda row:row.label) \
            .zip(rf_model.predict(topFeatures_test.map(lambda row:row.features))))
    
    
    rf_ev = ev.BinaryClassificationMetrics(rf_results)
    print ("Area under PR:{}".format(rf_ev.areaUnderPR))
    print ("Area under ROC: {}".format(rf_ev.areaUnderROC))
    
    
