使用Spark处理数据时,常见的情形时临时的处理数据需要合并处理下游导出。
比如 HBase -> Mysql 转储,或者时 HDFS -> API调用提交。
一般情况下,rdd.foreach( a -> API.commit(a))
按顺序提交就可以了(这里假设批量接口为API.commit(a[])
)。 但是大多数情况batch提交更有效率,怎样对于rdd对元素进行分组和拆分呢?
- 使用分区特性操作,可以一定程度避免无意义合并,而且有批量特性。
rdd.foreachPartition(API::commit)
- 使用合并操作。这里推荐Guava的工具类
Iterators.partition(a, 1000).forEachRemaining(API::commit)
合并以上方法就是:
rdd.foreachPartition(it ->
Iterators.partition(it, batchSize)
.forEachRemaining(API::commit));
网友评论