spark 实现感知机算法
感知机
感知机是二分类的线性分类问题,由Rosenblatt在1957年提出,虽然年代久远,但是感知机简单实用易实现,并且是深度学习和神经网络的基础。感知机模型的输入是样本的特征向量,输出是样本的类别,分别为“+1”,“-1”,如果训练样本是线性可分的数据集,则感知机学习将会得到一个将训练数据集的两类样本点完全正确分开的超平面。非线性可分数据集则不能训练得到这个超平面,对于非线性可分的数据集做训练只能通过其他优化方法得出一个较好的模型。
感知机模型
由输入空间到输出空间的函数:f(x) = sing(w·x+b),其中w称为权值向量,b称为偏置,sign是符号函数:
感知机模型的几何解释:线性方程 w·x+b = 0 对应于特征空间R的一个超平面S,其中
w是超平面的法向量,b是超平面的截距。这个超平面正好将样本空间划分成两部分,将正负两类数据完全分开。
感知机算法实现步骤
感知机算法原始形式的实现步骤:
输入:训练数据集T=(x1,y1),(x2,y2),...,(xN,yN),yi∈{−1,+1},学习率η(0<η<1)
输出:w,b;感知机模型f(x)=sign(w⋅x+b)
(1)赋初值 w0,b0
(2)选取数据点(xi,yi)
(3)根据判定函数判断样本点是否误判:即yi(w·xi+b)<=0,如果误判则更新w和b:
(4)转到第二步,知道训练集中没有误判点。
感知机算法有两种形式 1. 原始形式算法,2.对偶形式,两种形式的实现步骤基本一样,只是对于权值向量(w)和偏置(b)的计算不同。本文只实现了算法的原始形式。更多信息请读者自行参考相关资料。
关于感知机算法的损失函数计算,对偶形式、以及相关证明等其他信息,读者可以参考李航老师的《统计学习方法》一书。
代码实现
package ML2_3.classification.pla
import breeze.linalg.{DenseVector => densevector}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import scala.util.Random
/**
* Created by WZZC on 2019/3/4
* 针对线性可分的数据集构建感知机模型
**/
object pla {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(s"${this.getClass.getSimpleName}")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
val data = spark.read.option("inferSchema", true)
.option("header", true)
.csv("F:\\DataSource\\pla.csv")
data .show()
val schema = data.schema
val fts = schema.filterNot(_.name == "lable").map(_.name).toArray
val amountVectorAssembler: VectorAssembler = new VectorAssembler()
.setInputCols(fts)
.setOutputCol("features")
val vec2Array = udf((vec: DenseVector) => vec.toArray)
val dataFeatrus = amountVectorAssembler
.transform(data)
.select($"lable", vec2Array($"features") as "features")
var initW = densevector.rand[Double](fts.length) //创建一个初始化的随机向量作为初始权值向量
var initb = Random.nextDouble() // 初始偏置
var flag = true
val lrate = 0.2 //学习率
// 定义判定函数
val signudf = udf((t: Seq[Double], y: Double) => {
val wx = initW.dot( densevector(t.toArray) )
val d = wx + initb
val ny = if (d >= 0) 1 else -1
ny
})
var resDf = spark.createDataFrame(sc.emptyRDD[Row], dataFeatrus.schema.add("nG", IntegerType))
while (flag) {
val df = dataFeatrus.withColumn("sign", signudf($"features", $"lable"))
val loss = df.where($"sign" =!= $"lable")
val count = loss.count()
println("error:" + count)
if (count == 0) {
resDf = df
flag = false
}
else {
// w1 = w0 + ny1x1
//随机选择一个误判样本
val rand = Random.nextInt(loss.count().toInt) + 1
println(rand)
val randy = loss
.withColumn("r", row_number().over(Window.partitionBy($"lable").orderBy($"lable")))
.where($"r" === rand)
.head()
val y = randy.getAs[Int]("lable")
//更新w和b
initW = initW + densevector(randy.getAs[Seq[Double]]("features").toArray).map(_ * y * lrate)
// b1 = b0 + y
initb = initb + y * lrate
}
}
println(initW, initb)
spark.stop()
}
}
数据原始格式与训练结果对比
// 数据源格式
+---+---+-----+
| x1| x2|lable|
+---+---+-----+
|5.1|3.0| 1|
|5.7|4.1| 1|
|5.1|1.4| -1|
|4.9|1.4| -1|
// 训练结果展示
+-----+----------+----+
|lable| features|sign|
+-----+----------+----+
| 1|[5.1, 3.0]| 1|
| 1|[5.7, 4.1]| 1|
| -1|[5.1, 1.4]| -1|
| -1|[4.9, 1.4]| -1|
// 模型系数
(DenseVector(-1.107892749276022, 2.043043387141929),-0.3484139882261944)
在本案例中,只针对线性可分的数据集进行训练,对于非线性可分的处理在下次分享中再给读者们介绍。由于作者水平有限,在介绍及实现过程中难免有纰漏之处,感谢细心的朋友指正。
参考资料:
《统计学习方法》 --李航
网友评论