美文网首页
SparkRDDNote

SparkRDDNote

作者: 时待吾 | 来源:发表于2017-03-24 20:14 被阅读27次

    sparkContext.textFile("") 读取文件时 注意路径格式:本地文件 file:///D://fileDirctory1//file.txt

    hdfs:  hdfs:/user/hdfs/test/textFile  只支持String类型

    以下sc均代表sparkContext。

    sc.parallelize(Seq(....))  将一个本地的scala集合构造一个分布式的RDD 默认两个分区

    如果传递给Parallelize的seq参数是一个可变的集合实例,并且在parallelize调用之后 在针对该RDD的第一次action之前,该集合发生了改变,则相应的RDD 结果也会随之而发生改变,传递一份拷贝即可。

    避免使用Seq() 或者 Seq[T]() 创建一个空RDD 因为无分区  并不会报异常

    val rangeRdd = sc.parallelize(1 to 9)

    def mapFunc[T](iter:Iterator[T]):Iterator[(T,T)] = {

    var res = List[(T,T)]()

    var pre = iter.next()

    while(iter.hasNext()) {

    val cur = iter.next()

    res = res.:+(pre.cur)

    pre = cur

    }

    res.toIterator  // 由于 mapPartitions  的参数为 Iterator  返回值也为一个Iterator

    }

    // 针对当前RDD的每个分区使用一个函数 返回一个新的RDD ‘preserverPartitioning’ 保护分区标志位

    // 表示 input函数 是否保护当前分区 其默认为false,除非是一个 key-value对RDD input function不修改keys

    rangeRdd.mapPartitions(x => mapFunc(x)).foreach(println)

    rangeRdd.mapPartitions(x => mapFunc(x),true).foreach(println)

    rangeRdd.mapPartitions(mapFunc(_)).foreach(println)

    rangeRdd.mapPartitions(mapFunc).foreach(println)

    // 针对RDD的每个分区使用function返回一个新的RDD 同时跟踪 原始分区的索引序号 index

    // 注意 其中的 (x,iter)  其中x代表的是当前的分区号,iter代表的时元素的集合迭代器

    val e = rangeRdd.mapPartitionsWithIndex{

    (x,iter) => {

    var result = List[String]()

    var i = 0

    while(iter.hasNext()) { i += iter.next()}

    result = result.:+(x + "|" + i)

    result.toIterator

    }}

    e.collect().foreach(println)

    sc.parallelize(Seq()).foreach(println)

    // 针对RDD 的每个分区执行 function f

    rangeRdd.foreachPartition(println) // 打印的是  N个分区 打印N行 non-empty iterator

    // 针对RDD 的all elements  所有元素 执行 function f

    rangeRdd.foreachPartition(println) // 打印的是  所有元素

    // 将 key-value 对RDD的每个value使用map 函数,不改变keys,仍然保留原始RDD的分区

    rangeRdd.map(x=>(x,x)).mapValues(_+"|").foreach(println)

    // 针对当前RDD的所有元素 第一次应用一个map function  返回一个新的RDD  并将结果进行扁平化处理

    rangeRdd.flatMap(x => 1 to x).foreach(println)

    // 将 key-value 对RDD的每个value使用flatmap 函数,不改变keys,仍然保留原始RDD的分区,扁平化处理时,如果对应的value 为null  该key-value 对 直接删除

    rangeRdd.map(x=>(x,x)).flatMapValues(x => x.to(5)).foreach(println)

    // 使用指定的 交换的 和 组合的  二进制操作 针对RDD 的元素 进行reduce操作,首先reducePartition 然后 mergeResult

    println(rangeRdd.reduce(_+_))

    println(rangeRdd.reduce((x,y)=>x+y))

    // 使用组合的reduce函数 将每个key的values 进行合并操作,在将结果发送至Reducer之前每个mapper首先会在本地执行一次合并操作,类似于MapReduce的combiner操作

    println(rangeRdd.map(x => (x,x)).reduceByKey(_+_))

    // 使用组合的reduce函数 将每个key的values 进行合并操作,并立即将结果以Map形式返回至master节点(而不是以Rdd[(T,T)]的形式返回Reducer),在将结果发送至Reducer之前每个mapper首先会在本地执行一次合并操作,类似于MapReduce的combiner操作

    println(rangeRdd.map(x => (x,x)).reduceByKeyLocally(_+_))

    相关文章

      网友评论

          本文标题:SparkRDDNote

          本文链接:https://www.haomeiwen.com/subject/fqfqottx.html