美文网首页
机器学习-Scala实现(一)

机器学习-Scala实现(一)

作者: 言隐 | 来源:发表于2019-01-03 23:44 被阅读0次

    机器学习定义

    机器学习完善了编程的思路,从通过编程的方式明确指定规则到从数据中通过一定算法自动学习提取规则,然后用这些规则解决现实生活中的问题。比如分类(Classifying),聚类(Clustering),预测回归(Predicate &Regression,图像物体识别(Image Object recognition/Detection),自然语言处理(NLP-Natural Language Processing)等等都可用机器学习的方法来解决。特别是有些问题通常是通过传统的预设规则的方式几乎不可能解决的(比如图像识别,手写识别等)。借助于机器学习,通过学习训练模型(Training Model),规则封装在模型中,通过学习的方法训练获取模型,进而将训练好的模型应用于实际生产中。

    编程语言选择考虑

    本系列主要是集中于机器学习实践,用Scala实现机器学习算法(比如最简单的KMeans聚类,基于熵增益决策树,直到深度学习等等),之所以选择Scala仅仅是个人偏好,Scala是函数式编程语言(Pure Functional Programming Language),语法简单实用,特别适合于数据处理,当然了,对OO(Object-Oriented Programming)的概念也支持的很好,还有个主要的原因是它编译以后运行于JVM上,这样几乎大量的Java库经过简单封装处理就可以用于Scala中。 Scala的缺点也很明显,比如版本之间不是很兼容(不过越来越稳定,版本移植慢慢变得容易),GPU的支持不好等等。还是在Spark编写Spark Stream和SparkSQL代码时第一次接触Scala,从那以后无法放手,看了很多资料,但回过头来看,学习scala个人认为在线的资料就足够了,比如这个网站Tour of Scala。在youtube上也可以找到大量的介绍Scala的视频,比如Scala的类型系统,函数编程设计思路,泛型编程支持,Monad & Monoid的概念等等。

    为什么不选择Python?毕竟这么多的机器学习框架都提供了Python封装。其实是资料看多了,感觉审美疲劳:-),当然了,生产环境中还是以Python为主,毕竟大部分的学习算法和知名机器学习框架都能找到Python的开源库。

    Java 8,虽然增加了对函数编程,流,lambda表达式的支持,但,对数据分析处理部分的支持依然略显冗态,或许Java 9,10等能变的好些。不过还是看现在和个人兴趣。

    基本数学概念及实现

    只介绍与本篇文章有关的数学概念,这些概念会直接用在本节的实例中,基本是统计学(Statistics)和线代(Algebra)中的概念,比如方差,期望,向量,向量之间的距离,相关性,矩阵基本概念等。机器学习中,多个特性(Feature)观察值一般用多维向量表示,比如用X向量表示某种小麦的特性值组,X=(x_{0},x_{1},x_{2},x_{3}...x_{n-1})x_{0}表示某种小麦的宽度 特性,x_{1}表示其长度特性,x_{2}表示其重量特性等... 在Scala中一般用observation:Vector[T]表示,其中T是类型参数,是Scala泛型的表示方式,可以是类型Int,Float,Double等等。


    观测对象一般不会是单个,观测对象组(Observation Set )一般会用矩阵的方式表示,在Scala中,我们用dataSet:List[Vector[T]]表示,dataSet一般会从文件加载。比如如下数据,X_{0}=(x_{00},x_{01},x_{02})...

    \begin{Bmatrix} x_{00}& x_{01} & x_{02} \\ x_{10}& x_{11} & x_{12} \\ x_{20}& x_{21} & x_{22} \\ x_{30}& x_{31} & x_{32} \\ ... \end{Bmatrix}
    这些基本的概念清楚了后,我么可以看看这些在后面KMeans学习算法中用到的基本数学概念和其代码的实现。

    1. 数学期望(Mean)
      期望表示一组数据的中间值,是和加的平均值,用公式表示:

    m=\frac{\sum_{i=0}^{n-1}x_{i}}{n}

    利用foldLeft操作很容易实现计算数学期望,实现如下:

    def mean(): Double = {
        lazy val sum = this.values.foldLeft(0.0)((x,y)=>x+y)
         sum / this.values.size
      }
    

    其中values的类型是Vector[Double],foldLeft表示左折叠运算,初始值为0,左折叠就是从左侧开始,初始值x=0.0和第一个值y=values(1)结对成(x,y)然后应用lambda表达式(x,y)=>x+y,和值s再和values(2)结对,再次应用公式(x,y)=>x+y,以此类推,直至数据列表遍历结束。

    1. 方差标准差(Variance and StdDev)
      方差用于量化衡量一组值距离中间值差距的波动大小,用公式表示如下:
      s=\sqrt{\frac{\sum_{i=0}^{n-1}{(x_{i}-m)}^{2}}{n}}

    方差和标准差的代码实现如下:

     def variance(): Double = {
        lazy val valMean = this.mean();
        val vSum = this.values.map(v => {
          val difference = (v - valMean)
          difference * difference
        }).foldLeft(0.0)((a, b) => a + b)
         vSum / this.values.size;
      }
      def stdDev(): Double = {
        lazy val vSum = this.variance();
        Math.sqrt(vSum)
      }
    

    方差Variance的实现,是采用了典型的Map-Reduce的计算方法,先把values中的每个值Map成其和期望mean的差值的平方,然后通过reduce操作(foldLeft)操作完成求和,最后取其均值返回。Scala的函数返回值语法和其他语言不同,没有明确指定return关键字,而是把最后一个逻辑值作为返回值。符合Scala设计目标,简洁尽量减少代码冗余。

    1. 多变量数据间距离 (Multi- Dimension Vector Distance)

    对于2维3维数据点之间的距离计算比较容易理解,多维度向量间的距离计算就变的更加抽象,通常有这几种方法,Euclidean距离,Manhatan距离和Minkowski距离,本文只介绍前两种(因为最简单),先假设有两个同维度的向量X,Y 分别表示为X=(x_{0},x_{1},x_{2},...x_{n-1}),Y=(y_{0},y_{1},y_{2}...,y_{n-1}),在Scala中可以表示为X:Vector[Double]表示。

    • Euclidean距离

    d(X,Y)=\sqrt{\sum_{i=0}^{n-1}(x_{i}-y_{i})^{2}}

    代码实现:

    trait  EuclideanDistance extends Distance {
      override def distance(x: Vector[Double], y: Vector[Double]): Double = {
        val dist = Math.sqrt((x, y).zipped.foldLeft(0.0)((s, t) => {
          val d = t._1 - t._2;
          s + d * d
        }))
        dist
      }
    }
    
    • Manhattan距离

    d(X,Y)=\sum_{i=0}^{n-1}\left | x_{i}-y_{i} \right |

    代码实现:

    trait ManhattanDistance[T] extends Distance {
      override def distance(x: Vector[Double], y: Vector[Double]): Double = {
        val dist = (x, y).zipped.foldLeft(0.0)((s, t) => s + Math.abs(t._1 - t._2))
        dist
      }
    }
    

    关于多维向量距离计算的Scala实现,对于trait的定义可能有些疑惑,trait可以翻译为特质,有点类似于Java中接口的定义,不同的是定义中可以包含实现(java8接口用default关键字支持接口函数的默认实现),trait中的方法可以在Scala实现类中重载,还可以被Mixin到类的定义中。可以看到EucleanDistance和ManhttanDistance的实现都扩展自Distance trait,Distance其定义如下:

    trait Distance {
      def distance(x: scala.Vector[Double], y: scala.Vector[Double]): Double
    }
    

    后面的实现中,我们也会注意到Cluster和KMeans的实现都会对Distance有依赖。

    KMeans 聚类

    KMeans算法是最为经典的基于划分的聚类方法,K-means算法的基本思路是:以空间中k个点为中心进行聚类,对最靠近他们的对象归类。通过迭代的方法,逐次更新各簇(Cluster)的中心值和其成员,直至得到最好的聚类结果。中心用Centroid表示。
    让我们假设k个簇(cluster)\{C_{k}\} 和其对应的中心Centroid\{m_{k}\},KMeans其实是一个优化问题(Optimization),目标是使数据点到所属簇中心点距离之和最小,可以用一下公式表示:

    \underset{C_{k}}{min} \sum_{1}^{k}\sum_{x_{i}\in C_{k}}^{ } d(x_{i},m_{k})

    KMeans聚类实现

    1. ClusterTrait及Cluster定义

    KMeans非常重要的概念是簇(Cluster),簇主要包括两属性一个是簇中心一般用centroid:Vector[Double]表示,另外一个是簇的成员用members:ListBuffer[Int]表示,有了这个两个属性,我们就可以定义ClusterTrait如下:

    trait ClusterTrait[T] {
      this: Distance =>
      val centroid: Vector[T]
      val members: ListBuffer[Int] = new ListBuffer[Int]()
    
      def +=(index: Int): Unit = members.append(index)
    
      def size: Int = members.size
    }
    

    在ClusterTrait[T]的定义中,除了members和Centroid的定义外,还重载了+=操作符,只是为了保存观测值(Observations)的索引,size属性返回簇的成员大小。
    this:Distance => 表示ClusterTrait要依赖于Distance Trait,是Scala中定义依赖注入DI(Dependence Injection)一种方式,它要求在继承ClusterTrait的地方必须要把DistanceTrait的实现混入(Mixnin)到类中。 Distance有两个继承者,实现了EuclideanDistance和ManhattanDistance。ClusterTrait依赖于Distance,主要是计算成员到centroid中心点的距离。有了ClusterTrait的定义,我们可以看Cluster定义:

    class Cluster[T <% Double](val centroid: Vector[Double]) extends ClusterTrait[Double] with EuclideanDistance {
    
      def calibrateCentroid(dataSet: List[Vector[T]]): Cluster[T] = {
        val sums = this.members.map(dataSet(_).map(_.toDouble)).toList.transpose.map(_.sum)
        val means = sums.map(s => s / this.members.size).toVector
        Cluster[T](means)
      }
    
      def stdDev(dataSet: List[Vector[Double]], distance1: Distance): Double = {
        val data = this.members.map(dataSet(_)).map(distance(this.centroid, _)).toVector
        val stats = Statistic(data)
        stats.stdDev()
      }
    }
    
    object Cluster {
      def apply[T <% Double](c: Vector[Double]): Cluster[T] = {
        new Cluster[T](c)
      }
    }
    
    

    类Cluster定义,扩展了ClusterTrait并且混入(Mixin)EuclideanDistance 特质,在Cluster的实现类里,多了两个函数,CalibrateCentroid用于矫正/移动簇中心点位置,矫正的办法很简单,把簇成员取出后安列维度求其均值,作为新的中心点位置。 Scala类定义的奇特之处在于,除了定义类本身外还可以定义其伴生对象(Componion Object)。伴生对象可以理解为Java中的静态类,或是工厂函数(Factory),apply函数是是其构造器函数。 list.transpose 是转置操作。

    [T <% Double]是类型视图绑定(Type View Bound),目的是支持隐式类型转变(Implicit Type Conversion)。

    另一个函数是求标准差的函数,这个函数是为了计算簇成员到其中心点距离大小的方差,用于衡量距离波动的大小。在后面迭代运算时,会用到。

    1. KMeans的实现
      先来看看KMeans有哪些特质,特质定义如下:
    trait KMeansTrait {
      this: Distance =>
      val k:Int
      val iterMaxTimes:Int
      def run(dataSet: List[Vector[Double]]): List[Cluster[Double]]
    }
    

    从定义上可以看到,KMeansTrait也依赖于Distance Trait,k是聚簇数,iterMaxTimes是最大迭代的次数,run会被实现类重载实现,run会完成迭代计算,然后返回聚簇的实例。其实现类代码:

    class KMeans(val k: Int, val iterMaxTimes: Int) extends KMeansTrait with EuclideanDistance {
    
      override def run(dataSet: List[Vector[Double]]): List[Cluster[Double]] = {
        val clusters = initialize(dataSet)
        if (clusters.isEmpty) List.empty
        else {
          val memberShip = Array.fill(dataSet.size)(0)
          assignToCluster(dataSet, clusters, memberShip)
          updateClusters(iterMaxTimes, dataSet, clusters, memberShip)
        }
      }
      //省略了几个函数,会在后面提及
    }
    
    object KMeans {
      def apply(k: Int, iter: Int, distance: Distance): KMeans = new KMeans(k, iter)
    }
    

    KMeans类扩展了KMeansTrait ,并且混入(Mixin)EuclideanDistance,用于距离计算。 run函数输入是整个观测数据集dataSet,返回的是聚簇完成的结果clusters:List[Cluster[Double]]。 run函数中调用initialize完成初始的cluster中心点计算。簇初始化是采用M. Agha and W. Ashour快速初始算法完成的,先看看代码:

    
    def initialize(dataSet: List[Vector[Double]]): List[Cluster[Double]] = {
        val maxStdDevDim = (0 to dataSet.transpose.size - 1).maxBy(
          dataSet.transpose.map(l => {
            val stats = Statistic(l.toVector);
            val stdDev = stats.stdDev()
            stdDev
          }))
        val sortedObservations = dataSet.zipWithIndex.map(v => {
          (v._1(maxStdDevDim), v._2)
        }).sortWith(_._1 < _._1)
    
        val halfSegSize = ((sortedObservations.size >> 1) / k).floor.toInt;
        val initialMeans = sortedObservations.filter(v => {
          (v._2 % halfSegSize) == 0 && v._2 % (halfSegSize << 1) == 0
        }).map(n => dataSet(n._2))
    
        val initialClusters = initialMeans.foldLeft(ListBuffer[Cluster[Double]]())((listBuffer, v) => {
          listBuffer.append(Cluster[Double](v))
          listBuffer
        })
        initialClusters.toList
      }
    

    先用语言简单描述一下这个算法,算法的目标是快速初始化k个Clusters,确定每个Cluster的中心点centroid的值。选择Centroid的方法是dataSet按列维求其标准差,然后取最大方差所在的列进行排序,排序后选择将其分成k个区间,取区间内的中心位置的值作为Cluster的初始化Centroid。 最后返回initialClusters。 此时的每个簇(cluster)成员依然为空。 初始化了簇后,我们然后分配和更新Cluster。 这个主要是在函数updateClusters中完成的,代码如下:

      def updateClusters(iterTimes: Int, dataSet: List[Vector[Double]], initClusters: List[Cluster[Double]], memberShip: Array[Int]): List[Cluster[Double]] = {
        var newClusters: List[Cluster[Double]] = List.empty;
        breakable {
          for (i <- 0 to iterTimes) {
            newClusters = initClusters.map(
              cluster => {
                if (cluster.size > 0)
                  cluster.calibrateCentroid(dataSet.toList)
                else {
                  initClusters.filter(_.size > 0)
                    .maxBy(_.stdDev(dataSet, distance))
                }
              })
            val reassignedSize = assignToCluster(dataSet, newClusters, memberShip)
            if (reassignedSize == 0) break()
          }
        }
        newClusters
      }
    

    updateClusters是个迭代算法,也可以用递归的算法实现。 退出条件有两个:

    1. 达到最大迭代次数
    2. reasignedSize为零,也就是没有数据在簇间移动了。

    或许有人对breakable感到奇怪,scala中没有提供像Java那样,对continue和break的直接支持,只能通过这种方式跳出for循环。 memberShip记录了dataSet的数据被分配到那个簇中。

    def assignToCluster(dataSet: List[Vector[Double]], clusters: List[Cluster[Double]], memberShip: Array[Int]): Int = {
        val reassignedSize = dataSet.zipWithIndex.count(v => {
          val closestCluster = getClosestCluster(v._1, clusters)
          val reassigned = memberShip(v._2) != closestCluster
          clusters(closestCluster) += v._2
          memberShip(v._2) = closestCluster
          reassigned
        })
        reassignedSize
      }
    
    def getClosestCluster(observation: Vector[Double], clusters: List[Cluster[Double]]): Int = {
        val closestClusterIndex = (0 to clusters.size - 1).minBy(clusters.map(c => {
          val dist = distance(c.centroid.toVector, observation)
          dist
        }))
        closestClusterIndex
      }
    

    getClosestCluster是对指定数据(dataSet的数据记录),计算其到各个聚簇的中心(Centroid)的距离,然后选出最近的簇并返回其索引值。assignToCluster完成分配数据到簇,并记录重新分配簇的次数。

    总结

    KMeans算法是最经典的非监督式机器学习分类算法之一,其中用到的数学工具也比较简单。此类算法一般以python实现居多,本篇只是尝试用Scala重新组织实现KMeans。Scala作为函数编程语言,对类的封装性也有很好的支持,其本质上对流(Stream)概念的支持,对Map-Reduce 模式的支持,对trait的支持,丰富的Collections库,对函数编程的支持,使其特别适合于做数据处理。

    以上完整的代码可以在这找到代码GitHub 工程是sbt组织的, 中还有一些其它功能代码实现,比如数据加载,数据可视化等等,都是很简单的实现,以后会逐步丰富。工程中还包括了Seed data和Iris data数据为验证或测试。工程中用单元测试是基于ScalaTest,有兴趣可以研究一下。

    参考资料

    1. KMeans Machine Learning
    2. Tour of Scala
    3. 机器学习
    4. Breeze Lib
    5. Euclidean distance
    6. Manhatan Distance
    7. Seed Dataset
    8. Iris Dataset
    9. Algorithm of initialization of Clusters
    10. ScalaTest
    11. Scala Type System

    相关文章

      网友评论

          本文标题:机器学习-Scala实现(一)

          本文链接:https://www.haomeiwen.com/subject/hmnpuftx.html