构建机器学习系统,根据业务需求和使用工具的不同,可能会有些区别。不过主要流程应该差别不大,基本包括数据抽取、数据探索、数据处理、建立模型、训练模型、评估模型、优化模型、部署模型等阶段,在构建系统前,我们需要考虑系统的扩展性、与其他系统的整合、系统升级及处理方式等。这章我们主要介绍针对基于Spark机器学习的架构设计或系统构建的一般步骤、需要注意的一些问题。
本章主要介绍构建Spark机器学习系统的一般步骤:
介绍系统架构
启动集群
加载数据
探索数据
数据预处理
构建模型
模型评估
模型优化
模型保存
2.1机器学习系统架构
Spark发展非常快,到我们着手编写本书时,Spark已升级为2.1版,这2.0以后,Spark大大增强了数据流水线的内容,数据流水线的思路与SKLearn非常相似,我想这种思路或许是未来的一个趋势,使机器学习的流程标准化、规范化、流程化,很多原来需要自己编写代码都有现成的模块或函数,模型评估、调优这些任务也可实现了更高的封装,这大大降低机器学习门槛。
Spark机器学习系统的架构图
其中数据处理、建模训练,我们可以进行组装成流水线方式,对模型评估及优化可以采用自动化方式。
2.2启动集群
Spark集群的安装配置,这里不做详细介绍,我们提供了本书可操作云平台,对Spark集群的安装配置感兴趣的读者,可参考由我们编写的《自己动手做大数据系统》。
Spark运行方式有本地模式、集群模式,本地模式所有的处理都运行在同一个JVM中,而后者,可以运行在不同节点上。具体运行方式主要有:
Spark运行模式
本文主要以Spark Standalone(独立模式)为例,如果想以其他模式运行,只要改动对应参数即可,并参考我写的另一个文集(PySpark实战)
Spark支持Scala或Python的REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式程序编写,交互式编程,输入的代码执行后立即能看到结果,非常友好和方便。
在2.0之前的Spark版本中,Spark shell会自动创建一个SparkContext对象sc。SparkContext与驱动程序(Driver Program)和集群管理器(Cluster Manager)间的关系如图所示:
SparkContext与驱动程序、集群管理器间的关系图从图中可以看到SparkContext起中介的作用,通过它来使用Spark其他的功能。每一个JVM都有一个对应的SparkContext,Driver program通过SparkContext连接到集群管理器来实现对集群中任务的控制。Spark配置参数的设置以及对SQLContext、HiveContext和StreamingContext的控制也要通过SparkContext。
不过在Spark 2.0中引入SparkSession对象(spark),运行Spark shell则自动创建一个SparkSession对象,在输入spark时就会发现它已经存在了(参考图2.图2-3),SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中,它为用户提供了一个统一的切入点,同时也提供了各种DataFrame和Dataset的API,大大降低了学习Spark的难度。
启动Spark shell界面上图是启动Spark的集群的界面,编程语言是Scala,如果希望使用Python为编辑语句,该如何启动呢?运行pyspark即可。
启动PySpark的客户端2.3加载数据
这里以MovieLens 100k(http://files.grouplens.org/datasets/movielens/ml-100k.zip)数据集中的用户数据(u.data)为例,首先在本地查看数据的基本信息,然后把本地文件复制到HDFS上,Spark或PySpark读取读取hdfs上的数据。
查看u.user文件的基本信息,数据样例,总记录数等信息。
$ head -3 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
$ cat u.user |wc -l
943
u.user用户数据每列的含义为:user id | age | gender | occupation | zip code,即用户ID,用户年龄,用户性别、用户职位、所在地邮编等信息,列间的分隔符为竖线(|),共有943条记录。
如何把用户信息复制到HDFS上?首先,查看当前HDFS的目录信息。
$ hadoop fs -ls /u01/bigdata/
Found 2 items
drwxr-xr-x - hadoop supergroup 0 2017-02-07 03:20 /u01/bigdata/data
drwxr-xr-x - hadoop supergroup 0 2016-07-20 09:16 /u01/bigdata/hive
由此可知在HDFS已有/u01/bigdata/data目录(如果没有目录可以通过hadoop fs -mkdire /u01/bigdata/data命令创建。),通过以下命令,把本地文件u.user复制到HDFS上。
$ hadoop fs -put u.user /u01/bigdata/data
//查看HDFS上的文件
$ hadoop fs -ls /u01/bigdata/data
-rw-r--r-- 1 hadoop supergroup 22628 2017-03-18 13:37 /u01/bigdata/data/u.user
把电影评级数据(u.data)、电影数据(u.item)等复制到HDFS方法相同,把本地数据复制到HDFS后,Spark如何读取加载HDFS上的文件?我们可以通过Spark的textFile方法读取。这里我们以PySpark为例,启动PySpark客户端,导入需要是的包,然后通过textFile方法读取HDFS上的数据,具体请看以下示例:
###以spark独立模式,启动Pyspark客户端
pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2
###导入需要的包
from pyspark.sql import SparkSession
from pyspark.sql import Row
##初始化sparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
###加载数据,并处理分割符数据
sc = spark.sparkContext
userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))
###利用反射机制推断模式(Schema),把dataframe注册为一个table
df = userrdd.map(lambda fields: Row(userid=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")
2.4探索数据
生产环境中数据往往包含很多脏数据,如缺失数据、不一致、不规范、奇异数据等等,所以数据加载后,数据建模前,需要对数据进行分析或探索,尤其面对大数据,了解数据的统计信息、数据质量、数据特征等,为数据处理、数据建模提供重要依据,在进行这些数据分析时,如果能实现数据的可视化,当然更利于我们理解数据。
2.4.1 数据统计信息
加载数据后,首先关注的数据的统计信息,有了数据统计信息,我们对数据就有了一个大致了解,如数据特征的最大值、最小值、平均值、分位数、方差等。这些信息有助于我们理解数据质量、数据构成,为数据预处理提供重要依据。
查看用户各字段的统计信息:
schemauser.describe("userid","age", "gender","occupation","zip").show()
+-------+-----------------+-----------------+------+-------------+------------------+
|summary| userid| age|gender| occupation| zip|
+-------+-----------------+-----------------+------+-------------+------------------+
| count| 943| 943| 943| 943| 943|
| mean| 472.0|34.05196182396607| null| null| 50868.78810810811|
| stddev|272.3649512449549|12.19273973305903| null| null|30891.373254138158|
| min| 1| 7| F|administrator| 00000|
| max| 99| 73| M| writer| Y1A6B|
+-------+-----------------+-----------------+------+-------------+------------------+
从以上统计可以看出,用户表总记录数为943条,年龄最小为9岁,最大为73岁,平均年龄为34岁。
2.4.2 数据质量分析
数据质量分析是数据探索阶段重要一环,数据不是完美的,大多数据大多包含缺少数据、不一致数据、异常数据、噪音数据等。没有可信的数据,再好的模型性能都太可能好,正所谓“垃圾进,垃圾出”。
数据质量方面的分析,主要包括以下几个方面:
1)缺失值;
2)异常值;
3)不一致的值
4)错误数据
数据集下载
本节以一份某酒店的销售额的数据为例,来说明在数据探索中,对数据质量的一般分析方法,主要涉及缺少值、异常值、不一致数据等。
##以spark独立模式,启动Pyspark客户端
pyspark --master spark://master:7077 --driver-memory 1G --total-executor-cores 2
###导入需要的库
import pandas as pd
import matplotlib.pyplot as plt
###加载数据,使用标题行
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)
##查看df的统计信息
df.count() ##统计非空值记录数
sale_date 200
sale_amt 198 ###说明sale_amt有两个空值
df.describe() ###获取df的统计信息
sale_amt
count 198.000000
mean 2765.545152
std 709.557639
min 22.000000
25% 2452.725000
50% 2655.850000
75% 3023.500000
max 9106.440000
#建立图像
plt.figure()
#画箱线图
bp = df.boxplot()
# flies为异常值的标签
x = bp['fliers'][0].get_xdata()
y = bp['fliers'][0].get_ydata()
y.sort()
#用annotate添加注释
for i in range(len(x)):
plt.annotate(y[i], xy = (x[i],y[i]), xytext=(x[i]+0.1-0.8/(y[i]-y[i-1]),y[i]))
plt.show()
销售额箱型图检测异常值
从以上分析,可知,销售额列存在两个空值、6个可能的异常值,其中865.0,1060.0有可能属于正常值,当然也需要和也相关业务员沟通,对其他异常值,需要进一步分析异常值产生的原因,然后,确定数据的去留。
2.4.3 数据特征分析
对数据质量有基本了解后,接下来就可就数据的特征进行分析,数据特征分析一般包括以下一些内容:
- 特征分布分析
- 对比分析
- 统计量分析
特征一般指用于模型训练的变量,原始数据中特征,有些是数值,有些是字符或其他格式信息,但在进行机器学习前,都需要转换为数值。根据实际情况,有时需要根据已有特征生成或衍生出新特征,如根据用户年龄衍生出表示老、中、青的新特征;有时需要对一些特征进行规范化、标准化等转换,尤其对回归类模型。
特征分布分析
特征的分布分析有助于发现相关数据的分布特征、分布类型、分布是否对称等,可以使用数据可视化方法,易直观发现特征的异常值等。以用户信息数据为例,分析用户的年龄特征、职业特征等。
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sc = spark.sparkContext
# 加载textfile文件并转换为行式
userrdd = sc.textFile("hdfs://master:9000/u01/bigdata/data/u.user").map(lambda line: line.split("|"))
#利用反射机制把RDD转换为DataFrame
df = userrdd.map(lambda fields: Row(name=fields[0], age=int(fields[1]),gender=fields[2],occupation=fields[3],zip=fields[4]))
# 把dataframe注册为一个table.
schemauser = spark.createDataFrame(df)
schemauser.createOrReplaceTempView("user")
# 在table上运行SQL.
age = spark.sql("SELECT age FROM user")
#把运行结果转换为RDD
ages = age.rdd.map(lambda p: p.age).collect()
hist(ages, bins=20, color='lightblue', normed=True)
用户年龄特征分布图
从以上图形可以看出,最小年龄在10岁左右,最大年龄超过70岁,大部分是20岁到40岁之间。
我们还可以进一步分析用户职业分布特征。
# 选取用户职业数据.
count_occp = spark.sql("SELECT occupation,count(occupation) as cnt FROM user Group by occupation order by cnt")
#查看前5行数据
count_occp.show(5)
+----------+---+
|occupation|cnt|
+----------+---+
| homemaker| 7|
| doctor| 7|
| none| 9|
| lawyer| 12|
| salesman| 12|
+----------+---+
#获取职业名称及职业数,以便画出各职业对应总数图形
#把运行结果转换为RDD
x_axis = count_occp.rdd.map(lambda p: p.occupation).collect()
y_axis = count_occp.rdd.map(lambda p: p.cnt).collect()
pos = np.arange(len(x_axis))
width = 1.0
###隐式新增一个figure,或为当前figure新增一个axes
ax = plt.axes()
ax.set_xticks(pos + (width / 2)) ###设置x轴刻度
ax.set_xticklabels(x_axis) ####在对应刻度打上标签
plt.bar(pos, y_axis, width, color='orange')
plt.xticks(rotation=30) ####x轴上的标签旋转30度
fig = matplotlib.pyplot.gcf() ###获取当前figure的应用
fig.set_size_inches(16, 10) ###设置当前figure大小
用户职业分布图
从以上用户职业分布图,可以看出,学生占绝大多数,其次其他职业、教育工作者、管理者、工程师等。医生、家庭主妇或许平时较忙,故数量比较少。
特征分布及相关性分析
在数据探索阶段,分析特征分布,特征间的相关性等,对应后续的特征选择、特征提取将提供重要依据,以下是对类似共享单车数据的特征分析
###探索特征间分布、相关性等
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
data_pd=data1.toPandas()
sns.set(style='whitegrid',context='notebook')
cols=['temp','atemp','label']
sns.pairplot(data_pd[cols],size=2.5)
plt.show()
###### 对比分析
###导入需要的库
import pandas as pd
###把日期列作为索引,并转换为日期格式
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0,index_col='sale_date',parse_dates=True)
###把空值置为0
df1=df.fillna(0)
###根据年月求和
df_ym=df1.resample('M',how='sum')
##取年月
df2=df_ym.to_period('M')
##数据可视化
df2.plot(kind='bar',rot=30)
销售月份对比图
2.4.4 数据的可视化
数据的可视化是数据探索、数据分析中重要任务,通过可视化可帮助我们发现数据的异常值、特征的分布情况等,为数据预处理提供重要支持。Spark目前对数据的可视化功能还很弱或还没有,不过,没关系,我们可以借助Python或R等可视化功能,Python和R在数据可视化方面功能很强大,这里以Python的数据可视化为例。Python的数据表现能力很强,可以2D或3D等方式展示,视化可以使用matplotlib或plot等方法。matplotlib是一种比较低级但强大的绘图工具,可以进行很多定制化,但往往需要较大代码来实现;Plot是一种非常简洁的绘图工具,它主要基于pandas基础之上,以下我们通过两个示例来具体说明:
下例是通过matplotlib可视化sin(x)和cos(x)函数的图形。
# -*- coding: utf-8 -*-
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif']=['SimHei'] ###显示中文
plt.rcParams['axes.unicode_minus']=False ##防止坐标轴上的-号变为方块
x = np.linspace(0, 10, 100)
y = np.sin(x)
y1 = np.cos(x)
##绘制一个图,长为10,宽为6(默认值是每个单位80像素)
plt.figure(figsize=(10,6))
###在图列中自动显示$间内容
plt.plot(x,y,label="$sin(x)$",color="red",linewidth=2)
plt.plot(x,y1,"b--",label="$cos(x^2)$") ###b(blue),--线形
plt.xlabel(u"X值") ##X坐标名称,u表示unicode编码
plt.ylabel(u"Y值")
plt.title(u"三角函数图像") ##t图名称
plt.ylim(-1.2,1.2) ##y上的max、min值
plt.legend() ##显示图例
plt.savefig('fig01.png') ##保持到当前目录
plt.show()
运行结果如下:
matplot数据可视化同样的这些数据,如果我们对这些数据使用plot来进行可视化,代码可以非常简洁,但定制化方面可能要弱一些。
from pandas import DataFrame
import pandas as pd
import numpy as np
x = np.linspace(0, 10, 100)
df=DataFrame({'sin(x)':np.sin(x),'cos(x)':np.cos(x)},index=x)
df.plot()
显示图形如图2-11所示下:
plot数据可视化从以上实现代码可以看出,如果使用plot则非常简单,虽然定制化要比matplotlib少些,但其可定制的项也不少,如kind,rot,title,legend等等。
2.5 数据预处理
前面我们介绍了探索数据的一些方法,通过对数据的探索,可以帮助我们发现一些奇异值、缺失值、一些特征的类别及其分布情况等信息。而这些信息正是对数据预处理的重要依据。在数据分析、机器学习中,数据的预处理是一个非常关键、尤其是涉及大数据的处理,往往是比较费时、费神的一个过程,有时,还需要往返多次。当然,如果数据预处理得好,除提高数据质量外,更能极大提高模型的性能,反之,对模型的影响也是很大,甚至可能垃圾进,垃圾出。
数据的预处理一般包括数据清理、数据转换、数据集成、数据归约等。这些预处理主要内容可以通过以下图形2-12来表示:
数据预处理示意图2.5.1数据清理
数据清理主要任务是填补缺失值、光滑噪声数据、处理奇异数据、纠正错误数据、删除重复数据、删除唯一性属性、去除不相关字段或特征、处理不一致数据等。噪声数据的处理方法:分箱、聚类等。以下分别以处理缺失数据、异常数据为例,说明在spark中如何处理。
1. 处理缺失值
import pandas as pd
##读取HDFS上的数据
df=pd.read_csv("/home/hadoop/data/catering_sale.csv",header=0)
##定位数据集中的空值
df[df.isnull().values==True]
##显示结果如下,说明有2个空值
sale_date sale_amt
13 2015/2/14 NaN
32 2015/1/26 NaN
###以0填补空值
df.fillna(0)
##或该列的平均值填补空值
df['sale_amt'].fillna(df['sale_amt'].count())
##或用该列前一行值填补空值
df.fillna(method='pad')
2. 处理奇异值
在数据探索阶段,我们发现销售数据文件catering_sale.csv中有6个可能的奇异值,假设与相关人员核实后,只有22为奇异值或错误数据,对错误数据我们一般采用删除或替换的方法,这里我们采用Spark SQL来处理奇异数据。
首先把数据复制到HDFS,用Spark读取数据,如果启动pyspark,则可以通过spark.read.csv("/home/hadoop/data/catering_sale.csv",header=True)读取;如果启动spark-shell启动,则可以采用
spark.read.option("header","true").csv("hdfs://192.168.1.112:9000/home
/hadoop/data/catering_sale.csv")的方式读取。
#读取CSV文件,保留文件标题,并创建spark 的一张derby数据库的表
df=spark.read.csv("/home/hadoop/data/catering_sale.csv",header=True)
##转换数据类型
df1=df.select(df['sale_date'],df['sale_amt'].cast("Double"))
###假设把22.0奇异值替换为200.0
df1.replace(22.0,200.0,'sale_amt')
这里我们使用了DataFrame的select、replace等方法,实际上df还有很多可利用的方法或函数,可以通过df.+Tab键查看:
image这些方法或函数的具体使用,可以通过df.方法名?的方式查看,下例为查看df.filter的详细用法:
image此外,我们还可以使用大量spark.sql.functions或pyspark.sql.functions,以下是使用去除字段左右空格、截取字段长度等内置函数示例:
from pyspark.sql.functions import *
###去空格
df.select(trim(df.sale_date)).show()
###去年份
df.select(substring(df.sale_date,1,4).alias('year'),df.sale_amt).show()
2.5.2 数据变换
数据变换是数据预处理中一项重要内容,如对数据进行数据的规范化、离散化、衍生指标、类别特征数值化、平滑数据等都属于数据变换。数据变换Spark ML有很多现成的算法,利用这些算法可极大提高整个数据处理的效率,下表只是为一个概况,更多更详细信息请可参考第4章。
Spark ML自带的数据变换算法这里我们以卡方检验为例,如何根据特征的贡献率来选择特征。假设我们很多特征,如:表示时间的特征:季节(season)、年月(yr)、月份(mnth)、是否节假日(holiday)、是否周末(weekday);表示天气的特征weathersit,temp等等,为了使用卡方检验来选择这些特征,首先需要把各特征组合一个特征向量,然后,把整合后特征向量、及选择特征个数等代入卡方模型中,详细代码如下:
//定义特征向量
featuresArray =["season","yr","mnth","hr","holiday","weekday","workingday",\
"weathersit","temp","atemp","hum","windspeed"]
###把各特征组合成特征向量features
assembler = VectorAssembler(inputCols=featuresArray,outputCol="features")
###选择贡献度较大的前5个特征
selectorfeature = ChiSqSelector(numTopFeatures=5, featuresCol="features",outputCol="selectedFeatures", labelCol="label")
2.5.3数据集成
数据集成是数据预处理的重要内容之一,将多文件或者多数据库中的数据进行合并,然后存放在一个一致的数据存储中。数据集成一般通过join或union、merge等关键字把两个(或多个)数据集连接在一起,Spark SQL(包括DataFrame)有join方法,Pandas下有merge方法。数据集成往往需要耗费很多资源,尤其是大数据间的集成涉及到shuffle过程,有时需要牵涉到多个节点,数据集成除了数据一致性外,性能问题常常不请自来,需要我们特别留心。
传统数据库一般是单机上采用hash join方法,如果在分布式环境中,采用join时,可以考虑充分利用分布式资源进行平行化,当然,在进行join之前,对数据过滤或归约也是常用的优化方法。
Spark SQL中有三种join方法:
-
broadcast hash join:
如果join的表中有一张大表和一张较少的表,可以考虑把这张小表广播分发到另一张 大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。 -
shuffle hash join:
如果两张表都不小,对数据量较大的表进行广播分发就不太适合。这种情况下,可以根 据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区, 这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。 -
sort merge join:
如果两张表都比较大,可以考虑使用sort merge join方法,先将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理,然后,对单个分区节点的两表数据,分别进行排序,最后,对排好序的两张分区表数据执行join操作。
当然,如果两表都不大,可以直接使用hash join。
DataFrame中join有(或merge):内连接、左连接、右连接等。
2.5.4数据归约
大数据是机器学习的基础,但大数据往往数据量非常大,有时我们可以通过数据归约技术,删除或减少冗余属性(或维)、精简数据集等,使归约后数据比原数据小或小得多,但仍然接近于保持原数据的完整性,并结果与归约前结果相同或几乎相同。
Spark ML 自带的数据选择算法选择特征或降维是机器学习中重要的处理方法,我们可以使用这些方法在减少特征个数、消除噪声等问题的同时,维持原始数据的内在结构或主要特征。尤其是降维,在大数据、机器学习中发挥中重要作用,以下通过两个实例说明SVD、PCA具体使用。目前Spark MLlib支持SVD及PCA。
import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.SingularValueDecomposition import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val data = Array( Vectors.dense(1,2,3,4,5,6,7,8,9), Vectors.dense(5,6,7,8,9,0,8,6,7), Vectors.dense(9,0,8,7,1,4,3,2,1), Vectors.dense(6,4,2,1,3,4,2,1,5), Vectors.dense(4,5,7,1,4,0,2,1,8)) val dataRDD = sc.parallelize(data, 2) val mat: RowMatrix = new RowMatrix(dataRDD) //保留前3个奇异值,需要获得U成员 val svd = mat.computeSVD(3, computeU = true) //通过访问svd对象的V、s、U成员分别拿到进行SVD分解后的 //右奇异矩阵、奇异值向量和左奇异矩阵: val U: RowMatrix = svd.U //左奇异矩阵 val s: Vector = svd.s //从大到小的奇异值向量 [30.88197557931219,10.848035248251415,8.201924156089822] val V: Matrix = svd.V //右奇异矩阵 -0.33309047675110115 0.6307611082680837 0.10881297540284612 -0.252559026169606 -0.13320654554805747 0.4862541277385016 -0.3913180354223819 0.3985110846022322 0.20656596253983592 -0.33266751598925126 0.25621153877501424 -0.3575093420454635 -0.35120996186827147 -0.24679309180949208 0.16775460006130793 -0.1811460330545444 0.03808707142157401 -0.46853660508460787 -0.35275045425261 -0.19100365291846758 -0.26646095393100677 -0.2938422406906167 -0.30376401501983874 -0.4274842789454556 -0.44105410502598985 -0.4108875465911952 0.2825275707788212
同样这个矩阵data,以下我们用PCA进行分解,看一下效果及与SVD的异同,SVD分解后右奇异矩阵V与PCA降维后的矩阵pc很相似。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val data = Array( Vectors.dense(1,2,3,4,5,6,7,8,9), Vectors.dense(5,6,7,8,9,0,8,6,7), Vectors.dense(9,0,8,7,1,4,3,2,1), Vectors.dense(6,4,2,1,3,4,2,1,5), Vectors.dense(4,5,7,1,4,0,2,1,8)) val dataRDD = sc.parallelize(data, 2) val mat: RowMatrix = new RowMatrix(dataRDD) val pc: Matrix = mat.computePrincipalComponents(3) -0.3948204553820511 -0.3255749878678745 0.1057375753926894 0.1967741975874508 0.12066915005125914 0.4698636365472036 -0.09206257474269655 -0.407047128194367 0.3210095555021759 0.12315980051885281 -0.6783914405694824 -0.10049065563002131 0.43871546256175087 -0.12704705411702932 0.2775911848440697 -0.05209780173017968 0.10583033338605327 -0.6473697692806737 0.422474587406277 -0.27600606797384 -0.13909137208338707 0.46536643478632944 -0.172268807944553 -0.349731653791416 0.4376262507870099 0.3469015236606571 0.13076351966313637
使用PCA降维,利用pyspark的画图功能,可以新生成的特征的方差贡献度进行可视化,下图为对hour.csv数据,通过PCA处理后,重要特征的排序情况:
hour.csv数据的PCA分析图2.6 构建模型
前面我们介绍了准备阶段,包括加载数据、探索数据、预处理数据等,数据准备阶段往往是最费时间和精力的,常常这个问题解决了,又会出现新问题,经常需要返回多次。一般而言数据准备阶段从时间上来说可能要占据60%左右,有时更多。 数据准备后以后,接下来就是构建模型,模型是机器学习、数据挖掘等的核心,构建模型涉及确定模型或算法、设置参数、运算模型等,其大致流程如图所示。
构建模型流程选择算法主要依据业务需求、数据特征等,Spark目前支持分类、回归、推荐等这些常用而且重要的算法,具体可参考表所示。一种类型往往有几种算法,如分类可以逻辑、决策树等,如何选择算法,需要考虑业务需求、数据特征、算法适应性、个人经验等,当然,也可选择几种方法,然后进行比较,或采用集成学习的方式,复合多种算法也是选项之一,如先采用聚类方法对数据进行聚类,然后对不同类别的数据进行预测或推荐,有时会得到更好的结果。如果你觉得选择比较难或还不好确定,可以先从简单或熟悉的方法开始,然后,不断完善和优化。
Spark ML 目前支持的算法确定算法后,一般还需要设置一些参数,如训练决策树时需要选择迭代次数、纯度计算方法、树的最大高度等,此外,对准备好的数据需要进行划分,一般划分为训练数据和测试数据,有的会把训练数据进一步划分为训练数据集、验证数据集。Spark 提供多种随机划分数据的方法,如randomSplit、CrossValidator等。这些方法的具体使用在2.8节模型调优中将会具体说明。 训练数据用于训练模型,测试数据用于验证模型,因这个环节的验证是在模型训练过程中验证,所以它一般也认为隶属于模型建立过程。这种验证方法一般称为交叉验证(CrossValidator,CV),有些交叉验证把数据分成K组,如K折交叉验证(K-fold Cross Validator,K-CV ),在K折-交叉验证中,采用不重复地随机将数据集划分为K对,如果K=3,则将产生3个(训练,测试)数据集对,每个数据集使用2/3的数据进行训练,1/3进行测试。,这样会得到3个模型,用这3个模型的平均数作为最终模型的性能指标。K-CV可以有效的避免欠学习状态的发生,其结果也比较具有说服性。
2.7 模型评估
模型构建以后,接下来就需要对该模型的性能、与目标的切合度等进行一些评估,模型评估是模型开发过程的不可或缺的一部分。在构建模型的过程中,会产生一些评估指标,如精确度、ROC、RMSE等等,这这些指标是重要而且基础的,但应该不是唯一和最终指标,除了这些指标外,我们还应该评估模型对业务的提示或商业目标的达成等方面贡献。一个好的模型不但要有好的技术指标,更要为解决实际问题提供帮助,有时后者显得更为重要。 Spark中常用的几个评估算法有: 均方差(MSE,Mean Squared Error): (∑(prec-act)**2)/n(prec为预测值,act为实际值,n为总样本数) 均方根差(RMSE,Root Mean Squared Error): 就是MSE开根号 平均绝对值误差(MAE,Mean Absolute Error): (∑|prec-act|)/n 在了解正确率、准确率之前,我们先看一个所谓的混淆矩阵(confusion matrix):
image2.15混淆矩阵
混淆矩阵是一个简单矩阵,用于展示一个二分类器的预测结果,其中,T-True、F-False、N-Negative、P-Postitive。 真正(TP)被模型预测为正的正样本数;可以称作判断为真的正确率; 真负(TN) 被模型预测为负的负样本数;可以称作判断为假的正确率; 假正(FP) 被模型预测为正的负样本数;可以称作误报率; 假负(FN) 被模型预测为负的正样本数;可以称作漏报率. 正确率(Accuracy): A = (TP + TN)/(P+N) = (TP + TN)/(TP + FN + FP + TN) 反映了分类器统对整个样本的判定能力——能将正的判定为正,负的判定为负。 错误率(Error): E= (FP + FN)/(P+N) = (FP + FN)/(TP + FN + FP + TN) 准确率(Precision) P = TP/(TP+FP) ; 反映了被分类器判定的正例中真正的正例样本的比重 召回率(Recall): R = TP/(TP+FN) = 1 - FN/T; 反映了被正确判定的正例占总的正例的比重 F1-Measure: F1=2P*R/(P+R) 真阳性率(TPR): TPR= TP/(TP+FN),代表分类器预测的正类中实际正实例占所有正实例的比例。 假阳性率(FPR): FPR= FP/(FP+TN),代表分类器预测的正类中实际负实例占所有负实例的比例。 以上这些都属于静态的指标,当正负样本不平衡时它会存在着严重的问题。极端情况下比如正负样本比例为1:99(有些领域并不少见),那么一个分类器只要把所有样本都判为负,它就拥有了99%的精确度,但这时的评价指标是不具有参考价值的。另外,很多分类器都不是简单地给出一个正或负(0或1)的分类判定,而是给出一个分类的倾向程度,比如贝叶斯分类器输出的分类概率。对于这些分类器,当你取不同阈值,就可以得到不同的分类结果及分类器评价指标,依此人们又发明出来ROC曲线以及AUC(ROC曲线包围面积)指标来衡量分类器的总体可信度。ROC曲线将FPR和TPR定义为x和y轴,这样就描述了真阳性和假阳性不同决策阈值下之间的关系。AUC越大说明模型性能越好,ROC曲线如下图:
ROC曲线示意图 下面通过一个实例说明Spark一些评估指标的使用import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.evaluation.RegressionMetrics val path="file:///u01/bigdata/spark/data/mllib/sample_libsvm_data.txt" val data=spark.read.format("libsvm").load(path) val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L) //参数说明 // threshold变量用来控制分类的阈值,默认值为0.5 val lr = new LogisticRegression() .setThreshold(0.6).setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8) val lrModel = lr.fit(trainingData) val predictions = lrModel.transform(testData) predictions.show() //计算MSE、MAE、 RMSE等 val evaluator = new BinaryClassificationEvaluator() .setLabelCol("label") val accuracy = evaluator.evaluate(predictions) val rm2 = new RegressionMetrics(predictions.select("prediction", "label").rdd.map(x =>(x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
println("MSE: " + rm2.meanSquaredError)
println("MAE: " + rm2.meanAbsoluteError)
println("RMSE Squared: " + rm2.rootMeanSquaredError)
//将其作为多分类结果进行评估,可计算F1、准确率、召回率、正确率
val multiclassClassificationEvaluator = new MulticlassClassificationEvaluator()
def printlnMetric(metricName: String): Unit = {
println(metricName + " = " + multiclassClassificationEvaluator.setMetricName(metricName).evaluate(predictions))
}
printlnMetric("f1")//f1 = 0.9646258503401359
printlnMetric("weightedPrecision")//weightedPrecision = 0.9675324675324675
printlnMetric("weightedRecall")//weightedRecall = 0.9642857142857142
printlnMetric("accuracy")//accuracy = 0.9642857142857143
//将其作为二分类结果进行评估,可计算areaUnderROC、areaUnderPR
val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
def printlnMetric(metricName: String): Unit = {
println(metricName + " = " + binaryClassificationEvaluator.setMetricName(metricName).evaluate(predictions))
}
printlnMetric("areaUnderROC") //结果为areaUnderROC = 0.9944444444444444
printlnMetric("areaUnderPR")//结果为areaUnderPR = 0.9969948018193632
//分类正确且分类为1的样本数量 TP 是17
predictions.filter($"label" === $"prediction").filter($"label"===1).count
//分类正确且分类为0的样本数量 TN 是10
predictions.filter($"label" === $"prediction").filter($"label"===0).count
//分类错误且分类为0的样本数量 FN是1
predictions.filter($"label" !== $"prediction").filter($"prediction"===0).count
//分类错误且分类为1的样本数量 FP是0
predictions.filter($"label" !== $"prediction").filter($"prediction"===1).count
准确率:TP/(TP+FP)=17/(17+0)=1
召回率:TP/(TP+FN) = 17/(17+1)=0.944444
2.8 组装
我们对数据集进行了探索,之后进行大量的数据清理、转换等工作,对数据预处理后,构建模型、评估模型。评估模型前我们需要对数据集随机划分为训练集和测试集。假如数据有变化,如新增数据,如何保证训练集和测试集上的操作保持一致?如果数据清理、数据转换等有很多步骤,如何保证这些步骤依次执行?
采用Spark pipeline能很好解决这些问题。我们只要把这些任务,作为pipeline的stage,按照其本身的执行次序把这些stages组装到一个pipeline上。(当然如果任务比较复杂,我们也可以采用多个pipeline,然后把这些作为pipeline的stage,组装到一个新的pipeline。)
组装的步骤一般是:
1、创建pipeline,并各个stages依次组装在一起,如:
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
2、在训练集上拟合这个pipeline
val model = pipeline.fit(training)
3、在测试集上,做预测。
model.transform(test).select("label", "prediction")
通过这种方式,既可保证stages有序执行,也可保证在训练集和测试集上所做逻辑操作的一致性,这里只是举了一个简单例子,下一章将详细介绍有关Pipeline的内容,第7章后,还有详细的使用实例。
2.9 模型选择或调优
在ML中一个重要的任务就是模型选择,或者使用给定的数据为给定的任务寻找最适合的模型或参数。这个过程也叫做调优。调优可以是对单个的Estimator,比如LogisticRegression,或者是包含多个算法、特征工程和其他步骤的工作流(Pipeline)中完成。用户可以一次性对整个Pipline进行调优,而不必对Pipline中的每一个元素进行单独的调优。
MLlib支持使用像CrossValidator和TrainValidationSplit这样的工具进行模型选择。这些工具需要以下的组件:
- Estimator:用户调优的算法或Pipline。
- ParamMap集合:提供参数选择,有时也叫作用户查找的参数网格(parameter grid),参数网格可以使用ParamGridBuilder来构建。
- Evaluator:衡量模型在测试数据上的拟合程度。
模型选择工具工作原理如下:
1.将输入数据划分为训练数据和测试数据。
- 对于每个(训练,测试)对,遍历一组ParamMaps。用每一个ParamMap参数来拟合估计器,得到训练后的模型,再使用评估器来评估模型表现。
3.选择性能表现最优模型对应参数表。
2.9.1 交叉验证 (CrossValidator)
交叉验证(CrossValidator)会从将数据集切分成K折数据集合,分别用于训练和测试,。例如,K=3折时,CrossValidator会生成3个(训练数据,测试数据)对,每一个数据对的训练数据占2/3,测试数据占1/3。为了评估一个ParamMap,CrossValidator 会计算这三个不同的(训练,测试)数据集对在Estimator拟合出的模型上的平均评估指标。
在找出最好的ParamMap后,CrossValidator 会利用此ParamMap在整个训练集上可以训练(fit)出一个泛化能力强,误差相对小的的最佳模型,整个过程处于流程化管理之中,其工作流程图如下:
Spark CrossValidator流程图虽然利用CrossValidator来训练模型,可以提升泛化能力,但其的代价也比较高,如选择k=3,regParam=(0.1,0.01),numIters=(10,20)这样就需要对模型训练322=12次。然而,对比启发式的手动调优,这是选择参数的行之有效的方法。
2.9.2 训练-验证切分(TrainValidationSplit)
交叉验证的代价比较高昂,为此Spark也为超参数调优提供了训练-验证切分(TrainValidationSplit)。TrainValidationSplit创建单一的(训练,测试)数据集对。它使用trainRatio参数将数据集切分成两部分。例如,当设置trainRatio=0.8时,TrainValidationSplit将会将数据切分80%作为数据集,20%作为验证集,来生成训练、测试集对,并最终使用最好的ParamMap和完整的数据集来拟合评估器。
相对于CrossValidator对每一个参数进行k次评估,TrainValidationSplit只对每个参数组合评估1次。因此它的评估代价没有这那么高,但是当训练数据集不够大的时候其结果相对不够可信。
2.10 保存模型
训练、优化模型后,我们需要保存模型,然后把模型移植或部署到其他环境中。
这节主要介绍如何保存模型,如何部署模型等内容,以下是具体示例代码。
1)保存拟合后流水线(pipeline)到磁盘
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
2)保存未拟合的流水线(pipeline)到磁盘
pipeline.write.overwrite().save("/tmp/spark-logistic-regression-model")
3)把拟合后流水线部署到其他环境中。
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
2.11小结
这一章主要介绍了如何构建Spark学习系统、构建的一般步骤等。,实际上,构建Spark学习系统与我们构建其他平台的学习系统基本相同或相似,一般都包括数据加载、数据探索、数据预测、建模、训练模型、评估模型、优化模型等步骤,但这里我们特别增加一个利用pipeline组装各个任务(stages),这也是Spark ML中基于DataFrame数据集的重要内容,下一章我们将详细介绍有关pipeline的内容。
网友评论