10.Spark自定分区、算子、RDD
(1)、spark自定义分区
与mr当中的分区类似,spark当中的分区主要有HashPartitioner以及RangePartitioner,例如我们可以通过spark-shell来实现查看函数的分区类
在我们开发的spark程序当中,只要涉及到shuffle的过程,那么就会有数据分区的操作,在spark当中,默认的分区规则有HashPartitioner以及RangePartitioner
1590996849291.png在前面单词计数统计当中,通过partitioner就可以查看reduceByKey这个算子使用的分区策略
1590996420555.png
除了系统自带得两个分区策略之外,我们也可以通过自定义分区的方式来实现数据的分区,可以通过自定义分区的方式来解决某些特定情况下的数据的分区策略问题
==需求==:针对各个网站来的不同的url,如果域名相同,那么就认为是同一组数据,发送到同一个分区里面去,并且输出到同一个文件里面去进行保存,比如:http://www.baidu.com.com
和http://www.baidu.com.com/archives/1368
,如果你使用HashPartitioner
,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,
数据字段之间以@zolen@来进行分割,表当中的字段详情如下
5.网页访问记录
属性上报表相关字段:
用户umac | iumac |
---|---|
设备dmac | idmac |
区编码 | area_code |
派出所ID | policeid |
上网场所服务类型 | netsite_type |
获取时间 | capture_time |
用户umac | sumac |
设备dmac | sdmac |
场所编码 | netbar_wacode |
上网IP来源 | src_ip |
上网来源端口 | src_port |
上网分配IP | dst_ip |
上网分配端口 | dst_port |
请求方式 | http_method |
请求路径域 | http_domain |
请求路径匹配 | http_action_match |
Web访问的URL地址 | web_url |
http分类 | http_categoryid |
网页名称 | web_title |
安全厂商生成的场所编码 | wxid |
省编码 | province_code |
市编码 | city_code |
类型名称 | typename |
安全厂商组织机构代码 | security_software_orgcode |
其中第17个字段,表示的是上网访问的URL地址,字段之间使用@zolen@来进行切割的
代码开发实现如下:
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object SparkPartition {
def main(args: Array[String]): Unit = {
val context: SparkContext = SparkContext.getOrCreate(new SparkConf().setMaster("local[4]").setAppName("sparkPartitioner"))
context.setLogLevel("WARN")
val partition = new MyPartition(8)
val getTextFile: RDD[String] = context.textFile("file:///D:\\开课吧课程资料\\15、scala与spark课程资料\\2、spark课程\\spark_day03\\2、数据准备\\上网数据",4)
val splitTuple: RDD[(String, String)] = getTextFile.map(x => {
val strings: Array[String] = x.split("@zolen@")
// val host = new URL(strings(16)).getHost
// (host, (url, t._2))
if(strings.length >= 16){
(strings(16), x)
}else{
("http://www.baidu.com",x)
}
})
//执行分区策略,将相同的hosts划分到同一个分区里面去
splitTuple.partitionBy(partition).saveAsTextFile("file:///d:\\out_partition.txt")
context.stop()
}
}
class MyPartition(partitionsNum:Int) extends Partitioner{
override def numPartitions: Int = {
partitionsNum
}
override def getPartition(key: Any): Int = {
if(key.toString.startsWith("http")){
val domain = new java.net.URL(key.toString).getHost()
val returnResult : Int = (domain.hashCode & Integer.MAX_VALUE ) % partitionsNum
returnResult
}else{
0
}
}
}
(2)、spark自定义算子以及自定义RDD
RDD已经给我们提供了很多的各种类型的算子,例如transformation类型的算子或者action类型的算子,如果这些算子还不够我们使用的话,我们还可以自定义各种算子(其实就是定义方法)给RDD进行扩种,例如我们可以自定义transformation或者自定义action的方法,给RDD进一步的扩充算子
需求:读取文件内容,将数据进行切分转换成为对象,然后通过自定义算子,将数据中的金额全部抽取出来成为一个RDD[Double],然后通过自定义算子对当中的数据进行计算,计算金额总和,最后自定义RDD,通过对金额进行打折之后返回一个新的RDD
- 程序运行主入口类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
case class SalesRecord(val transactionId: String,
val customerId: String,
val itemId: String,
val itemValue: Double) extends Serializable
object SparkMain {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val dataRDD = sc.textFile("file:///D:\\开课吧课程资料\\15、scala与spark课程资料\\2、spark课程\\spark_day03\\2、数据准备\\sales.txt")
val salesRecordRDD = dataRDD.map(row => {
val colValues = row.split(",")
new SalesRecord(colValues(0),colValues(1),colValues(2),colValues(3).toDouble)
})
import com.kkb.own.CustomFunctions._
println("Spark RDD API : "+salesRecordRDD.map(_.itemValue).sum)
//通过隐式转换的方法,增加rdd的transformation算子
val moneyRDD: RDD[Double] = salesRecordRDD.changeDatas
println("customer RDD API:" + salesRecordRDD.changeDatas.collect().toBuffer)
//给rdd增加action算子
val totalResult: Double = salesRecordRDD.getTotalValue
println("total_result" + totalResult)
//自定义RDD,将RDD转换成为新的RDD
val resultCountRDD: CustomerRDD = salesRecordRDD.discount(0.8)
println(resultCountRDD.collect().toBuffer)
sc.stop()
}
}
- 定义增强函数:
import org.apache.spark.rdd.RDD
class CustomFunctions(rdd:RDD[SalesRecord]) {
def changeDatas:RDD[Double] = rdd.map(x => x.itemValue )
def getTotalValue:Double = rdd.map(x => x.itemValue).sum()
def discount(discountPercentage:Double) = new CustomerRDD(rdd,discountPercentage)
}
object CustomFunctions{
implicit def addIteblogCustomFunctions(rdd: RDD[SalesRecord]) = new CustomFunctions(rdd)
}
- 自定义RDD实现:
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
class CustomerRDD(prev:RDD[SalesRecord], discountPercentage:Double)extends RDD[SalesRecord](prev){
//def discount(discountPercentage:Double) = new IteblogDiscountRDD(discountPercentage)
//继承compute方法
override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] = {
firstParent[SalesRecord].iterator(split, context).map(salesRecord => {
val discount = salesRecord.itemValue * discountPercentage
new SalesRecord(salesRecord.transactionId,
salesRecord.customerId,salesRecord.itemId,discount)
})
}
//继承getPartitions方法
override protected def getPartitions: Array[Partition] =
firstParent[SalesRecord].partitions
}
网友评论