Water Quality Prediction with a Spark-Based Random Forest


Author: 藤风 | Published: 2019-10-12 18:19

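    Predicting water quality trends from monitoring data is important for preventing and controlling water pollution. Current prediction methods fall into two groups. The first builds numerical models of the physical and chemical processes that pollutants undergo in water; examples include WASP, QUAL, and MIKE. The second is data-driven and uses machine learning or deep learning; examples include LSTM, AdaBoost, and random forests. This article uses the random forest algorithm on the Spark distributed computing framework to predict water quality.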

    1. Prepare the data

    Upload the data to HDFS, then create an external table over it in Hive. The CREATE TABLE statement is:

    
    -- OpenCSVSerde exposes every column as STRING, whatever type is declared.
    -- The original statement gives no LOCATION clause, so the HDFS path of the uploaded data is not shown here.
    CREATE EXTERNAL TABLE wayeal_forecast.water (
          `time` STRING COMMENT 'from deserializer',
          `id` STRING COMMENT 'from deserializer',
          `name` STRING COMMENT 'from deserializer',
          `basin` STRING COMMENT 'from deserializer',
          `section` STRING COMMENT 'from deserializer',
          `ph` STRING COMMENT 'from deserializer',
          `ph_type` STRING COMMENT 'from deserializer',
          `do` STRING COMMENT 'from deserializer',
          `do_type` STRING COMMENT 'from deserializer',
          `nh3_n` STRING COMMENT 'from deserializer',
          `nh3_n_type` STRING COMMENT 'from deserializer',
          `codmn` STRING COMMENT 'from deserializer',
          `codmn_type` STRING COMMENT 'from deserializer',
          `c` STRING COMMENT 'from deserializer',
          `c_type` STRING COMMENT 'from deserializer')
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES (
    "separatorChar" = ","
    );
    

    A sample of the data is shown below:


    [Figure: data sample (数据样本.PNG)]
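
    Because OpenCSVSerde exposes every column as a string, the measurements have to be converted to numbers before they can serve as features. A minimal plain-Python sketch of that conversion follows. The sample rows, the reduced column list, and the helper names `to_float` and `parse_rows` are illustrative assumptions, not data or code from the original project.

```python
import csv
import io

# Hypothetical rows for illustration only; not taken from the real dataset.
SAMPLE_CSV = """time,id,ph,do
2019-01-01 00:00,78,7.52,9.1
2019-01-01 04:00,78,,8.8
2019-01-01 08:00,78,7.48,--
"""


def to_float(text):
    """Return the value as a float, or None if it is empty or not numeric."""
    try:
        return float(text)
    except ValueError:
        return None


def parse_rows(csv_text, numeric_columns):
    """Read CSV text and convert the named columns from strings to floats."""
    rows = []
    for record in csv.DictReader(io.StringIO(csv_text)):
        for name in numeric_columns:
            record[name] = to_float(record[name])
        rows.append(record)
    return rows


rows = parse_rows(SAMPLE_CSV, ["ph", "do"])
# Keep only rows where every numeric column was parsed successfully.
clean_rows = [r for r in rows if r["ph"] is not None and r["do"] is not None]
print(len(rows), len(clean_rows))
```

    In Spark the equivalent step is typically a column cast, for example `col("ph").cast("double")`, applied before the features are assembled.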

    2. Develop the model

    First, read the data from Hive:

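    # HIVE_SQL and WATER_FACTOR are constants defined elsewhere in the project (not shown).
    data = self.spark.sql(HIVE_SQL).select(WATER_FACTOR)
    # Keep only the records whose id is '78'; the later steps operate on this filtered data.
    data = data.filter(data['id'] == '78')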
    

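    Next, because the raw data is a time series, it has to be converted into a supervised learning dataset in which each row carries the earlier observations as features. The code is: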

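    from pyspark.sql import Window
    from pyspark.sql.functions import lag, monotonically_increasing_id

    # Row index used to order the lag window (assumes rows are already in time order;
    # this overwrites the original "id" column).
    data = data.withColumn("id", monotonically_increasing_id())
    w = Window.orderBy("id")  # no partitionBy: all rows go to one partition
    for colName in SELECT_WATER_FACTOR:
        for i in range(1, n_hours + 1):
            # Value of colName i time steps earlier.
            data = data.withColumn("{}(t-{})".format(colName, i), lag(colName, i).over(w))
    # The first n_hours rows lack a full history; drop them along with any other null rows.
    data = data.na.drop()
    data = data.drop("id")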
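
    To make the transformation concrete, here is a small plain-Python sketch of the same idea on a toy series: each sample keeps the current value as the label and takes the previous values as features. The function name `series_to_supervised` and the sample readings are illustrative assumptions.

```python
def series_to_supervised(values, n_lags):
    """Turn a time-ordered list into (lag_features, label) pairs.

    lag_features[k-1] is the value k steps before the label, mirroring
    the "(t-k)" columns built with lag() in the Spark code.
    """
    samples = []
    # The first n_lags positions have no complete history and are skipped,
    # which corresponds to dropping the rows that contain nulls.
    for t in range(n_lags, len(values)):
        lags = [values[t - k] for k in range(1, n_lags + 1)]
        samples.append((lags, values[t]))
    return samples


# Made-up pH readings in time order.
ph_series = [7.1, 7.2, 7.4, 7.3, 7.5]
for lags, label in series_to_supervised(ph_series, n_lags=2):
    print(lags, "->", label)
```

    With five readings and two lags, three samples remain, and the first one is `([7.2, 7.1], 7.4)`.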
    

    Finally, wrap the whole workflow in a Pipeline and tune the model with a grid search built from ParamGridBuilder and TrainValidationSplit. The code is:

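    from pyspark.ml import Pipeline
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

    # Hold out 30% of the rows for the final test.
    (train_data, test_data) = data.randomSplit([0.7, 0.3])

    # Features are the lagged columns: everything except 'time' and the current-time factors.
    data_col = data.columns
    data_col.remove('time')
    input_cols = [c for c in data_col if c not in SELECT_WATER_FACTOR]
    vector_assembler = VectorAssembler(inputCols=input_cols, outputCol="featureVector")

    # Predict the current pH from the lagged features (the label column must be numeric).
    rf_regressor = RandomForestRegressor()\
        .setFeaturesCol("featureVector")\
        .setLabelCol("ph")\
        .setPredictionCol("prediction")

    # Candidate values for the number of trees.
    param_grid = ParamGridBuilder()\
        .addGrid(rf_regressor.numTrees, [10, 50, 100, 150, 200, 500])\
        .build()

    pipeline = Pipeline(stages=[vector_assembler, rf_regressor])

    evaluator = RegressionEvaluator(labelCol="ph", predictionCol="prediction", metricName="rmse")

    # Use 90% of the training data to fit each candidate and 10% to validate it.
    validator = TrainValidationSplit()\
        .setEstimator(pipeline)\
        .setEstimatorParamMaps(param_grid)\
        .setEvaluator(evaluator)\
        .setTrainRatio(0.9)

    validator_model = validator.fit(train_data)

    # Evaluate the best pipeline on the held-out test data.
    best_model = validator_model.bestModel
    predictions = best_model.transform(test_data)
    rmse = evaluator.evaluate(predictions)
    print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
    # Stage 0 is the VectorAssembler; stage 1 is the fitted random forest.
    rf_model = best_model.stages[1]
    print(rf_model)
    predictions.show(truncate=False)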
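
    The tuning step can be pictured without Spark. The sketch below mimics the selection logic of a train-validation split: hold out the last part of the data, score each candidate parameter value on the held-out part by RMSE, and keep the value with the lowest error. The toy model (a one-step-ahead moving-average predictor with a tunable window), the helper names, and the data are illustrative assumptions. The toy model stands in for the random forest only to keep the example dependency-free, and it splits by position, whereas Spark's TrainValidationSplit splits at random.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


def predict_moving_average(history, window):
    """Toy model: predict the next value as the mean of the last `window` values."""
    recent = history[-window:]
    return sum(recent) / len(recent)


def validation_rmse(series, n_train, window):
    """Score one candidate window on the points from position n_train onward."""
    labels, predictions = [], []
    for t in range(n_train, len(series)):
        labels.append(series[t])
        # One-step-ahead forecast from all values observed before time t.
        predictions.append(predict_moving_average(series[:t], window))
    return rmse(labels, predictions)


def grid_search(series, candidate_windows, train_ratio=0.9):
    """Return (best_window, scores) using a single train/validation split."""
    n_train = int(len(series) * train_ratio)
    scores = {w: validation_rmse(series, n_train, w) for w in candidate_windows}
    best_window = min(scores, key=scores.get)  # lower RMSE is better
    return best_window, scores


# Made-up series that alternates between two levels.
series = [7.0, 7.4] * 10
best_window, scores = grid_search(series, candidate_windows=[1, 2, 3])
print(best_window, scores)
```

    Here window 2 wins because averaging one high and one low reading lands midway between the two levels. TrainValidationSplit applies the same selection rule to `numTrees` and exposes the winning pipeline as `bestModel`.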
    

    The best model produced the following result:

    Root Mean Squared Error (RMSE) on test data = 0.202249
    RandomForestRegressionModel (uid=RandomForestRegressor_8bde32f77a3e) with 150 trees
    


    Article link: https://www.haomeiwen.com/subject/rvccmctx.html