Spark, Day 1

Author: 吾为天帝乎 | Published 2018-11-05 22:44
  • map
val valrdd=sc.parallelize(Array(1,2,3,4)).map(x=>2*x).collect
valrdd: Array[Int] = Array(2, 4, 6, 8)
  • filter
 val rdd=sc.parallelize(Array(1,2,3,4)).filter(x=>x>1).collect
rdd: Array[Int] = Array(2, 3, 4)
  • flatMap
val array = Array("a b c", "d e f", "h i j")
val arrayRDD = sc.parallelize(array)
val resultRDD = arrayRDD.flatMap(_.split(" "))
resultRDD.foreach(println)
// Result: a b c d e f h i j

scala>  val data =Array(Array(1, 2, 3, 4, 5),Array(4,5,6))
data: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))

scala> val rdd1=sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> val rdd2=rdd1.flatMap(x=>x.map(y=>y))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25

scala> rdd2.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6)
  • mapPartitions

mapPartitions is a variant of map. The function passed to map is applied to each element of the RDD, whereas the function passed to mapPartitions is applied to each partition, i.e. the contents of a whole partition are processed as a unit. (In the shell transcript below, listRDD is assumed to have been created earlier as an RDD over 1 to 5, which matches the doubled output; a short sketch of the typical use case follows the full example.)


scala> val f:Iterator[Int] => Iterator[Int] = (iterator) => {
     | val result =
     | scala.collection.mutable.ListBuffer[Int]()
     | while(iterator.hasNext){
     | result.append(iterator.next()*2)
     | }
     | result.iterator
     | }
f: Iterator[Int] => Iterator[Int] = <function1>

scala> val resultRDD=listRDD.mapPartitions(f)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at mapPartitions at <console>:33

scala> resultRDD.foreach(println)
2
4
6
8
10
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(1,2,3,4,5,6,7,8,9)
val listRDD = sc.parallelize(list,3)
listRDD.foreach(println)
val f:Iterator[Int] => Iterator[(Int,Int)] = (it) => {
  // Build the collection that will hold the results
  val result:ListBuffer[(Int,Int)] = ListBuffer[(Int,Int)]()
  // hasNext checks whether the iterator has more elements
  while (it.hasNext){
    // next() advances the cursor and returns the current element
    val value = it.next()
    result.append((value,value * value))
  }
  // Return an iterator over the result collection
  result.iterator
}
listRDD.mapPartitions(f).foreach(println)
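
The main payoff of mapPartitions is that per-partition work runs once per partition rather than once per element. The following is a minimal sketch of that pattern (not from the original post; createFormatter and the sample numbers are illustrative), reusing the same sc:

// Expensive setup (the hypothetical createFormatter) runs once per partition,
// not once per element.
def createFormatter(): java.text.DecimalFormat = new java.text.DecimalFormat("#,###")

val numbers = sc.parallelize(List(1000, 25000, 360000, 4700000), 2)
val formatted = numbers.mapPartitions { it =>
  val formatter = createFormatter()   // created once for the whole partition
  it.map(n => formatter.format(n))    // applied lazily to each element
}
formatted.foreach(println)
// e.g. 1,000  25,000  360,000  4,700,000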
  • mapPartitionsWithIndex(function)

mapPartitionsWithIndex is a variant of mapPartitions: the function it takes receives an extra integer parameter, the index of the partition being processed (a shorter sketch follows the three examples below).


val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list,3)
def f(index:Int, iterator:Iterator[Int]):Iterator[(Int,Int)] = {
  val result = scala.collection.mutable.ListBuffer[(Int,Int)]()
  while (iterator.hasNext){
    result.append((index,iterator.next()))
  }
  result.iterator
}
val resultRDD = listRDD.mapPartitionsWithIndex(f)
resultRDD.foreach(println)
// Result: (2,5) (1,3) (1,4) (0,1) (2,6) (0,2)
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(1,2,3,4,5,6,7,8,9)
val listRDD = sc.parallelize(list,3)
listRDD.foreach(println)
val f:(Int, Iterator[Int]) => Iterator[(Int,Int)] = (index, it) => {
  println(index)
  // Build the collection that will hold the results
  val result:ListBuffer[(Int,Int)] = ListBuffer[(Int,Int)]()
  // Sum the elements of this partition
  var sum:Int = 0
  while (it.hasNext){
    sum += it.next()
  }
  result.append((index,sum))
  // Return an iterator over the result collection
  result.iterator
}
listRDD.mapPartitionsWithIndex(f).foreach(println)
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(1,2,3,4,5,6,7,8,9)
val listRDD = sc.parallelize(list,3)
listRDD.foreach(println)
val f:(Int, Iterator[Int]) => Iterator[(Int,Int)] = (index, it) => {
  println(index)
  // Build the collection that will hold the results
  val result:ListBuffer[(Int,Int)] = ListBuffer[(Int,Int)]()
  // Apply a different multiplier depending on the partition index
  if (index == 0){
    while (it.hasNext){
      val value = it.next()
      result.append((index,value * 2))
    }
  }else if(index == 1){
    while (it.hasNext){
      val value = it.next()
      result.append((index,value * 3))
    }
  }else{
    while (it.hasNext){
      val value = it.next()
      result.append((index,value * 4))
    }
  }
  // Return an iterator over the result collection
  result.iterator
}
listRDD.mapPartitionsWithIndex(f).foreach(println)
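
All three versions above build a ListBuffer by hand. As a shorter sketch (not from the original post), the same partition tagging can be written by mapping the iterator directly, which also makes it easy to check which elements end up in which partition:

val list = List(1,2,3,4,5,6,7,8,9)
val listRDD = sc.parallelize(list, 3)
// Tag every element with the index of the partition it lives in.
val tagged = listRDD.mapPartitionsWithIndex { (index, it) =>
  it.map(value => (index, value))
}
tagged.collect().foreach(println)
// Expected along the lines of: (0,1) (0,2) (0,3) (1,4) (1,5) (1,6) (2,7) (2,8) (2,9)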
  • sample(withReplacement,fraction,seed):

Returns a new RDD sampled from the original RDD. withReplacement specifies whether sampling is done with replacement, fraction is the expected sampling ratio, and seed sets the random seed. Sampling repeatedly with the same seed returns the same data; with withReplacement set to false, sampling is done without replacement. (A small reproducibility check follows the example below.)

val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list)
val resultRDD = listRDD.sample(false,0.5,0)
resultRDD.foreach(println)
// Result: 1 3 6
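
To check the claim about the seed, here is a small sketch (not from the original post): calling sample twice with identical arguments returns the same elements, while a different seed will usually give a different subset.

val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list)
val sampleA = listRDD.sample(false, 0.5, 0).collect()
val sampleB = listRDD.sample(false, 0.5, 0).collect()
println(sampleA.sameElements(sampleB))   // true: same seed, same sample
val sampleC = listRDD.sample(false, 0.5, 42).collect()   // different seed, usually a different subset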

  • union(otherRDD): returns a new RDD that is the union of the original RDD and the other RDD

val list1 = List(1,2,3,4)
val list2 = List(3,4,5,6)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val resultRDD = listRDD1.union(listRDD2)
println(resultRDD.collect().length)
resultRDD.foreach(println)
// Result: 8
// Result: 2 1 4 3 3 4 5 6

val rdd1=sc.parallelize(1 to 5)
val rdd2=sc.parallelize(4 to 8)
rdd1.union(rdd2).collect
res13: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6, 7, 8)
  • intersection
    Returns the intersection of the two RDD datasets.
scala> rdd1.intersection(rdd2).collect
res14: Array[Int] = Array(4, 5)
  • distinct

The distinct function removes duplicate elements.

scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val rdd2=sc.parallelize(4 to 8)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21

scala> rdd1.union(rdd2).distinct.collect
res0: Array[Int] = Array(6, 1, 7, 8, 2, 3, 4, 5)

  • groupByKey([numTasks])
    The input is (K, V) pairs and the result is (K, Iterable<V>) pairs. numTasks sets the number of tasks and is optional; the example below calls groupByKey with no arguments.
scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val rdd2=sc.parallelize(4 to 8)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21

scala> rdd1.union(rdd2).map((_,1)).groupByKey.collect
res2: Array[(Int, Iterable[Int])] = Array((6,CompactBuffer(1)), (1,CompactBuffer(1)), (7,CompactBuffer(1)), (8,CompactBuffer(1)), (2,CompactBuffer(1)), (3,CompactBuffer(1)), (4,CompactBuffer(1, 1)), (5,CompactBuffer(1, 1)))

CompactBuffer is the Iterable that Spark uses to hold the grouped values for each key (an append-only buffer similar to ArrayBuffer); a sketch of reducing these grouped values follows the example below.

val words = List(("good",1),("good",1),("study",1),("day",1),("day",1),("up",1))
val wordsRDD = sc.parallelize(words)
val resultRDD = wordsRDD.groupByKey()
resultRDD.foreach(println)
// Result: (study,CompactBuffer(1)) (day,CompactBuffer(1, 1)) (good,CompactBuffer(1, 1)) (up,CompactBuffer(1))
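
The CompactBuffer is just the Iterable holding each key's values, so a common next step is to reduce it. A hedged sketch (not from the original post) that turns the grouped pairs into word counts:

val words = List(("good",1),("good",1),("study",1),("day",1),("day",1),("up",1))
val wordsRDD = sc.parallelize(words)
// Sum the Iterable of values behind each key.
val counts = wordsRDD.groupByKey().mapValues(_.sum)
counts.foreach(println)
// e.g. (good,2) (study,1) (day,2) (up,1)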


  • reduceByKey(function[, numPartitions]):

Returns a new RDD of (K, V) pairs. The original RDD must be a pair RDD, and the function passed in is the reduce function applied to the values of each key (see the note on numPartitions after the example).

val words = List(("good",1),("good",1),("study",1),("day",1),("day",1),("up",1))
val wordsRDD = sc.parallelize(words)
val resultRDD = wordsRDD.reduceByKey(_ + _)
resultRDD.foreach(println)
// Result: (day,2) (up,1) (study,1) (good,2)
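
The optional numPartitions argument only changes how many partitions the result has; a small sketch under that assumption (not from the original post) is below. Because reduceByKey combines values inside each partition before the shuffle, it is generally preferred over groupByKey followed by a per-key reduction.

val words = List(("good",1),("good",1),("study",1),("day",1),("day",1),("up",1))
val wordsRDD = sc.parallelize(words)
val resultRDD = wordsRDD.reduceByKey(_ + _, 2)   // same reduce function, 2 result partitions
println(resultRDD.getNumPartitions)              // 2
resultRDD.foreach(println)                       // same counts as above; ordering may vary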
  • aggregateByKey(zeroValue[, numPartitions])

aggregate means to gather or combine. The aggregate function first folds the elements of each partition into the initial (zero) value, then runs a combine step that merges the per-partition results; the final result does not need to have the same type as the elements of the original RDD. (The example below actually uses the plain aggregate action; a per-key aggregateByKey sketch follows it.)

The code below computes, within each partition, both the product and the sum of the elements, and then adds the per-partition results together.

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
val sc = new SparkContext(sparkConf)
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val listRDD = sc.parallelize(list,3)
// seqOp: fold each element of a partition into the (product, sum) accumulator
val seqop:((Int,Int),(Int)) => (Int,Int) = {
  (zero,value) => {
    println(zero)
    (zero._1 * value,zero._2 + value)
  }
}
// combOp: merge the (product, sum) results of the individual partitions
val combop:((Int,Int),(Int,Int)) => ((Int,Int)) = {
  (par1,par2) => {
    (par1._1 + par2._1,par1._2 + par2._2)
  }
}
println(listRDD.aggregate((1,0))(seqop,combop))
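
Since the bullet names aggregateByKey, here is a minimal sketch of the per-key variant (not from the original post; the pairs and the sum-and-count example are illustrative): for each key, seqOp folds the values within a partition into the zero value, and combOp merges the per-partition results.

val pairs = sc.parallelize(List(("a",1),("a",2),("b",3),("a",4),("b",5)), 2)
// For every key, compute (sum of values, number of values).
val sumAndCount = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),        // seqOp: fold one value into the accumulator
  (p1, p2) => (p1._1 + p2._1, p1._2 + p2._2)   // combOp: merge two partition accumulators
)
sumAndCount.foreach(println)
// e.g. (a,(7,3)) (b,(8,2))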

