美文网首页
离线计算组件篇-自定义分区、算子以及RDD

离线计算组件篇-自定义分区、算子以及RDD

作者: CoderInsight | 来源:发表于2023-02-08 08:38 被阅读0次

    10.Spark自定分区、算子、RDD

    (1)、spark自定义分区

    与mr当中的分区类似,spark当中的分区主要有HashPartitioner以及RangePartitioner,例如我们可以通过spark-shell来实现查看函数的分区类

    在我们开发的spark程序当中,只要涉及到shuffle的过程,那么就会有数据分区的操作,在spark当中,默认的分区规则有HashPartitioner以及RangePartitioner

    1590996849291.png

    在前面单词计数统计当中,通过partitioner就可以查看reduceByKey这个算子使用的分区策略


    1590996420555.png

    除了系统自带得两个分区策略之外,我们也可以通过自定义分区的方式来实现数据的分区,可以通过自定义分区的方式来解决某些特定情况下的数据的分区策略问题

    ==需求==:针对各个网站来的不同的url,如果域名相同,那么就认为是同一组数据,发送到同一个分区里面去,并且输出到同一个文件里面去进行保存,比如:http://www.baidu.com.comhttp://www.baidu.com.com/archives/1368,如果你使用HashPartitioner,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,

    数据字段之间以@zolen@来进行分割,表当中的字段详情如下

    5.网页访问记录

    属性上报表相关字段:

    用户umac iumac
    设备dmac idmac
    区编码 area_code
    派出所ID policeid
    上网场所服务类型 netsite_type
    获取时间 capture_time
    用户umac sumac
    设备dmac sdmac
    场所编码 netbar_wacode
    上网IP来源 src_ip
    上网来源端口 src_port
    上网分配IP dst_ip
    上网分配端口 dst_port
    请求方式 http_method
    请求路径域 http_domain
    请求路径匹配 http_action_match
    Web访问的URL地址 web_url
    http分类 http_categoryid
    网页名称 web_title
    安全厂商生成的场所编码 wxid
    省编码 province_code
    市编码 city_code
    类型名称 typename
    安全厂商组织机构代码 security_software_orgcode

    其中第17个字段,表示的是上网访问的URL地址,字段之间使用@zolen@来进行切割的

    代码开发实现如下:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    object SparkPartition {
      def main(args: Array[String]): Unit = {
        val context: SparkContext = SparkContext.getOrCreate(new SparkConf().setMaster("local[4]").setAppName("sparkPartitioner"))
        context.setLogLevel("WARN")
        val partition = new MyPartition(8)
        val getTextFile: RDD[String] = context.textFile("file:///D:\\开课吧课程资料\\15、scala与spark课程资料\\2、spark课程\\spark_day03\\2、数据准备\\上网数据",4)
        val splitTuple: RDD[(String, String)] = getTextFile.map(x => {
          val strings: Array[String] = x.split("@zolen@")
          // val host = new URL(strings(16)).getHost
          // (host, (url, t._2))
          if(strings.length >= 16){
            (strings(16), x)
          }else{
            ("http://www.baidu.com",x)
          }
        })
        //执行分区策略,将相同的hosts划分到同一个分区里面去
        splitTuple.partitionBy(partition).saveAsTextFile("file:///d:\\out_partition.txt")
        context.stop()
      }
    }
    class MyPartition(partitionsNum:Int) extends  Partitioner{
      override def numPartitions: Int = {
        partitionsNum
      }
      override def getPartition(key: Any): Int = {
        if(key.toString.startsWith("http")){
          val domain = new java.net.URL(key.toString).getHost()
          val returnResult : Int = (domain.hashCode & Integer.MAX_VALUE ) % partitionsNum
          returnResult
        }else{
          0
        }
    
      }
    }
    

    (2)、spark自定义算子以及自定义RDD

    RDD已经给我们提供了很多的各种类型的算子,例如transformation类型的算子或者action类型的算子,如果这些算子还不够我们使用的话,我们还可以自定义各种算子(其实就是定义方法)给RDD进行扩种,例如我们可以自定义transformation或者自定义action的方法,给RDD进一步的扩充算子

    需求:读取文件内容,将数据进行切分转换成为对象,然后通过自定义算子,将数据中的金额全部抽取出来成为一个RDD[Double],然后通过自定义算子对当中的数据进行计算,计算金额总和,最后自定义RDD,通过对金额进行打折之后返回一个新的RDD

    • 程序运行主入口类
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    case class SalesRecord(val transactionId: String,
                           val customerId: String,
                           val itemId: String,
                           val itemValue: Double) extends  Serializable
    
    
    object SparkMain {
    
    
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val dataRDD = sc.textFile("file:///D:\\开课吧课程资料\\15、scala与spark课程资料\\2、spark课程\\spark_day03\\2、数据准备\\sales.txt")
        val salesRecordRDD = dataRDD.map(row => {
          val colValues = row.split(",")
          new SalesRecord(colValues(0),colValues(1),colValues(2),colValues(3).toDouble)
        })
    
        import com.kkb.own.CustomFunctions._
    
        println("Spark RDD API : "+salesRecordRDD.map(_.itemValue).sum)
    
        //通过隐式转换的方法,增加rdd的transformation算子
        val moneyRDD: RDD[Double] = salesRecordRDD.changeDatas
        println("customer RDD  API:" +  salesRecordRDD.changeDatas.collect().toBuffer)
    
    
        //给rdd增加action算子
        val totalResult: Double = salesRecordRDD.getTotalValue
        println("total_result" + totalResult)
    
        //自定义RDD,将RDD转换成为新的RDD
        val resultCountRDD: CustomerRDD = salesRecordRDD.discount(0.8)
    
        println(resultCountRDD.collect().toBuffer)
    
        sc.stop()
    
    
      }
    }
    
    • 定义增强函数:
    import org.apache.spark.rdd.RDD
    
    class CustomFunctions(rdd:RDD[SalesRecord]) {
      def changeDatas:RDD[Double] =  rdd.map(x => x.itemValue )
      def getTotalValue:Double =  rdd.map(x => x.itemValue).sum()
      def discount(discountPercentage:Double) = new CustomerRDD(rdd,discountPercentage)
    }
    
    object CustomFunctions{
      implicit def addIteblogCustomFunctions(rdd: RDD[SalesRecord]) = new CustomFunctions(rdd)
    
    }
    
    • 自定义RDD实现:
    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD
    
    class CustomerRDD(prev:RDD[SalesRecord], discountPercentage:Double)extends RDD[SalesRecord](prev){
    
    
      //def discount(discountPercentage:Double) = new IteblogDiscountRDD(discountPercentage)
    
      //继承compute方法
      override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] =  {
    
        firstParent[SalesRecord].iterator(split, context).map(salesRecord => {
          val discount = salesRecord.itemValue * discountPercentage
          new SalesRecord(salesRecord.transactionId,
            salesRecord.customerId,salesRecord.itemId,discount)
        })
    
      }
    
      //继承getPartitions方法
      override protected def getPartitions: Array[Partition] =
        firstParent[SalesRecord].partitions
    }
    

    相关文章

      网友评论

          本文标题:离线计算组件篇-自定义分区、算子以及RDD

          本文链接:https://www.haomeiwen.com/subject/gbqqkdtx.html