常见RDD操作
textFile
在数据分析中最常见的时从外部获取数据集,这就需要textFile操作
val path = "/home/hadoop/Downloads/用户安装列表数据/*.gz"
//通过textfile文件读取数据最终获得的也是一个RDD,所以datas是一个RDD
val datas = sc.textFile(path)
textFile在读取数据时默认是按照换行符作为分割,所以一行为一个元素
datas.count()
统计行数,就是统计元素的个数
同时RDD计算具有惰性,只有涉及action操作才会执行,所以当出现count是,textFile 这些tranform操作,才会进行执行
textFile可以读取本地/HDFS目录下的文件
同时textFile可以读取多个文件(用逗号隔开),读取目录下文件,直接读取压缩文件
textFile("/input/*.gz")
map,filter
map的意思是对不同分片对每一个元素执行一个函数操作
val rdd1 = sc.parallelize(1 to 9 , 3)
val rdd2 = rdd1.map(x => x * 2)
val rdd3 = rdd2.collect()
println(rdd3.mkString(","))
map的第一个x代表是列表中一个每一个元素, => 表示的是call-by-name在需要计算的时候计算,在每一个元素都进行调用个计算
rdd2 为MapPartitionsRDD类型,经过collect转化为Int数组类型
rdd2.collect 对每一个分片进行收集变为int数组,并转换为字符串,输出
val rdd3 = rdd2.filter(x => x > 10)
val intlist2 = rdd3.collect
intlist2.foreach(println)
filter就是对于每一个元素进行过滤的操作
flatMap
flatMap是map的一对多的形式,输入一个可以对应输出多个
val rdd4 = rdd3.flatMap(x => x until 20)
println(rdd4.collect.mkString(","))
当然最常见的是对于字符串分片的操作
var rdd4 = rdd3.flatMap(x => x.split("\t"))
val stringList = datardd1.collect()
mapPartition
mapPartition的输入函数是每一个分区的数据
arrayRDD.mapPartitions(datas=>{
dbConnect = getDbConnect() //获取数据库连接
datas.foreach(data=>{
dbConnect.insert(data) //循环插入数据
})
dbConnect.commit() //提交数据库事务
dbConnect.close() //关闭数据库连接
})
分批将数据插入数据库
arrayRDD.mapPartitions(elements=>{
var result = new ArrayBuffer[Int]()
elements.foreach(element=>{
result.+=(element)
})
result.iterator
}).foreach(println)
分片求和
mapPartitionWihIndex和mapPartition是基本一样的,只是mapPartitionWihIndex是带有索引的二元组的数据
sample
对样本进行抽样,根据给定的随机种子,是否放回的抽样
val sampleData = datas.sample(false,0.01,10)
println(sampleData.count())
union,intersection,distinct
union,intersection分别是两个rdd的交集和并集
val rdd5 = rdd4.union(rdd3)
rdd5.collect.foreach(println)
使用distinct进行去重操作
groupByKey,reduceByKey
是针对key/value类型的数据进行分组操作,groupbykey是对数据进行分组
val rdd0 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,2),(2,3),(2,4)),3)
val rddk = rdd0.groupByKey()
val tuples = rddk.collect()
for((k,v)<-tuples){
println(k,v)
for(s<-v){
print(s)
}
}
group操作是只分组,reduce操作对分组的数据进行联合计算
val rdd6 = rdd0.reduceByKey((x,y) => x + y)
val arrayrdd = rdd6.collect
for((k,v)<-arrayrdd){
println(k,v)
}
aggregateByKey
第一个参数为初始值,第二参数为一个函数负责将初始值合并到分组中,第三个参数是一个函数,负责将每一个分组进行合并。
def seqFunc(a,b):
print "seqFunc:%s,%s" %(a,b)
return max(a,b) #取最大值
def combFunc(a,b):
print "combFunc:%s,%s" %(a ,b)
return a + b #累加起来
'''
aggregateByKey这个算子内部肯定有分组
'''
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
rest = aggregateRDD.collectAsMap()
for k,v in rest.items():
print k,v
combineByKey
combineByKey是对RDD中的数据集按照key进行聚合的操作
val data = Array((1,1.0),(1,3.0),(2,4.0),(2,5.0),(2,6.0))
val rdd = sc.parallelize(data,2)
val combine1 = rdd.combineByKey(createCombiner = (v:Double) =>(v:Double,1),
mergeValue = (c:(Double,Int),v:Double) => (c_1 + v ,c_2 + 1),
mergeCombiners = (c1:(Double,Int),c2:(Double,Int)) => (c1_1 + c2_1,c1_2+c2_2),
numPartitions = 2)
矩阵向量
使用breeze创建矩阵和向量
//创建0矩阵和0向量
val m1 = DenseMatrix.zeros[Double](2,3)
println(m1)
val v1 = DenseVector.zeros[Double](3)
println(v1)
//初始化自定义数值的向量,给向量填充指定数值
val v2 = DenseVector.fill(3){10.0}
//创建单位向量
val v3 = DenseVector.ones[Double](3)
//创建等差列表向量
val v4 = DenseVector.range(1,10,2)
println(v4)
//创建单位矩阵
val m2 = DenseMatrix.eye[Double](3)
println(m2)
//创建对角矩阵
val m3 = diag(DenseVector(1.0,2.0,3.0))
println(m3)
//从函数创建矩阵和向量
//i 代表的是索引下标
val v9 = DenseVector.tabulate(7){i =>2*i}
println(v9)
//i,j都是坐标
val m4 = DenseMatrix.tabulate(3,2){case(i,j) => i+j}
//将数组直接转换为向量或者矩阵
val v10 = new DenseVector(Array(1,2,3,4))
println(v10)
val m5 = new DenseMatrix(2,3,Array(11,2,3,2,3,6))
println(m5)
//随机生成向量和矩阵
val v11 = DenseVector.rand(4)
val m6 = DenseMatrix.rand(2,3)
println(m6)
对向量矩阵的访问
//访问向量数组
var a = DenseVector(1,2,3,4,5,6,7,8,9,10)
println(a.valueAt(3))
println(a(5 to 0 by -1))
println(a(0))
println(a(1 to -1))
//访问矩阵
val m = DenseMatrix((1.0,2.0,3.0),(3.0,4.0,5.0))
println(m)
//访问单元素
println(m(0,1))
//获取一列元素
println(m(::,1))
//获取一行
println(m(1,::))
Breeze元素操作
//矩阵重塑
val m1 = DenseMatrix((1.0,2.0,3.0),(3.0,4.0,5.0))
println(m1)
println(m1.reshape(3,2))
//矩阵的转置
//从函数创建矩阵和向量
//i 代表的是索引下标
val v9 = DenseVector.tabulate(7){i =>2*i}
println(v9)
//矩阵转换为向量
println(m1.toDenseVector)
DenseVector(1.0, 3.0, 2.0, 4.0, 3.0, 5.0)
主对角线/上三角/下三角
val m2 = DenseMatrix((1.0,2.0,3.0),(4.0,5.0,6.0),(7.0,8.0,9.0))
//下三角/上三角
println(lowerTriangular(m2))
println(upperTriangular(m2))
println(m2.copy)
//主对角向量
println(diag(m2))
//重新赋值
//给最后一列重新赋值
m2(::,2):=5.0
println(m2)
//矩阵进行垂直拼接和横向拼接
val m3 = DenseMatrix((1,2,3),(4,5,6))
val m4 = DenseMatrix((1,1,1),(2,2,2))
var m5 = DenseMatrix.vertcat(m3,m4)
println(m5)
m5 = DenseMatrix.horzcat(m3,m4)
println(m5)
```
向量的拼接同矩阵
##### 数值计算
//相同大小的矩阵相加
val m3 = DenseMatrix((1,2,3),(4,5,6))
val m4 = DenseMatrix((1,1,1),(2,2,2))
/数值计算
//矩阵相加
println(m3 + m4)
//对应每一项相乘
println(m3*:*m4)
//每一项相除
println(m3/:/m4)
//每一项小于
println(m3<:<m4)
println(m3:==m4)
println(m3:+=1)
println(m3:*=2)
println(max(m3))
println(argmax(m3))
//向量的点乘
println(DenseVector(1,2,3,4) dot DenseVector(1,1,1,1))
}
//求和
//对行/列求和
println(sum(m4))
println(sum(m4,Axis._0))
println(sum(m4,Axis._1))
//求迹
println(trace(m2))
//累加
accumulate(DenseVector(1,2,3,4))
##### 布尔操作
m3&:&m4 相与
m3|:|m4 相或
!m3 非操作
any(m3) 存在不为0返回true
all(m3) 所有不为0返回true
##### 线性代数矩阵间操作
val c = DenseMatrix((1.0,2.0,3.0),(4.0,5.0,6.0),(7.0,8.0,9.0))
val d = DenseMatrix((1.0,1.0,1.0),(1.0,1.0,1.0),(1.0,1.0,1.0))
println(c\d)
println(c.t)
println(det(c))
// 求逆
inv(c)
//求特征值和特征向量
eigSym(c)
//求范数
norm(c)
//求矩阵的质
rank(c)
网友评论