美文网首页
Spark 分布式实现贝叶斯判别

Spark 分布式实现贝叶斯判别

作者: k_wzzc | 来源:发表于2019-03-27 23:26 被阅读0次

    Spark 分布式实现贝叶斯判别

    贝叶斯公式

    贝叶斯
    贝叶斯公式
    Bi 通常视为A发生的”原因“,P(Bi)称为先验概率(主观概率),表示各种原因发生的可能性大小;P(Bi|A)(i=1,2...)则反映当出现结果A之后,再对各种“原因”概率的新认识,故称后验概率。

    一个小案例

    大家都知道狼来了的故事,我们就有贝叶斯的思想来解释一下这个故事:
    在最开始的时候,大家对于放羊娃的认识不够深刻,主观上认为放羊娃说真话(记为事件B1)和说假话(记为事件B2)的概率相同。即:
    P(B_1) = P(B_2) = 0.5
    再假设狼来了(记为事件A),说谎话喊狼来了时,狼来的概率为1/3,说真话喊狼来了时,狼来的概率是2/3,即:
    P(A|B_1) = 1/3 ; P(A|B_2) =2/3

    第一次村民上山打狼,狼没来(记为事件\overline{A}),此时村民们对放羊娃就有了新的认识:
    狼没来的情况下小孩说谎了(在村民们的主观印象上,小孩说谎的概率增加了):
    P(B_1|\overline{A})=\frac{P(\overline{A}|B_1)P(B_1)}{\sum_{k=1}^N P(\overline{A}|B_i)P(B_i)} = \frac{8}{11}

    随着小孩说谎的次数增加,村民们对于小孩说谎的主观概率也不断增加,当这个概率增加到一定程度时计算小孩说真话,村民们就不会再相信他。

    贝叶斯判别

    之前提到的两种判别分析方法都非常简单,实用,但是也存在着一定的缺点:一是判别方法与各个总体出现的概率大小无关,而是与错判后造成的损失无关。贝叶斯判别则考虑了这两种情况:贝叶斯判别假定对样本有一定的认知(先验概率),然后计算得出后验概率并进行统计推断的判别方法。

    贝叶斯判别求解过程

    由于过程较长,公式比较多,直接上书


    在这里插入图片描述 在这里插入图片描述
    在这里插入图片描述

    代码实现过程

    本案例使用的数据为鸢尾花数据集

    def main(args: Array[String]): Unit = {
    
        val spark = SparkSession
          .builder()
          .appName(s"${this.getClass.getSimpleName}")
          .master("local[*]")
          .getOrCreate()
    
        import spark.implicits._
    
        val sc = spark.sparkContext
    
        val irisData = spark.read
          .option("header", true)
          .option("inferSchema", true)
          .csv("F:\\DataSource\\iris.csv")
    
        val schema = irisData.schema
        val fts = schema.filterNot(_.name == "class").map(_.name).toArray
    
        val amountVectorAssembler: VectorAssembler = new VectorAssembler()
          .setInputCols(fts)
          .setOutputCol("features")
    
        val vec2Array = udf((vec: DenseVector) => vec.toArray)
    
        val irisFeatrus = amountVectorAssembler
          .transform(irisData)
          .select($"class", vec2Array($"features") as "features")
    
        val p: Long = irisFeatrus.count()
    
       // 计算均值向量的自定义聚合函数(请参考之前的两篇文章)
        val ui = spark.udf.register("udafMedian", new meanVector(fts.length))
    
        // 计算样本均值向量
        val uiGroup = irisFeatrus
          .groupBy($"class")
          .agg(ui($"features") as "ui", count($"class") as "len")
    
        val covMatrix = irisFeatrus
          .join(uiGroup, "class")
          .rdd
          .map(row => {
            val lable = row.getAs[String]("class")
            val len = row.getAs[Long]("len")
            val u = densevec(row.getAs[Seq[Double]]("ui").toArray)
            val x = densevec(row.getAs[Seq[Double]]("features").toArray)
            val denseMatrix = (x - u).toDenseMatrix
            lable -> (denseMatrix, u, len)
          })
          .reduceByKey((d1, d2) => {
            // 矩阵合并,均值向量,样本大小
            (DenseMatrix.vertcat(d1._1, d2._1), d1._2, d1._3)
          })
          .mapValues(tp => {
            val covm: DenseMatrix[Double] =
              (tp._1.t * tp._1).map(_ / (tp._3 - 1)) //协方差矩阵
            val qi = math.log(tp._3.toDouble / p) // 先验概率,在此默认为各类样本的频率
              (covm, tp._2.toDenseMatrix, qi)
          })
    
    
        val covBroad = sc.broadcast(covMatrix.collect())
        val predictudf = udf((seq: Seq[Double]) => {
          val dist: Array[(String, Double)] = covBroad.value
            .map(tp => {
            /**
            * 计算判别函数的相关指标
            **/
              val xi = densevec(seq.toArray).toDenseMatrix
              val inCov = inv(tp._2._1)
              val lnCov = math.log(det(tp._2._1)) / 2
              val xdiff = (xi * inCov * xi.t).data.head / 2
              val mdist = (tp._2._2 * inv(tp._2._1) * tp._2._2.t).data.head / 2
              val xu = (xi * inCov * tp._2._2.t).data.head
              val d = tp._2._3 - lnCov - xdiff - mdist + xu
              (tp._1, d)
            })
    
          val pm = dist.map(x => math.exp(x._2)).sum
    
         // 计算后验概率
          dist.map(tp => {
            tp._1 -> math.exp(tp._2) / pm
          })
          
        })
    
        irisFeatrus
          .withColumn("prediction", predictudf($"features"))
          .show(truncate = false)
    
        spark.stop()
      }
    

    结果查看:从结果看出,分类效果还是很好的

    |class      |features            |prediction |
    +-----------+--------------------+-----------+
    |Iris-setosa|[5.1, 3.5, 1.4, 0.2]|Iris-setosa|
    |Iris-setosa|[4.9, 3.0, 1.4, 0.2]|Iris-setosa|
    |Iris-setosa|[4.7, 3.2, 1.3, 0.2]|Iris-setosa|
    

    由于作者水平有限,在介绍及实现过程中难免有纰漏之处,欢迎细心的朋友指正

    参考资料:

    《多元统计分析及R语言建模》--王斌会
    《概率论与数理统计》 --茆师松

    相关文章

      网友评论

          本文标题:Spark 分布式实现贝叶斯判别

          本文链接:https://www.haomeiwen.com/subject/xwsfpqtx.html