Outlier Detection Algorithm: the Gaussian Distribution

Author: 王金松 | Published 2019-07-22 14:19

www.mamicode.com/info-detail-1464635.html
The idea: fit a Gaussian distribution to the training data by computing each feature's mean and variance; if the probability density of a test sample under that Gaussian falls below a threshold ε (the code below defaults to 0.01), the sample is judged an anomaly.
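Concretely, with per-feature means μ_j and variances σ_j², the features are treated as independent, so the density of a point x is p(x) = Π_j 1/√(2π·σ_j²) · exp(−(x_j − μ_j)²/(2·σ_j²)). A quick worked example (numbers of my own choosing, not from the post): for a single feature with μ = 0 and σ² = 1, a point at x = 3 has density ≈ 0.0044, which is below ε = 0.01 and would be flagged, while x = 0.1 has density ≈ 0.397 and passes.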

  1. Create a parser utility object that converts the CSV datasets into RDDs (a usage sketch follows the code):
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD



object FeaturesParser {
  // Parse unlabeled CSV rows ("v1,v2,...") into dense feature vectors.
  def parseFeatures(rawdata: RDD[String]): RDD[Vector] = {
    val rdd: RDD[Array[Double]] = rawdata.map(_.split(",").map(_.toDouble))
    rdd.map(arrDouble => Vectors.dense(arrDouble))
  }

  // Parse labeled CSV rows ("label,v1,v2,..."): column 0 is the label,
  // the remaining columns are the features.
  def parseFeaturesWithLabel(cvData: RDD[String]): RDD[LabeledPoint] = {
    val rdd: RDD[Array[Double]] = cvData.map(_.split(",").map(_.toDouble))
    rdd.map(arrDouble => LabeledPoint(arrDouble(0), Vectors.dense(arrDouble.slice(1, arrDouble.length))))
  }
}
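A quick usage sketch in spark-shell (these CSV rows are made-up examples, not the original datasets):

// Hypothetical rows, for illustration only.
val raw = sc.parallelize(Seq("1.2,3.4", "5.6,7.8"))
FeaturesParser.parseFeatures(raw).collect()
// Array([1.2,3.4], [5.6,7.8])

val labeled = sc.parallelize(Seq("1.0,1.2,3.4"))
FeaturesParser.parseFeaturesWithLabel(labeled).collect()
// Array((1.0,[1.2,3.4]))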
  2. Create the anomaly-detection utility object; its job is deciding whether a point is an anomaly (a sanity-check example follows the code):
object AnomalyDetection {

  /**
    * True if the given point is an anomaly, false otherwise.
    * @param point the feature vector to test
    * @param means per-feature means of the training data
    * @param variances per-feature variances of the training data
    * @param epsilon density threshold below which a point is flagged
    * @return whether the point's density falls below epsilon
    */
  def predict(point: Vector, means: Vector, variances: Vector, epsilon: Double): Boolean = {
    probFunction(point, means, variances) < epsilon
  }

  // Density of the point under a product of independent univariate
  // Gaussians, one per feature.
  def probFunction(point: Vector, means: Vector, variances: Vector): Double = {
    val tripletByFeature: List[(Double, Double, Double)] = (point.toArray, means.toArray, variances.toArray).zipped.toList
    tripletByFeature.map { case (x, mean, variance) =>
      val expValue = Math.exp(-0.5 * Math.pow(x - mean, 2) / variance)
      (1.0 / Math.sqrt(2.0 * Math.PI * variance)) * expValue
    }.product
  }
}
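A quick sanity check with illustrative numbers: for a single standard-normal feature, x = 0.1 sits near the mean while x = 3.0 is far out in the tail.

val means = Vectors.dense(0.0)
val vars  = Vectors.dense(1.0)
AnomalyDetection.probFunction(Vectors.dense(0.1), means, vars)  // ≈ 0.397
AnomalyDetection.predict(Vectors.dense(3.0), means, vars, 0.01) // true: density ≈ 0.0044 < 0.01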
  3. The anomaly-detection model class (a short example follows the code):
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD

class AnomalyDetectionModel(val means: Vector, val variances: Vector, val epsilon: Double) extends java.io.Serializable {

  // Classify a single point.
  def predict(point: Vector): Boolean =
    AnomalyDetection.predict(point, means, variances, epsilon)

  // Classify an RDD of points, keeping each point alongside its verdict.
  def predict(points: RDD[Vector]): RDD[(Vector, Boolean)] =
    points.map(p => (p, AnomalyDetection.predict(p, means, variances, epsilon)))
}
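Constructing a model by hand (illustrative means, variances and ε) and classifying points:

val m = new AnomalyDetectionModel(Vectors.dense(0.0, 10.0), Vectors.dense(1.0, 4.0), 0.01)
m.predict(Vectors.dense(0.1, 9.5))   // false: close to both means
m.predict(Vectors.dense(6.0, 25.0))  // true: far from both means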
  4. The algorithm class proper: fits the model, optimizes the ε parameter, and reports evaluation metrics (note the java.io.Serializable mixin). A toy F1 computation follows the code.
import org.apache.spark.Logging // public in Spark 1.x; private[spark] since Spark 2.0
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD

/**
  * Anomaly Detection algorithm
  */
class AnomalyDetection extends java.io.Serializable with Logging {

  val default_epsilon: Double = 0.01

  // Fit per-feature means and variances on the unlabeled training data.
  def run(data: RDD[Vector]): AnomalyDetectionModel = {
    val stats: MultivariateStatisticalSummary = Statistics.colStats(data)
    val mean: Vector = stats.mean
    val variances: Vector = stats.variance
    logInfo("MEAN %s VARIANCE %s".format(mean, variances))
    new AnomalyDetectionModel(mean, variances, default_epsilon)
  }

  /**
    * Uses the labeled input points to optimize the epsilon parameter by finding the best F1 score.
    * @param crossValData labeled cross-validation data (label 1.0 marks an anomaly)
    * @param anomalyDetectionModel a model already fitted on the training data
    * @return a copy of the model carrying the tuned epsilon
    */
  def optimize(crossValData: RDD[LabeledPoint], anomalyDetectionModel: AnomalyDetectionModel) = {
    val sc = crossValData.sparkContext
    val bcMean = sc.broadcast(anomalyDetectionModel.means)
    val bcVar = sc.broadcast(anomalyDetectionModel.variances)

    //compute probability density function for each example in the cross validation set
    val probsCV: RDD[Double] = crossValData.map(labeledpoint =>
      AnomalyDetection.probFunction(labeledpoint.features, bcMean.value, bcVar.value)
    )

    //select epsilon
    crossValData.persist()
    val epsilonWithF1Score: (Double, Double) = evaluate(crossValData, probsCV)
    crossValData.unpersist()

    logInfo("Best epsilon %s F1 score %s".format(epsilonWithF1Score._1, epsilonWithF1Score._2))
    new AnomalyDetectionModel(anomalyDetectionModel.means, anomalyDetectionModel.variances, epsilonWithF1Score._1)
  }

  /**
    *  Finds the best threshold to use for selecting outliers based on the results from a validation set and the ground truth.
    *
    * @param crossValData labeled data
    * @param probsCV probability density function as calculated for the labeled data
    * @return Epsilon and the F1 score
    */
  private def evaluate(crossValData: RDD[LabeledPoint], probsCV: RDD[Double]) = {

    val minPval: Double = probsCV.min()
    val maxPval: Double = probsCV.max()
    logInfo("minPVal: %s, maxPVal %s".format(minPval, maxPval))
    val sc = probsCV.sparkContext

    var bestEpsilon = 0D
    var bestF1 = 0D

    val stepsize = (maxPval - minPval) / 1000.0

    //find best F1 for different epsilons
    for (epsilon <- minPval to maxPval by stepsize){

      val bcepsilon = sc.broadcast(epsilon)

      val ourPredictions: RDD[Double] = probsCV.map{ prob =>
        if (prob < bcepsilon.value)
          1.0 //anomaly
        else
          0.0
      }
      // Cache the pairs: countStatisticalMeasure makes three passes over them.
      val labelAndPredictions: RDD[(Double, Double)] = crossValData.map(_.label).zip(ourPredictions).cache()

      val falsePositives = countStatisticalMeasure(labelAndPredictions, 0.0, 1.0)
      val truePositives = countStatisticalMeasure(labelAndPredictions, 1.0, 1.0)
      val falseNegatives = countStatisticalMeasure(labelAndPredictions, 1.0, 0.0)

      val precision = truePositives / Math.max(1.0, truePositives + falsePositives)
      val recall = truePositives / Math.max(1.0, truePositives + falseNegatives)

      // Guard against 0/0 when this epsilon yields no true positives.
      val f1Score = if (precision + recall == 0.0) 0.0 else 2.0 * precision * recall / (precision + recall)

      if (f1Score > bestF1){
        bestF1 = f1Score
        bestEpsilon = epsilon
      }
    }

    (bestEpsilon, bestF1)
  }

  /**
    * Counts label/prediction pairs matching the given combination, e.g.
    * (labelVal = 1.0, predictionVal = 1.0) counts true positives.
    *
    * @param labelWithPredictionCached (label, prediction) pairs
    * @param labelVal the ground-truth label to match
    * @param predictionVal the predicted value to match
    * @return the number of matching pairs, as a Double
    */
  private def countStatisticalMeasure(labelWithPredictionCached: RDD[(Double, Double)], labelVal: Double, predictionVal: Double): Double = {
    labelWithPredictionCached.filter { labelWithPrediction =>
      val label = labelWithPrediction._1
      val prediction = labelWithPrediction._2
      label == labelVal && prediction == predictionVal
    }.count().toDouble
  }

}
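To make the metric concrete, here is a toy computation with made-up labels and predictions: one true positive, one false positive and one false negative give precision = recall = 0.5, hence F1 = 0.5.

val labels      = Seq(1.0, 1.0, 0.0, 0.0)
val predictions = Seq(1.0, 0.0, 1.0, 0.0)
val pairs = labels.zip(predictions)
val tp = pairs.count(p => p._1 == 1.0 && p._2 == 1.0)   // 1
val fp = pairs.count(p => p._1 == 0.0 && p._2 == 1.0)   // 1
val fn = pairs.count(p => p._1 == 1.0 && p._2 == 0.0)   // 1
val precision = tp.toDouble / (tp + fp)                  // 0.5
val recall    = tp.toDouble / (tp + fn)                  // 0.5
val f1 = 2 * precision * recall / (precision + recall)   // 0.5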
  5. Read the datasets from the HDFS path /user/mapr/, convert them to RDDs, train the model, and flag the anomalies. This is meant for spark-shell, where sc already exists:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// When running outside spark-shell, create the context explicitly:
// val conf = new SparkConf().setAppName("Anomaly Detection Spark2")
// val sc = new SparkContext(conf)

val rawFilePath = "/user/mapr/training.csv"
val cvFilePath = "/user/mapr/cross_val.csv"
val rawdata = sc.textFile(rawFilePath, 2).cache()
val cvData = sc.textFile(cvFilePath, 2).cache()

val trainingVec: RDD[Vector] = FeaturesParser.parseFeatures(rawdata)
val cvLabeledVec: RDD[LabeledPoint] = FeaturesParser.parseFeaturesWithLabel(cvData)

val data = trainingVec.cache()
val anDet: AnomalyDetection = new AnomalyDetection()
//derive model
val model = anDet.run(data)

val dataCvVec = cvLabeledVec.cache()
// val optimalModel = anDet.optimize(dataCvVec, model)

//find outliers in CV
val cvVec = cvLabeledVec.map(_.features)
val results = model.predict(cvVec)


val outliers = results.filter(_._2).collect()
println("\nFound %s outliers\n".format(outliers.length))
