MLeap offline/online pipeline test

Author: jacksu on 简书 | Published 2019-04-10 11:09

    Objective

    The goal is an out-of-the-box machine-learning platform: with only simple configuration changes, a model whose features are processed and trained offline can be deployed online with one click, while keeping offline and online behavior consistent. Spark was chosen as the offline training platform and MLeap for online model serving.

    Experiment setup

    Using the open lending_club dataset, build and train an LR model with a Spark pipeline, save the model with MLeap, then load it in a standalone (single-machine) MLeap runtime and run predictions on data through the same pipeline. This demonstrates automatic pipeline deployment and consistent feature transformation between offline and online.

    Offline

    Steps 1-2: lightly preprocess the lending_club data, e.g. drop rows with null values.

    Step 3: split the data into train and test sets.

    Steps 4-7: build the feature-processing pipeline.

    Step 8: train the model.

    Step 9: save the model with MLeap.

    The full code is as follows:

    package com.yeahmobi
    // Spark Training Pipeline Libraries
    import org.apache.spark.ml.feature._
    import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier}
    import org.apache.spark.ml.{Pipeline, PipelineStage}
    import org.apache.spark.sql.SparkSession
    import com.databricks.spark.avro._
     
    // MLeap/Bundle.ML Serialization Libraries
    import ml.combust.mleap.spark.SparkSupport._
    import resource._
    import ml.combust.bundle.BundleFile
    import org.apache.spark.ml.bundle.SparkBundleContext
     
    object LendingClubDemo {
      def main(args: Array[String]): Unit = {
        val inputFile = "s3://algo.yeahmobi.com/etl/test/lending_club.avro"
        val spark = SparkSession
          .builder()
          .appName("mleapDemo")
          .getOrCreate()
     
        //Step 1 load data and preprocess
        var dataset = spark.sqlContext.read.format("com.databricks.spark.avro").
          load(inputFile)
     
        dataset.createOrReplaceTempView("df")
        println(dataset.count())
     
        val datasetFnl = spark.sqlContext.sql(f"""
        select
            loan_amount,
            fico_score_group_fnl,
            case when dti >= 10.0
                then 10.0
                else dti
            end as dti,
            emp_length,
            case when state in ('CA', 'NY', 'MN', 'IL', 'FL', 'WA', 'MA', 'TX', 'GA', 'OH', 'NJ', 'VA', 'MI')
                then state
                else 'Other'
            end as state,
            loan_title,
            approved
        from df
        where loan_title in('Debt Consolidation', 'Other', 'Home/Home Improvement', 'Payoff Credit Card', 'Car Payment/Loan',
        'Business Loan', 'Health/Medical', 'Moving', 'Wedding/Engagement', 'Vacation', 'College', 'Renewable Energy', 'Payoff Bills',
        'Personal Loan', 'Motorcycle')
    """)
        println(datasetFnl.count())
     
        // Step 2: Define continuous and categorical features and filter nulls
        val continuousFeatures = Array("loan_amount",
          "dti")
     
        val categoricalFeatures = Array("loan_title",
          "emp_length",
          "state",
          "fico_score_group_fnl")
     
        val allFeatures = continuousFeatures.union(categoricalFeatures)
     
        // Filter all null values
        val allCols = allFeatures.union(Seq("approved")).map(datasetFnl.col)
        val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
        val datasetImputedFiltered = datasetFnl.select(allCols: _*).filter(nullFilter).persist()
     
        println(datasetImputedFiltered.count())
     
        //Step 3: Split data into training and validation
        val Array(trainingDataset, validationDataset) = datasetImputedFiltered.randomSplit(Array(0.7, 0.3))
     
        //Step 4: Continuous Feature Pipeline
        val continuousFeatureAssembler = new VectorAssembler(uid = "continuous_feature_assembler").
          setInputCols(continuousFeatures).
          setOutputCol("unscaled_continuous_features")
     
        val continuousFeatureScaler = new StandardScaler(uid = "continuous_feature_scaler").
          setInputCol("unscaled_continuous_features").
          setOutputCol("scaled_continuous_features")
     
        val polyExpansionAssembler = new VectorAssembler(uid = "poly_expansion_feature_assembler").
          setInputCols(Array("loan_amount", "dti")).
          setOutputCol("poly_expansions_features")
     
        val continuousFeaturePolynomialExpansion = new PolynomialExpansion(uid = "polynomial_expansion_loan_amount").
          setInputCol("poly_expansions_features").
          setOutputCol("loan_amount_polynomial_expansion_features")
     
        //Step 5: Categorical Feature Pipeline
        val categoricalFeatureIndexers = categoricalFeatures.map {
          feature => new StringIndexer(uid = s"string_indexer_$feature").
            setInputCol(feature).
            setOutputCol(s"${feature}_index")
        }
     
        val categoricalFeatureOneHotEncoders = categoricalFeatureIndexers.map {
          indexer => new OneHotEncoder(uid = s"oh_encoder_${indexer.getOutputCol}").
            setInputCol(indexer.getOutputCol).
            setOutputCol(s"${indexer.getOutputCol}_oh")
        }
     
        //Step 6: Assemble our features and feature pipeline
        val featureColsLr = categoricalFeatureOneHotEncoders.map(_.getOutputCol).union(Seq("scaled_continuous_features"))
     
        //Step 7: assemble all processed categorical and continuous features into a single feature vector
        val featureAssemblerLr = new VectorAssembler(uid = "feature_assembler_lr").
          setInputCols(featureColsLr).
          setOutputCol("features_lr")
     
        val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler, polyExpansionAssembler, continuousFeaturePolynomialExpansion).
          union(categoricalFeatureIndexers).
          union(categoricalFeatureOneHotEncoders).
          union(Seq(featureAssemblerLr))
     
        val featurePipeline = new Pipeline(uid = "feature_pipeline").
          setStages(estimators)
        val sparkFeaturePipelineModel = featurePipeline.fit(datasetImputedFiltered)
     
        //Step 8: Train Logistic Regression Model
        val logisticRegression = new LogisticRegression(uid = "logistic_regression").
          setFeaturesCol("features_lr").
          setLabelCol("approved").
          setPredictionCol("approved_prediction")
     
        val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, logisticRegression))
        val sparkPipelineLr = sparkPipelineEstimatorLr.fit(datasetImputedFiltered)
     
        println("Complete: Training Logistic Regression")
     
        //Step 9: (Optional): Serialize your models to bundle.ml
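        // Passing the transformed DataFrame via withDataset lets MLeap record the output schema it needs when serializing the pipeline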
        val sbc = SparkBundleContext().withDataset(sparkPipelineLr.transform(datasetImputedFiltered))
        for(bf <- managed(BundleFile("jar:file:/tmp/lc.model.lr.zip"))) {
          sparkPipelineLr.writeBundle.save(bf)(sbc).get
        }
      }
    }
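
    To compile and run the training job, the project needs the MLeap Spark integration and the spark-avro data source on the classpath. A minimal build.sbt sketch, where the version numbers are assumptions and should be matched to your Spark release:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"   % "2.3.2" % Provided,  // assumed Spark version
      "org.apache.spark" %% "spark-mllib" % "2.3.2" % Provided,
      "com.databricks"   %% "spark-avro"  % "4.0.0",             // Avro data source used in Step 1
      "ml.combust.mleap" %% "mleap-spark" % "0.13.0"             // SparkSupport / SparkBundleContext
    )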
    

    Online

    Take any single record from the lending_club data, build an MLeap LeapFrame (DefaultLeapFrame) from it, and then call

    val mleapPipeline = bundle.root
    val frame2 = mleapPipeline.transform(frame)
    which completes both feature processing and prediction. The full code is as follows:

    package com.yeahmobi
    import ml.combust.bundle.BundleFile
    import ml.combust.mleap.core.types.StructField
    import ml.combust.mleap.runtime.MleapSupport._
    import ml.combust.mleap.tensor.DenseTensor
    import resource._
     
    object Online {
      def main(args: Array[String]): Unit = {
        // load the Spark pipeline we saved in the previous section
        val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/D:\\workspace\\ml\\lc.model.lr.zip"))) yield {
          bundleFile.loadMleapBundle().get
        }).opt.get
     
        // create a simple LeapFrame to transform
        import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
        import ml.combust.mleap.core.types._
     
        // MLeap makes extensive use of monadic types like Try
        val schema = StructType(StructField("loan_amount", ScalarType.Double),
          StructField("fico_score_group_fnl", ScalarType.String),
          StructField("dti", ScalarType.Double),
          StructField("emp_length", ScalarType.String),
          StructField("state", ScalarType.String),
          StructField("loan_title", ScalarType.String)).get
        val data = Seq(Row(1000.0,"700 - 800",0.1, "< 1 year","MA","Debt Consolidation"))
        val frame = DefaultLeapFrame(schema, data)
     
        // transform the dataframe using our pipeline
        val mleapPipeline = bundle.root
        val frame2 = mleapPipeline.transform(frame).get
        val data2 = frame2.dataset
        frame2.show()
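        // select returns a Try[DefaultLeapFrame]; the for-comprehension runs only on success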
        for (pro <- frame2.select("rawPrediction")) {
          println(pro.dataset.head.getTensor(0))
          println(pro.dataset.head.getTensor(0).toArray.mkString(","))
        }
        //println(data2(0).toList)
        //println(item.asInstanceOf[ml.combust.mleap.tensor.Tensor].toArray.mkString(","))
      }
    }
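
    Note that the online side needs only the MLeap runtime, not Spark, which is what makes single-machine (or in-service) scoring possible. A sketch of the extra dependency, where the version is an assumption and should match the one used for training:

    libraryDependencies += "ml.combust.mleap" %% "mleap-runtime" % "0.13.0"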
    

    Summary

    MLeap does not yet support SQL, so SQL-based data preprocessing cannot be kept consistent between offline and online; its biggest strength, however, is its support for Spark Pipelines.
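
    Because the SQL preprocessing of Step 1 is not part of the saved bundle, the online service has to replicate that logic itself before building the LeapFrame. Below is a minimal sketch of the two case-when rules used above; the preprocess helper is illustrative and not part of MLeap or the training code:

    // Re-implement the offline SQL case-when rules by hand (illustrative sketch)
    val keptStates = Set("CA", "NY", "MN", "IL", "FL", "WA", "MA", "TX", "GA", "OH", "NJ", "VA", "MI")

    def preprocess(dti: Double, state: String): (Double, String) =
      (math.min(dti, 10.0),                                 // cap dti at 10.0, as in the SQL
        if (keptStates.contains(state)) state else "Other") // collapse other states to "Other"

    // e.g. preprocess(12.5, "AK") == (10.0, "Other"); the results are what go into the Row
    // passed to DefaultLeapFrame above.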

