Spark Partitioner: Custom Partitioning
1. The Difference Between a Spark Partition and a Block
A partition is also called a split.
Name | Definition | Notes |
---|---|---|
block | The smallest physical unit in which Spark data is stored on HDFS. For example, if an RDD holds 600 MB of data and the block size is 128 MB, it occupies 5 blocks. | Related settings exist as well, e.g. spark.io.compression.snappy.block.size sets the block size used by Snappy compression. |
partition | The smallest unit of a Spark RDD. | The partitions of an RDD vary in size and number; they are determined by the operators in the application and by the number of splits in the data initially read in. Number of partitions = number of tasks. |
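To make the distinction concrete, here is a minimal sketch (local mode; the HDFS path in the commented-out lines is hypothetical) showing that the partition count is a logical property of the RDD that you can inspect and control, independent of the physical HDFS block size:

import org.apache.spark.{SparkConf, SparkContext}

object PartitionVsBlockDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("partition-vs-block"))

    // Reading a file: the initial partition count follows the input splits,
    // typically one split per HDFS block for an uncompressed file.
    // val fromHdfs = sc.textFile("hdfs:///path/to/600MB-file")  // hypothetical path
    // println(fromHdfs.getNumPartitions)                        // roughly 5 with 128 MB blocks

    // Parallelizing a collection: the partition count is whatever we ask for,
    // unrelated to any physical block size.
    val rdd = sc.parallelize(1 to 1000, numSlices = 8)
    println(s"partitions = ${rdd.getNumPartitions}")             // 8

    sc.stop()
  }
}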
Note: the relationship between partitions and narrow/wide RDD dependencies
- A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD (so tasks can be pipelined without waiting for other tasks), e.g. map and filter; see the left part of the figure.
- A wide dependency means a partition of the parent RDD is used by multiple partitions of the child RDD (so a task has to wait for other tasks to finish before the next operation can start, i.e. a shuffle is involved), e.g. groupByKey and reduceByKey; see the right part of the figure.
Narrow dependency (left) vs. wide dependency (right)
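As a quick illustration of the two dependency types, the following sketch (assuming a local SparkContext; the object name and data are arbitrary) inspects the dependency objects Spark records for a map versus a reduceByKey:

import org.apache.spark.{NarrowDependency, ShuffleDependency, SparkConf, SparkContext}

object DependencyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("dependency-demo"))

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 4)

    val mapped  = pairs.map { case (k, v) => (k, v * 10) } // narrow: pipelined per partition, no shuffle
    val reduced = pairs.reduceByKey(_ + _)                 // wide: requires a shuffle across partitions

    println(mapped.dependencies.head.isInstanceOf[NarrowDependency[_]])         // true
    println(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]) // true

    sc.stop()
  }
}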
2. Spark's Built-in Partitioners
First, note that only key-value RDDs, e.g. RDD[(Int, Row)], can be repartitioned with a custom partitioner. Spark ships with two partitioners, HashPartitioner and RangePartitioner; both extend the org.apache.spark.Partitioner class and both are defined in Partitioner.scala.
Reading the source shows that:
- Partitioner is an abstract class with two methods:
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
- Partitioner has a companion object. HashPartitioner is Spark's default partitioner, unless the RDD already has a partitioner. For the number of partitions: if the configuration spark.default.parallelism is set, that value is used; otherwise the max number of upstream partitions is used:
object Partitioner {
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If any of the RDDs already has a partitioner, choose that one.
*
* Otherwise, we use a default HashPartitioner. For the number of partitions, if
* spark.default.parallelism is set, then we'll use the value from SparkContext
* defaultParallelism, otherwise we'll use the max number of upstream partitions.
*
* Unless spark.default.parallelism is set, the number of partitions will be the
* same as the number of partitions in the largest upstream RDD, as this should
* be least likely to cause out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty) {
hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
}
}
}
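A rough sketch of this default behaviour (local mode, with spark.default.parallelism deliberately left unset; the object name, data, and partition counts are arbitrary): a shuffle operator such as groupByKey falls back to a HashPartitioner sized by the largest upstream RDD, and an existing partitioner is reused when one is present.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object DefaultPartitionerDemo {
  def main(args: Array[String]): Unit = {
    // spark.default.parallelism is not set explicitly in the conf
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("default-partitioner"))

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), 6)
    println(pairs.partitioner)             // None: a freshly parallelized RDD has no partitioner

    val grouped = pairs.groupByKey()       // no partitioner or partition count passed
    println(grouped.partitioner)           // Some(HashPartitioner) chosen by defaultPartitioner
    println(grouped.getNumPartitions)      // 6: the max number of upstream partitions

    // Once an RDD carries a partitioner, operations that co-group with it reuse that partitioner.
    val prePartitioned = pairs.partitionBy(new HashPartitioner(3))
    println(prePartitioned.join(pairs).getNumPartitions)  // 3: the existing partitioner wins

    sc.stop()
  }
}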
2.1 How HashPartitioner assigns partitions
The source shows that HashPartitioner takes the hash code of each key of the RDD[(K, V)] and computes its remainder modulo the number of partitions; if the remainder is negative, partitions is added to it, and the result is the partition index for that row.
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
// From org.apache.spark.util.Utils: shifts a negative remainder into the range [0, mod)
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
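The following self-contained sketch (the object name and sample keys are arbitrary) replays that rule and checks it against HashPartitioner itself; no SparkContext is needed, since HashPartitioner is just a plain class:

import org.apache.spark.HashPartitioner

object HashPartitionDemo {
  def main(args: Array[String]): Unit = {
    // Same rule as Utils.nonNegativeMod above
    def nonNegativeMod(x: Int, mod: Int): Int = {
      val rawMod = x % mod
      rawMod + (if (rawMod < 0) mod else 0)
    }

    val p = new HashPartitioner(4)
    for (key <- Seq[Any](0, 1, 7, -3, "spark")) {
      val expected = nonNegativeMod(key.hashCode, 4)
      println(s"key=$key -> partition ${p.getPartition(key)} (expected $expected)")
    }
    // Example: (-3).hashCode == -3, and -3 % 4 == -3, so the partition is -3 + 4 = 1.
  }
}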
2.2 RangePartitioner
The main job of RangePartitioner is to map keys that fall within a certain range to a given partition, so the boundary computation, rangeBounds, is the crucial part of its implementation:
- Determine the boundary of each partition, i.e. rangeBounds:
  - First, the sketch() method of the RangePartitioner companion object samples every partition of the input RDD, using reservoir sampling;
  - The companion object's determineBounds() method then produces the array of boundary values, Array[K];
  - That Array[K] is assigned to rangeBounds, i.e. the per-partition boundaries.
- The number of partitions numPartitions is derived from rangeBounds;
- getPartition(key: Any): Int is likewise determined by rangeBounds.
/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*
* @note The actual number of partitions created by the RangePartitioner might not be the same
* as the `partitions` parameter, in the case where the number of sampled records is less than
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
}
}
}
def numPartitions: Int = rangeBounds.length + 1
private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
}
As the scaladoc above notes, if the number of sampled records is smaller than the requested number of partitions, the actual number of partitions created may differ from the partitions parameter.
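Here is a small usage sketch (local mode assumed; the object name and data are arbitrary) that builds a RangePartitioner over an RDD with integer keys and shows that each resulting partition covers a contiguous key range:

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangePartitionerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("range-demo"))

    val pairs = sc.parallelize(1 to 100, 4).map(i => (i, s"row-$i"))

    // Sampling happens here, in the RangePartitioner constructor (rangeBounds is computed eagerly).
    val ranged = pairs.partitionBy(new RangePartitioner(4, pairs))

    // Each partition now covers a contiguous key range; order *inside* a partition
    // is still not guaranteed (sortByKey would add that).
    ranged.mapPartitionsWithIndex { (idx, it) =>
      val keys = it.map(_._1).toSeq
      val summary = if (keys.isEmpty) "empty" else s"keys ${keys.min}..${keys.max}, ${keys.size} rows"
      Iterator(s"partition $idx: $summary")
    }.collect().foreach(println)

    sc.stop()
  }
}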
2.3 Which partitioner should you choose?
- Although HashPartitioner is the default, it works by hashing the key and taking the remainder modulo the number of partitions, so if most keys are identical the partitions end up skewed; in the extreme case, every row of the RDD lands in the same partition (a small demonstration follows below).
- RangePartitioner tries to keep the amount of data in each partition balanced, and the partitions are ordered relative to each other: every element in one partition is smaller (or larger) than every element in another partition. Elements within a partition, however, are not guaranteed to be in order. In short, it maps keys that fall within a certain range to a given partition.
Overall, HashPartitioner is more widely applicable, while RangePartitioner distributes the data more evenly.
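The skew problem is easy to reproduce. A minimal sketch (local mode, synthetic data; the object name is arbitrary) in which most rows share one key shows almost everything landing in a single partition under HashPartitioner:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SkewDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("skew-demo"))

    // 9,000 rows share the key "hot"; 1,000 rows have distinct keys.
    val rows = (1 to 9000).map(_ => ("hot", 1)) ++ (1 to 1000).map(i => (s"key-$i", 1))
    val skewed = sc.parallelize(rows, 8).partitionBy(new HashPartitioner(8))

    skewed.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
      .collect()
      .foreach { case (idx, n) => println(s"partition $idx: $n rows") }
    // One partition holds at least the 9,000 "hot" rows; the other seven share the rest.

    sc.stop()
  }
}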
Of course, Partitioner has other subclasses as well, e.g. GridPartitioner in the machine learning module.
3. Writing a Custom Partitioner, with Example Code
3.1 How to define a custom partitioner
Implementing a custom partitioner takes two steps:
- Define a class, say UDFPartitioner, that extends org.apache.spark.Partitioner and implements the following four methods (a sketch of a reasonable implementation follows this list):
  - numPartitions: Int: the total number of partitions;
  - getPartition(key: Any): Int: returns the partition index for a given key (from 0 to numPartitions - 1, inclusive).
  - equals(): Java's standard equality method. Implementing it properly matters, because Spark uses it to check whether your partitioner object equals another partitioner instance, which is how Spark decides whether two RDDs are partitioned the same way.
  - hashCode(): one caveat: if your algorithm relies on Java's hashCode(), be aware that it can return negative numbers, so be very careful to ensure that getPartition() always returns a non-negative result.
- Apply the custom partitioner to the key-value RDD, e.g. an RDD[(Int, Row)].
Such an RDD consists of key-row pairs; a custom partitioner such as UDFPartitioner places each row into a partition according to the partition index computed from that row's key.
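As promised above, here is a minimal sketch (the class name ModPartitioner is illustrative, not from the article) that spells out the equals/hashCode contract and the non-negative getPartition guard in one place:

import org.apache.spark.Partitioner

// Routes keys by their hashCode, like HashPartitioner, but makes the contract explicit.
class ModPartitioner(val numParts: Int) extends Partitioner {
  require(numParts > 0, s"Number of partitions ($numParts) must be positive.")

  override def numPartitions: Int = numParts

  // hashCode() may be negative, so shift the remainder into [0, numParts).
  override def getPartition(key: Any): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numParts
      if (raw < 0) raw + numParts else raw
  }

  // Two ModPartitioners that would place every key identically compare equal,
  // which lets Spark recognise identically partitioned RDDs and avoid shuffles.
  override def equals(other: Any): Boolean = other match {
    case p: ModPartitioner => p.numParts == numParts
    case _                 => false
  }

  override def hashCode(): Int = numParts
}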
3.2 Example code (Scala)
// JDBC option keys used when reading the table below
object Config {
  val JDBC_PARA_URL = "url"
  val JDBC_PARA_USER = "user"
  val JDBC_PARA_PASSWORD = "password"
  val JDBC_PARA_DRIVER = "driver"
  val JDBC_PARA_TABLE = "dbtable"
  val JDBC_PARA_FETCH_SIZE = "fetchsize"
}
// Custom partitioning strategy
import org.apache.spark.Partitioner
import org.slf4j.LoggerFactory

class UDFPartitioner(numParts: Int) extends Partitioner {
  val logger = LoggerFactory.getLogger(classOf[UDFPartitioner])
  // Total number of partitions
  override def numPartitions: Int = numParts
  // The key here is the key of the rdd map; return the partition index (0 to numParts - 1) for this key-row pair
  override def getPartition(key: Any): Int = {
    key.asInstanceOf[Int]
  }
  // For a production partitioner, equals/hashCode should be based on numParts (see the sketch in 3.1)
  override def hashCode(): Int = super.hashCode()
  override def equals(obj: scala.Any): Boolean = super.equals(obj)
}
// Main class
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("yarn").appName("test").getOrCreate()
    val sqlContext = sparkSession.sqlContext
    val sc = sparkSession.sparkContext
val partitionNum = 16
val fetchSize = 1000
val jdbcUrl = "..."
val userName = "..."
val schema_table = "..."
val password = "..."
val jdbcDriver = "com.mysql.jdbc.Driver"
// Note: place the MySQL JDBC driver jar under Spark's lib/jars directory, or pass it with --jars when submitting the application with spark2-submit
val jdbcDF = sqlContext.read.format("jdbc").options(
Map(Config.JDBC_PARA_URL -> jdbcUrl,
Config.JDBC_PARA_USER -> userName,
Config.JDBC_PARA_TABLE -> schema_table,
Config.JDBC_PARA_PASSWORD -> password,
Config.JDBC_PARA_DRIVER -> jdbcDriver,
Config.JDBC_PARA_FETCH_SIZE -> s"$fetchSize")).load()
val rdd = jdbcDF.rdd
// Build the key-value RDD; the key is the partition index computed for each row
val rddMap: RDD[(Int, Row)] = rdd.mapPartitions {
partition => {
partition.map(row => (getPartitionOrder(row), row))
}
}
// Repartition using the custom partitioner
val rddPartitions = rddMap.partitionBy(new UDFPartitioner(partitionNum))
......
}
// Given a row, return the partition index for that row
def getPartitionOrder(row: Row): Int = {
var rowkeyValue = ""
val seq = row.toSeq
val newSeq = seq.map(element => String.valueOf(element))
......
splitNum
}
}
My knowledge here is limited; if you spot any mistakes, please point them out!