mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理。
定义:def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
灌入Tair Demo
object DataHandler {
def saveData(data:RDD[(String, String)], partitionNum:Int, qps:Double, area:Short) = {
data.repartition(partitionNum).mapPartitions{
partition =>
val client = TairClient.getClient()
val rateLimiter = RateLimiter.create(qps)
partition.map {
case (key, value) =>
rateLimiter.acquire()
client.putData((key, value), area)
}
}
}
}
- 如果在映射的过程中要频繁创建大对象(如数据库、Tair连接等),使用mapPartitions要比map高效的多。
- 使用RateLimiter可以达到限流作用,流量为partitionNum* qps
网友评论