Spark Day 1

Author: 吾为天帝乎 | Published 2018-11-05 22:44
    • map
    val valrdd = sc.parallelize(Array(1, 2, 3, 4)).map(x => 2 * x).collect
    valrdd: Array[Int] = Array(2, 4, 6, 8)
    
    • filter
    val rdd = sc.parallelize(Array(1, 2, 3, 4)).filter(x => x > 1).collect
    rdd: Array[Int] = Array(2, 3, 4)
    
    • flatMap
    val array = Array("a b c", "d e f", "h i j")
    val arrayRDD = sc.parallelize(array)
    val resultRDD = arrayRDD.flatMap(_.split(" "))
    resultRDD.foreach(println)
    // Result: a b c d e f h i j
    
    
    scala>  val data =Array(Array(1, 2, 3, 4, 5),Array(4,5,6))
    data: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))
    
    scala> val rdd1=sc.parallelize(data)
    rdd1: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[2] at parallelize at <console>:23
    
    scala> val rdd2=rdd1.flatMap(x=>x.map(y=>y))
    rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25
    
    scala> rdd2.collect
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6)
    
    • mapPartitions

    mapPartitions is a variant of map. The function passed to map is applied to each element of the RDD, whereas the function passed to mapPartitions is applied once per partition, i.e. the contents of each partition are processed as a whole:

    
    scala> val listRDD = sc.parallelize(List(1, 2, 3, 4, 5))

    scala> val f:Iterator[Int] => Iterator[Int] = (iterator) => {
         | val result =
         | scala.collection.mutable.ListBuffer[Int]()
         | while(iterator.hasNext){
         | result.append(iterator.next()*2)
         | }
         | result.iterator
         | }
    f: Iterator[Int] => Iterator[Int] = <function1>
    
    scala> val resultRDD=listRDD.mapPartitions(f)
    resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at mapPartitions at <console>:33
    
    scala> resultRDD.foreach(println)
    2
    4
    6
    8
    10
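
    The same doubling can be written more concisely by transforming the iterator directly instead of copying it into a ListBuffer; a minimal sketch, equivalent to the f above and again assuming listRDD holds 1 to 5:

    val doubled = listRDD.mapPartitions(iter => iter.map(_ * 2))
    doubled.collect
    // Array(2, 4, 6, 8, 10)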
    
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable.ListBuffer

    val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val listRDD = sc.parallelize(list, 3)
    listRDD.foreach(println)
    val f: Iterator[Int] => Iterator[(Int, Int)] = (it) => {
      // collection that holds the result for this partition
      val result: ListBuffer[(Int, Int)] = ListBuffer[(Int, Int)]()
      // keep going while the iterator still has elements
      while (it.hasNext) {
        // advance the iterator and take the current element
        val value = it.next()
        result.append((value, value * value))
      }
      // return an iterator over the result collection
      result.iterator
    }
    listRDD.mapPartitions(f).foreach(println)
    
    • mapPartitionsWithIndex(function)

    mapPartitionsWithIndex is a variant of mapPartitions; the extra integer argument passed to the function is the partition index.

    
    val list = List(1, 2, 3, 4, 5, 6)
    val listRDD = sc.parallelize(list, 3)
    def f(index: Int, iterator: Iterator[Int]): Iterator[(Int, Int)] = {
      val result = scala.collection.mutable.ListBuffer[(Int, Int)]()
      while (iterator.hasNext) {
        result.append((index, iterator.next()))
      }
      result.iterator
    }
    val resultRDD = listRDD.mapPartitionsWithIndex(f)
    resultRDD.foreach(println)
    // Result: (2,5) (1,3) (1,4) (0,1) (2,6) (0,2)
    
    val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val listRDD = sc.parallelize(list, 3)
    listRDD.foreach(println)
    val f: (Int, Iterator[Int]) => Iterator[(Int, Int)] = (index, it) => {
      println(index)
      // collection that holds the result for this partition
      val result: ListBuffer[(Int, Int)] = ListBuffer[(Int, Int)]()
      // sum the elements of this partition
      var sum: Int = 0
      while (it.hasNext) {
        sum += it.next()
      }
      result.append((index, sum))
      // return an iterator over the result collection
      result.iterator
    }
    listRDD.mapPartitionsWithIndex(f).foreach(println)
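
    A quick check of the per-partition sums, assuming the nine elements are split evenly into (1,2,3), (4,5,6) and (7,8,9):

    listRDD.mapPartitionsWithIndex(f).collect
    // Array((0,6), (1,15), (2,24))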
    
    val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val listRDD = sc.parallelize(list, 3)
    listRDD.foreach(println)
    val f: (Int, Iterator[Int]) => Iterator[(Int, Int)] = (index, it) => {
      println(index)
      // collection that holds the result for this partition
      val result: ListBuffer[(Int, Int)] = ListBuffer[(Int, Int)]()
      if (index == 0) {
        while (it.hasNext) {
          val value = it.next()
          result.append((index, value * 2))
        }
      } else if (index == 1) {
        while (it.hasNext) {
          val value = it.next()
          result.append((index, value * 3))
        }
      } else {
        while (it.hasNext) {
          val value = it.next()
          result.append((index, value * 4))
        }
      }
      // return an iterator over the result collection
      result.iterator
    }
    listRDD.mapPartitionsWithIndex(f).foreach(println)
    
    • sample(withReplacement, fraction, seed)

Returns a new RDD sampled from the original RDD: withReplacement specifies whether to sample with replacement, fraction specifies the sampling ratio, and seed sets the random seed. Sampling with the same seed yields the same elements every time; with withReplacement set to false the sampling is done without replacement.

    val list = List(1, 2, 3, 4, 5, 6)
    val listRDD = sc.parallelize(list)
    val resultRDD = listRDD.sample(false, 0.5, 0)
    resultRDD.foreach(println)
    // Result: 1 3 6
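
    A small sketch of the seed behaviour described above: sampling the same RDD twice with the same (arbitrary) seed of 42 yields an identical sample.

    val s1 = listRDD.sample(false, 0.5, 42).collect
    val s2 = listRDD.sample(false, 0.5, 42).collect
    // the two samples contain exactly the same elements because the seed is the same
    s1.sameElements(s2)  // true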
    
    • union(otherRDD): returns a new RDD that is the union of the original RDD and the other RDD (duplicates are kept)
    
    

    val list1 = List(1, 2, 3, 4)
    val list2 = List(3, 4, 5, 6)
    val listRDD1 = sc.parallelize(list1)
    val listRDD2 = sc.parallelize(list2)
    val resultRDD = listRDD1.union(listRDD2)
    println(resultRDD.collect().length)
    resultRDD.foreach(println)
    // Result: 8
    // Result: 2 1 4 3 3 4 5 6

    val rdd1 = sc.parallelize(1 to 5)
    val rdd2 = sc.parallelize(4 to 8)
    rdd1.union(rdd2).collect
    res13: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6, 7, 8)
    
    • intersection
      Returns the intersection of the two RDDs.
    scala> rdd1.intersection(rdd2).collect
    res14: Array[Int] = Array(4, 5)
    
    • distinct

    distinct removes duplicate elements.

    scala> val rdd1=sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
    
    scala> val rdd2=sc.parallelize(4 to 8)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21
    
    scala> rdd1.union(rdd2).distinct.collect
    res0: Array[Int] = Array(6, 1, 7, 8, 2, 3, 4, 5)
    
    
    • groupByKey([numTasks])
      The input is (K, V) pairs and the result is (K, Iterable[V]) pairs. The optional numTasks argument sets the number of tasks; the example below calls groupByKey with no arguments.
    scala> val rdd1=sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
    
    scala> val rdd2=sc.parallelize(4 to 8)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21
    
    scala> rdd1.union(rdd2).map((_,1)).groupByKey.collect
    res2: Array[(Int, Iterable[Int])] = Array((6,CompactBuffer(1)), (1,CompactBuffer(1)), (7,CompactBuffer(1)), (8,CompactBuffer(1)), (2,CompactBuffer(1)), (3,CompactBuffer(1)), (4,CompactBuffer(1, 1)), (5,CompactBuffer(1, 1)))
    
    

    CompactBuffer is Spark's internal append-only buffer class; it is the Iterable that holds the grouped values and behaves like an ordinary Scala collection.

    val words = List(("good",1),("good,1),("study",1),("day",1),("day",1),("up",1))
    val wordsRDD = sc.parallelize(words)
    val resultRDD = wordsRDD.groupByKey()
    resultRDD.foreach(println)
    // 结果:(study,CompactBuffer(1)) (day,CompactBuffer(1,
    1)) (good,CompactBuffer(1, 1)) (up,CompactBuffer(1))
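
    Because the grouped values behave like an ordinary collection, they can be reduced afterwards. A minimal sketch; mapValues is a standard pair-RDD method, not part of the original example:

    val counts = resultRDD.mapValues(_.sum)
    counts.foreach(println)
    // (study,1) (day,2) (good,2) (up,1), in no particular order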
    
    
    
    • reduceByKey(function[, numPartitions])

Returns a new RDD of (K, V) pairs. The original RDD must be a pair RDD, and the function passed in is a reduce function applied to the values of each key.

    val words = List(("good",1),("good",1),("study",1),("day",1),("day",1),("up",1))
    val wordsRDD = sc.parallelize(words)
    val resultRDD = wordsRDD.reduceByKey(_ + _)
    resultRDD.foreach(println)
    // Result: (day,2) (up,1) (study,1) (good,2)
    
    • aggregateByKey(zeroValue[, numPartitions])

"Aggregate" here means to gather results together.

The aggregate function first folds the elements of each partition into the initial value (the seqOp), then runs a combine step (the combOp) that merges the per-partition results; the final result does not have to be of the same type as the elements of the original RDD. Note that the example below actually demonstrates the closely related aggregate action rather than aggregateByKey.

The code below computes, for each partition, both the product and the sum of the elements of the list, and then adds the per-partition results together.

    import org.apache.spark.{SparkConf, SparkContext}

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(sparkConf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val listRDD = sc.parallelize(list, 3)
    // seqOp: fold each element of a partition into the (product, sum) accumulator
    val seqop: ((Int, Int), Int) => (Int, Int) = {
      (zero, value) => {
        println(zero)
        (zero._1 * value, zero._2 + value)
      }
    }
    // combOp: merge the (product, sum) results of the individual partitions
    val combop: ((Int, Int), (Int, Int)) => (Int, Int) = {
      (par1, par2) => {
        (par1._1 + par2._1, par1._2 + par2._2)
      }
    }
    println(listRDD.aggregate((1, 0))(seqop, combop))
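
    With the zero value (1, 0) and the list split evenly into three partitions, the call above prints (631,45): the per-partition products 6, 120, 504 and sums 6, 15, 24 are merged by combop together with the zero value. For comparison, a minimal sketch of aggregateByKey itself (named in the heading above), which applies the same seqOp/combOp machinery per key of a pair RDD; the (word, 1) pairs are reused from the reduceByKey example:

    val pairs = sc.parallelize(List(("good", 1), ("good", 1), ("study", 1), ("day", 1), ("day", 1), ("up", 1)), 2)
    // zero value 0; seqOp adds values within a partition, combOp adds the per-partition sums
    val wordCounts = pairs.aggregateByKey(0)(_ + _, _ + _)
    wordCounts.foreach(println)
    // (day,2) (up,1) (study,1) (good,2), in no particular order -- same result as reduceByKey(_ + _)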
    
