第十二章 弹性分布式数据集
- 什么是低级API:用于处理分布式数据(RDD)、用于分发和处理分布式共享变量
- 如何使用低级API:SparkContext是低级API函数库的入口,可以通过SparkSession来获取SparkContext
RDD
- DataFrame Dataset运行时都编译为RDD
- RDD是一个只读不可变的且已分块的记录集合,可以被并行处理
- RDD中记录的是对象
RDD类型
通用型RDD
key-value RDD,支持特殊操作并支持按key自定义数据分片
每个RDD具有五个主要内部属性:
- 数据分区列表
- 作用在每个分区的计算函数
- 描述与其他RDD的依赖关系列表
- 为key-value RDD配置的分区方法
- 优先位置列表,指定每个分区的处理位置偏好
RDD使用方法
创建RDD
spark.range(500).rdd
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
spark.sparkContext.textFile("/some/path/withTextFiles")
转换操作
- filter
def startsWithS(individual:String) = {
individual.startsWith("S")
}
words.filter(word => startsWithS(word)).collect()
- map
将给定数据集中的记录一条一条地输入该函数处理以得到你期望的结果(1对1)
比如这样把一条记录打成了三行
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
- flatmap
每个当前行会映射为多行(1对多),输出一个可展开的迭代器
words.flatMap(word => word.toSeq).take(5)
- randomSplit
将一个RDD随机分成若干个RDD,这些RDD组成一个RDD的Array返回
两个参数,一个是权重Array,一个是随机种子
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))
动作操作
- reduce
reduce方法在指定一个函数将RDD中任何类型的值“规约”为一个值
给定一组数字求和,reduce方法制定一个函数来接收两个参数,然后对它们求和返回一个新值,新值再作为其中一个参数继续传递给该函数,另一个参数为下一个数字,最终得到一个数字 - take
从RDD中读取一定数量的值
缓存
默认情况下,仅对内存中的数据进行缓存和持久化
words.cache()
通过pipe方法调用系统命令操作RDD
将每个数据分区交给指定的外部进程来计算得到结果RDD,每个输入分区的所有元素被当做另一个外部进程的标准输入,输入元素由换行符分隔
- mapPartitions
map是mapPartitions基于行操作的一个别名,mapPartitions函数每次处理一个数据分区
words.mapPartitions(part => IteratorInt).sum() // 2
- foreachPartition
mapPartition函数需要返回值,但是foreachPartition函数不需要
words.foreachPartition { iter =>
import java.io._
import scala.util.Random
val randomFileName = new Random().nextInt()
val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
while (iter.hasNext) {
pw.write(iter.next())
}
pw.close()
}
- glom
可以将数据集中每个分区都转换为数组
spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom().collect()
// Array(Array(Hello), Array(World))
第十三章 高级RDD
Key-Value RDD
- 操作通常形如<some-operation>ByKey
最简单的操作:
words.map(word => (word.toLowerCase, 1))
也可通过keyBy函数生成
val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)
// 输出的形式:[('s', 'Spark'), ('t', 'THE')]
- lookup
keyword.lookup("s")
聚合操作
- countByKey
KVcharacters.countByKey()
- groupByKey
KVcharacters.groupByKey().map(row => (row._1, row._2.reduce(addFunc))).collect()
会把所以key放到一起进行计算,可能出现倾斜
- reduceByKey
reduce发生在每个分组,不需要将所有内容放在内存,此操作不会导致shuffle过程
KVcharacters.reduceByKey(addFunc).collect()
- groupByKey和reduceByKey
https://www.jianshu.com/p/0c6705724cff
当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey - aggregateBykey
基于key聚合,不基于分区聚合 - combineByKey
可以指定聚合函数和合并函数
val valToCombiner = (value:Int) => List(value)
val mergeValuesFunc = (vals:List[Int], valToAppend:Int) => valToAppend :: vals
val mergeCombinerFunc = (vals1:List[Int], vals2:List[Int]) => vals1 ::: vals2
// now we define these as function variables
val outputPartitions = 6
KVcharacters
.combineByKey(
valToCombiner,
mergeValuesFunc,
mergeCombinerFunc,
outputPartitions)
.collect()
- foldByKey
KVcharacters.foldByKey(0)(addFunc).collect() //0为加法,1为乘法
连接
co group
可以将三个key-value RDD一起分组,key在一边,所有value在另一边
import scala.util.Random
val distinctChars = words.flatMap(word => word.toLowerCase.toSeq).distinct
val charRDD = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD2 = distinctChars.map(c => (c, new Random().nextDouble()))
val charRDD3 = distinctChars.map(c => (c, new Random().nextDouble()))
charRDD.cogroup(charRDD2, charRDD3).take(5)
zip
zip把两个RDDyuansu匹配在一起,要求两个RDD元素个数相同,分区数也相同,生成一个PairRDD
控制分区
使用RDD可以控制数据在整个集群上的物理分布
自定义分区
- 自定义分区是使用RDD的主要原因之一,结构化API不支持自定义数据分区
- 自定义分区的唯一目标是将数据均匀地分布到整个集群中
import org.apache.spark.Partitioner
class DomainPartitioner extends Partitioner {
def numPartitions = 3
def getPartition(key: Any): Int = {
val customerId = key.asInstanceOf[Double].toInt
if (customerId == 17850.0 || customerId == 12583.0) {
return 0 //把这两个数据量太大的用户放到一个分区里
} else {
return new java.util.Random().nextInt(2) + 1
}
}
}
keyedRDD
.partitionBy(new DomainPartitioner).map(_._1).glom().map(_.toSet.toSeq.length)
.take(5)
第十四章 分布式共享变量
- 第二种低级API是分布式共享变量,有两种:广播变量和累加器
- 累加器将所有任务中的数据累加到一个共享结果中
- 广播变量允许你在所有工作节点上保存一个共享值
广播变量
-
广播变量是共享的、不可修改的变量,它们缓存在集群中的每个节点上,不需要每个任务都反复序列化
广播变量
val supplementalData = Map("Spark" -> 1000, "Definitive" -> 200,
"Big" -> -300, "Simple" -> 100)
val suppBroadcast = spark.sparkContext.broadcast(supplementalData)
累加器
- 累加器可以将转换操作更新的值以高效和容错的方式传输到驱动节点
- 累加器仅支持由满足交换律和结合律的操作进行累加的变量,因此对累加器的操作可以被高效并行
- 对于仅发生在动作操作内执行的累加器更新,Spark保证每个任务对累加器的更新只发生一次,重新启动的任务不会再次更新该值
-
在转换操作中,累加器可能由于重新执行发生多次变化
累加器
一个统计例子
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flights = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet")
.as[Flight]
import org.apache.spark.util.LongAccumulator
val accUnnamed = new LongAccumulator
val acc = spark.sparkContext.register(accUnnamed)
val accChina = new LongAccumulator
val accChina2 = spark.sparkContext.longAccumulator("China")
spark.sparkContext.register(accChina, "China")
def accChinaFunc(flight_row: Flight) = {
val destination = flight_row.DEST_COUNTRY_NAME
val origin = flight_row.ORIGIN_COUNTRY_NAME
if (destination == "China") {
accChina.add(flight_row.count.toLong)
}
if (origin == "China") {
accChina.add(flight_row.count.toLong)
}
}
flights.foreach(flight_row => accChinaFunc(flight_row))
accChina.value // 953
网友评论