Outlier Detection Algorithm: Gaussian Distribution

Author: 王金松 | Published 2019-07-22 14:19

    Reference: www.mamicode.com/info-detail-1464635.html
    The idea behind this anomaly-detection approach is to fit a Gaussian distribution to the training data by computing the per-feature mean and variance; if plugging a test sample into the Gaussian formula yields a probability below a certain threshold (e.g. 0.1), the point is judged to be an anomaly.
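
    Written out, with per-feature means \mu_j and variances \sigma_j^2 estimated from the training data, the probability assigned to a sample x is the product of independent univariate Gaussians (this is exactly what the probFunction in step 2 computes):

    p(x) = \prod_{j=1}^{n} \frac{1}{\sqrt{2\pi\sigma_j^2}} \exp\!\left(-\frac{(x_j - \mu_j)^2}{2\sigma_j^2}\right)

    A point x is flagged as an anomaly when p(x) < \epsilon.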

    1. Create a dataset-parsing utility object that converts the CSV datasets into RDD structures (a quick usage sketch follows the code)
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    
    
    
    object FeaturesParser{
      def parseFeatures(rawdata: RDD[String]): RDD[Vector] = {
        val rdd: RDD[Array[Double]] = rawdata.map(_.split(",").map(_.toDouble))
        val vectors: RDD[Vector] = rdd.map(arrDouble => Vectors.dense(arrDouble))
        vectors
      }
    
      def parseFeaturesWithLabel(cvData: RDD[String]): RDD[LabeledPoint] = {
        val rdd: RDD[Array[Double]] = cvData.map(_.split(",").map(_.toDouble))
        val labeledPoints = rdd.map(arrDouble => new LabeledPoint(arrDouble(0), Vectors.dense(arrDouble.slice(1, arrDouble.length))))
        labeledPoints
      }
    }
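
    As a quick sanity check (a minimal sketch with made-up rows, assuming a SparkContext sc such as the one in spark-shell; the real data lives in the CSV files read in step 5), parseFeaturesWithLabel expects the label in the first column followed by the feature values:

    // hypothetical labeled rows: label,feature1,feature2
    val sampleRows: RDD[String] = sc.parallelize(Seq("1.0,13.4,2.1", "0.0,12.8,1.9"))
    val sampleLabeled: RDD[LabeledPoint] = FeaturesParser.parseFeaturesWithLabel(sampleRows)
    sampleLabeled.collect().foreach(println)   // prints (1.0,[13.4,2.1]) and (0.0,[12.8,1.9])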
    
    2. Create the anomaly-detection utility object, which decides whether a point is an anomaly (a small numeric example follows the code)
    object AnomalyDetection {
    
    
      /**
        * True if the given point is an anomaly, false otherwise
        * @param point the sample to test
        * @param means per-feature means estimated from the training data
        * @param variances per-feature variances estimated from the training data
        * @param epsilon probability threshold below which a point is flagged
        * @return true if the point's probability density is below epsilon
        */
      def predict(point: Vector, means: Vector, variances: Vector, epsilon: Double): Boolean = {
        val prob = probFunction(point, means, variances)
        println("--> prob: " + prob + ", epsilon: " + epsilon)
        prob < epsilon
      }
    
      def probFunction(point: Vector, means: Vector, variances: Vector): Double = {
        val tripletByFeature: List[(Double, Double, Double)] = (point.toArray, means.toArray, variances.toArray).zipped.toList
        tripletByFeature.map { triplet =>
          val x = triplet._1
          val mean = triplet._2
          val variance = triplet._3
          val expValue = Math.pow(Math.E, -0.5 * Math.pow(x - mean,2) / variance)
          (1.0 / (Math.sqrt(variance) * Math.sqrt(2.0 * Math.PI))) * expValue
        }.product
      }
    }
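
    As a rough numeric illustration (assumed values, not taken from the article's data): for a single feature with mean 0 and variance 1, the density at x = 3 is about 0.0044, so with epsilon = 0.01 the point is flagged:

    val p = AnomalyDetection.probFunction(Vectors.dense(3.0), Vectors.dense(0.0), Vectors.dense(1.0))
    // p ≈ 0.00443, the standard normal density at x = 3
    println(AnomalyDetection.predict(Vectors.dense(3.0), Vectors.dense(0.0), Vectors.dense(1.0), 0.01))   // true, since 0.00443 < 0.01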
    
    3. The anomaly-detection model class (a toy usage example follows the code)
    import org.apache.spark.mllib.linalg._
    import org.apache.spark.rdd.RDD
    
    class AnomalyDetectionModel(means2: Vector, variances2: Vector, epsilon2: Double) extends java.io.Serializable{
       var means: Vector = means2
       var variances: Vector = variances2
       var epsilon: Double = epsilon2
    
       def predict(point: Vector) : Boolean ={
          println("-->1")
          AnomalyDetection.predict(point, means, variances, epsilon)
       }
       
       def predict(points: RDD[Vector]): RDD[(Vector, Boolean)] = {
        println("-->2")
        points.map(p => (p,AnomalyDetection.predict(p, means, variances, epsilon)))
       }
    }
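
    For a quick check without running the full pipeline (the means, variances and epsilon below are assumed toy values), the model can also be constructed directly:

    val toyModel = new AnomalyDetectionModel(Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0), 0.05)
    println(toyModel.predict(Vectors.dense(4.0, -3.5)))   // true: the point is far from the mean, so its density is well below 0.05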
    
    4. The driver class: it runs the anomaly-detection model, optimizes the epsilon parameter, and reports evaluation metrics such as the F1 score (note that it must be Serializable; the formulas used by evaluate are restated after the code)
    import org.apache.spark.Logging
    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
    import org.apache.spark.rdd.RDD
    
    /**
      * Anomaly Detection algorithm
      */
    class AnomalyDetection extends java.io.Serializable with Logging {
    
      val default_epsilon: Double = 0.01
    
      def run(data: RDD[Vector]): AnomalyDetectionModel = {
        val sc = data.sparkContext
    
        val stats: MultivariateStatisticalSummary = Statistics.colStats(data)
        val mean: Vector = stats.mean
        val variances: Vector = stats.variance
        logInfo("MEAN %s VARIANCE %s".format(mean, variances))
        // println(s"--> MEAN VARIANCE$mean,$variances")
        println("--> MEAN VARIANCE"+mean+variances)
        new AnomalyDetectionModel(mean, variances, default_epsilon)
      }
    
      /**
        * Uses the labeled input points to optimize the epsilon parameter by finding the best F1 Score
        * @param crossValData labeled cross-validation data
        * @param anomalyDetectionModel model whose means/variances are reused and whose epsilon is tuned
        * @return a new AnomalyDetectionModel with the F1-optimal epsilon
        */
      def optimize(crossValData: RDD[LabeledPoint], anomalyDetectionModel: AnomalyDetectionModel): AnomalyDetectionModel = {
        val sc = crossValData.sparkContext
        val bcMean = sc.broadcast(anomalyDetectionModel.means)
        val bcVar = sc.broadcast(anomalyDetectionModel.variances)
    
        //compute probability density function for each example in the cross validation set
        val probsCV: RDD[Double] = crossValData.map(labeledpoint =>
          AnomalyDetection.probFunction(labeledpoint.features, bcMean.value, bcVar.value)
        )
    
        //select epsilon
        crossValData.persist()
        val epsilonWithF1Score: (Double, Double) = evaluate(crossValData, probsCV)
        crossValData.unpersist()
    
        logInfo("Best epsilon %s F1 score %s".format(epsilonWithF1Score._1, epsilonWithF1Score._2))
        new AnomalyDetectionModel(anomalyDetectionModel.means, anomalyDetectionModel.variances, epsilonWithF1Score._1)
      }
    
      /**
        *  Finds the best threshold to use for selecting outliers based on the results from a validation set and the ground truth.
        *
        * @param crossValData labeled data
        * @param probsCV probability density function as calculated for the labeled data
        * @return Epsilon and the F1 score
        */
      private def evaluate(crossValData: RDD[LabeledPoint], probsCV: RDD[Double]): (Double, Double) = {
    
        val minPval: Double = probsCV.min()
        val maxPval: Double = probsCV.max()
        logInfo("minPVal: %s, maxPVal %s".format(minPval, maxPval))
        val sc = probsCV.sparkContext
    
        var bestEpsilon = 0D
        var bestF1 = 0D
    
        val stepsize = (maxPval - minPval) / 1000.0
    
        //find best F1 for different epsilons
        for (epsilon <- minPval to maxPval by stepsize){
    
          val bcepsilon = sc.broadcast(epsilon)
    
          val ourPredictions: RDD[Double] = probsCV.map{ prob =>
            if (prob < bcepsilon.value)
              1.0 //anomaly
            else
              0.0
          }
          val labelAndPredictions: RDD[(Double, Double)] = crossValData.map(_.label).zip(ourPredictions)
          val labelWithPredictionCached: RDD[(Double, Double)] = labelAndPredictions
    
          val falsePositives = countStatisticalMeasure(labelWithPredictionCached, 0.0, 1.0)
          val truePositives = countStatisticalMeasure(labelWithPredictionCached, 1.0, 1.0)
          val falseNegatives = countStatisticalMeasure(labelWithPredictionCached, 1.0, 0.0)
    
          val precision = truePositives / Math.max(1.0, truePositives + falsePositives)
          val recall = truePositives / Math.max(1.0, truePositives + falseNegatives)
    
          val f1Score = 2.0 * precision * recall / (precision + recall)
    
          if (f1Score > bestF1){
            bestF1 = f1Score
            bestEpsilon = epsilon
          }
        }
    
        (bestEpsilon, bestF1)
      }
    
      /**
        * Function to calculate true / false positives, negatives
        *
        * @param labelWithPredictionCached RDD of (label, prediction) pairs
        * @param labelVal ground-truth label to match (1.0 = anomaly, 0.0 = normal)
        * @param predictionVal predicted label to match
        * @return the number of matching pairs, as a Double
        */
      private def countStatisticalMeasure(labelWithPredictionCached: RDD[(Double, Double)], labelVal: Double, predictionVal: Double): Double = {
        labelWithPredictionCached.filter { labelWithPrediction =>
          val label = labelWithPrediction._1
          val prediction = labelWithPrediction._2
          label == labelVal && prediction == predictionVal
        }.count().toDouble
      }
    
    }
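
    The evaluate method sweeps 1000 candidate epsilon values between the smallest and largest density seen on the validation set, treating a predicted anomaly (density < epsilon) as a positive, and keeps the epsilon with the best F1 score, where (as in the code above):

    \text{precision} = \frac{TP}{\max(1,\ TP+FP)}, \qquad \text{recall} = \frac{TP}{\max(1,\ TP+FN)}, \qquad F_1 = \frac{2\cdot\text{precision}\cdot\text{recall}}{\text{precision}+\text{recall}}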
    
    5. Read the datasets from the HDFS path /user/mapr/, convert them to RDDs, train the model, and predict the anomalies (the SparkConf/SparkContext lines are commented out because sc is assumed to already exist, e.g. in spark-shell; a note on the optional optimize step follows the code):
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    
    // val conf = new SparkConf().setAppName("Anomaly Detection Spark2")
    // val sc = new SparkContext(conf)
    
    val rawFilePath = "/user/mapr/training.csv"
    val cvFilePath = "/user/mapr/cross_val.csv"
    val rawdata = sc.textFile(rawFilePath, 2).cache()
    val cvData = sc.textFile(cvFilePath, 2).cache()
    
    val trainingVec: RDD[Vector] = FeaturesParser.parseFeatures(rawdata)
    val cvLabeledVec: RDD[LabeledPoint] = FeaturesParser.parseFeaturesWithLabel(cvData)
    
    // trainingVec.collect().foreach(println)
    // cvLabeledVec.collect().foreach(println)
    
    val data = trainingVec.cache()
    val anDet: AnomalyDetection = new AnomalyDetection()
    //derive model
    val model = anDet.run(data)
    
    val dataCvVec = cvLabeledVec.cache()
    // val optimalModel = anDet.optimize(dataCvVec, model)
    
    //find outliers in CV
    val cvVec = cvLabeledVec.map(_.features)
    // cvVec.collect().foreach(println)
    // print("-->"+typeOf[cvVec])
    val results = model.predict(cvVec)
    
    
    // results.collect().foreach(println)
    val outliers = results.filter(_._2).collect()
    // outliers.foreach(v => println(v._1))
    println("\nFound %s outliers\n".format(outliers.length))
    
