import org.apache.spark.{Partitioner, SparkConf}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
class Spark06(number:Int) extends Partitioner{
//number 字段来确定分区数
override def numPartitions: Int = number
//分区规则
override def getPartition(key: Any): Int = {
try {
val str = key.toString.substring(0, 1)
if (str.matches("[A-Z]")) {
0
} else if (str.matches("[a-z]")) {
1
} else if (str.matches("[1-9]")) {
2
} else {
3
}
} catch {
case _ => 3
}
}
}
object Spark06 {
//自定义分区
//分区是什么? 分区就是在shuffle阶段, 用来确定每个key要发送到哪个reduce中
//实现? 继承org.apache.spark.Partitioner 类, 并且重写其中的方法
//案例: 按照key的首字母进行分区,划分规则: 数字,小写字母,大写字母, 其他
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val spark = SparkSession.builder().config(conf).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//计算逻辑
compute(spark)
spark.stop()
}
def compute(spark:SparkSession):Unit ={
import spark.implicits._
val date =
List(
"ab Ab 3a R ## $a ", " #s s 3g 5g Fa ob Bb ¥la a", "a c 4 88 90 _a "
)
spark.sparkContext.parallelize(date)
.flatMap( line => {
val arr = ArrayBuffer[(String,Int)]()
line.split(" ").foreach( word => arr.+=( (word ,1) ) )
arr
} )
//reduceByKey 是shuffle 算子,在此处设置分区
.reduceByKey( new Spark06(4), _+_ )
//将结果保存在文件中, 才能查看分区效果
.saveAsTextFile("g://result")
}
}
网友评论