【Spark Java API】Action(3)—foreac

Author: 小飞_侠_kobe | Published: 2016-08-16 19:50 | Views: 2153

    foreach


    Official documentation:

    Applies a function f to all elements of this RDD.
    

    Function signature:

    def foreach(f: VoidFunction[T])
    

    **foreach iterates over the RDD and applies the function f to every element.**

    Source code analysis:

    def foreach(f: T => Unit): Unit = withScope {  
      val cleanF = sc.clean(f)  
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    

    Example:

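    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    // Assumes an existing JavaSparkContext named javaSparkContext.
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3); // 3 partitions
    javaRDD.foreach(new VoidFunction<Integer>() {
      @Override
      public void call(Integer integer) throws Exception {
        // Runs on the executors, so on a cluster the output goes to executor stdout.
        System.out.println(integer);
      }
    });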
    

    foreachPartition


    Official documentation:

    Applies a function f to each partition of this RDD.
    

    Function signature:

    def foreachPartition(f: VoidFunction[java.util.Iterator[T]])
    

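    **foreachPartition is similar to foreach, except that f is applied once to each partition (it receives the partition's iterator) rather than once to each element.**

    Source code analysis: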

    def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {  
      val cleanF = sc.clean(f)  
      sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
    }
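Comparing the two source listings, both actions hand `runJob` a function over a partition's iterator; the only difference is whether `f` is called once per element or once per partition. The following is a minimal plain-Java sketch of that relationship, not Spark code: `ForeachModel`, its `runJob` stand-in, and the sample partition split are all hypothetical and run sequentially in a single JVM.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model (no Spark dependency) of how the two actions relate.
// A List<List<T>> stands in for an RDD's partitions.
final class ForeachModel {

    // Hypothetical stand-in for SparkContext.runJob:
    // hand each partition's iterator to the task function, one after another.
    static <T> void runJob(List<List<T>> partitions, Consumer<Iterator<T>> task) {
        for (List<T> partition : partitions) {
            task.accept(partition.iterator());
        }
    }

    // foreach: the task walks the iterator and calls f once per element.
    static <T> void foreach(List<List<T>> partitions, Consumer<T> f) {
        runJob(partitions, iter -> iter.forEachRemaining(f));
    }

    // foreachPartition: the task calls f once, passing the whole iterator.
    static <T> void foreachPartition(List<List<T>> partitions, Consumer<Iterator<T>> f) {
        runJob(partitions, f);
    }

    // Assumed split of the article's sample data into three partitions.
    static List<List<Integer>> samplePartitions() {
        return Arrays.asList(
                Arrays.asList(5, 1),
                Arrays.asList(1, 4),
                Arrays.asList(4, 2, 2));
    }

    public static void main(String[] args) {
        // One call per element.
        foreach(samplePartitions(), x -> System.out.println("element " + x));

        // One call per partition; the function drains the iterator itself.
        foreachPartition(samplePartitions(), iter -> {
            StringBuilder line = new StringBuilder("partition:");
            while (iter.hasNext()) {
                line.append(' ').append(iter.next());
            }
            System.out.println(line);
        });
    }
}
```

In this model, `foreach` invokes its function seven times and `foreachPartition` invokes its function three times. That difference is why `foreachPartition` is often preferred when the function has per-call setup work to do.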
    

    Example:

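    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.VoidFunction;

    // Assumes an existing JavaSparkContext named javaSparkContext.
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);

    // Tag each element with its partition index to show how the data is split.
    JavaRDD<String> partitionRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
      @Override
      public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        while (v2.hasNext()) {
          linkedList.add(v1 + "=" + v2.next());
        }
        return linkedList.iterator();
      }
    }, false);
    System.out.println(partitionRDD.collect());

    // Print the contents of each partition between begin/end markers.
    javaRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
      @Override
      public void call(Iterator<Integer> integerIterator) throws Exception {
        System.out.println("___________begin_______________");
        while (integerIterator.hasNext()) {
          System.out.print(integerIterator.next() + "      ");
        }
        System.out.println("\n___________end_________________");
      }
    });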
    

    lookup


    Official documentation:

    Return the list of values in the RDD for key `key`. This operation is done efficiently 
    if the RDD has a known partitioner by only searching the partition that the key maps to.
    

    Function signature:

    def lookup(key: K): JList[V]
    

    **lookup applies to RDDs of (K, V) pairs: given a key K, it returns all values V in the RDD associated with that key.**

    Source code analysis:

    def lookup(key: K): Seq[V] = self.withScope {  
      self.partitioner match {    
        case Some(p) =>      
          val index = p.getPartition(key)      
          val process = (it: Iterator[(K, V)]) => {        
            val buf = new ArrayBuffer[V]        
            for (pair <- it if pair._1 == key) {          
              buf += pair._2        
            }        
            buf      
          } : Seq[V]      
          val res = self.context.runJob(self, process, Array(index), false)      
          res(0)    
        case None =>      
          self.filter(_._1 == key).map(_._2).collect()  
      }
    }
    

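    **As the source shows, if the RDD has a partitioner, lookup computes which partition the key maps to and collects the values for that key from that partition only. If there is no partitioner, it filters the whole RDD, dropping every pair whose key differs from the given key, and returns the values of the pairs that remain.**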
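The two branches in the `lookup` source can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: `LookupModel`, its hash-style `getPartition`, and the sample pairs are hypothetical and do not use any Spark API. It shows that, once pairs are placed by key, scanning the single partition the key maps to yields the same values as scanning every partition.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;

// Plain-Java model (no Spark dependency) of the two lookup branches.
final class LookupModel {

    // Hypothetical hash-style partitioner: non-negative hashCode modulo.
    static int getPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Place every pair in the partition its key maps to.
    static <K, V> List<List<Entry<K, V>>> partitionByKey(List<Entry<K, V>> pairs, int numPartitions) {
        List<List<Entry<K, V>>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Entry<K, V> pair : pairs) {
            partitions.get(getPartition(pair.getKey(), numPartitions)).add(pair);
        }
        return partitions;
    }

    // Collect the values of all pairs in one partition whose key matches.
    static <K, V> List<V> valuesFor(List<Entry<K, V>> partition, K key) {
        List<V> out = new ArrayList<>();
        for (Entry<K, V> pair : partition) {
            if (pair.getKey().equals(key)) {
                out.add(pair.getValue());
            }
        }
        return out;
    }

    // Partitioner known: scan only the partition the key maps to.
    static <K, V> List<V> lookupWithPartitioner(List<List<Entry<K, V>>> partitions, K key) {
        int index = getPartition(key, partitions.size());
        return valuesFor(partitions.get(index), key);
    }

    // No partitioner: scan every partition (the filter/map/collect branch).
    static <K, V> List<V> lookupWithoutPartitioner(List<List<Entry<K, V>>> partitions, K key) {
        List<V> out = new ArrayList<>();
        for (List<Entry<K, V>> partition : partitions) {
            out.addAll(valuesFor(partition, key));
        }
        return out;
    }

    static Entry<Integer, Integer> pair(int k, int v) {
        return new SimpleEntry<>(k, v);
    }

    // Hypothetical sample pairs, chosen only for illustration.
    static List<Entry<Integer, Integer>> samplePairs() {
        return Arrays.asList(pair(5, 6), pair(1, 3), pair(1, 2), pair(4, 6),
                pair(4, 5), pair(2, 4), pair(2, 5));
    }

    public static void main(String[] args) {
        List<List<Entry<Integer, Integer>>> partitions = partitionByKey(samplePairs(), 3);
        System.out.println("one partition scanned:  " + lookupWithPartitioner(partitions, 4));
        System.out.println("all partitions scanned: " + lookupWithoutPartitioner(partitions, 4));
    }
}
```

Both calls return the same values for key 4, but the first touches one partition out of three. The shortcut is only valid when the data really was placed by that partitioner, which is why the source checks `self.partitioner` first.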

    Example:

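    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // Assumes an existing JavaSparkContext named javaSparkContext.
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
      // i is an instance field; each task works on its own deserialized copy,
      // so it counts elements per task rather than across the whole RDD.
      int i = 0;
      @Override
      public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        i++;
        return new Tuple2<Integer, Integer>(integer, i + integer);
      }
    });
    System.out.println(javaPairRDD.collect());
    System.out.println("lookup------------" + javaPairRDD.lookup(4));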
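The counter `i` in the example above is a field of the function object, and Spark ships a serialized copy of that object to each task, so the counter does not run across the whole RDD. The sketch below models this in plain Java by round-tripping a function object through Java serialization once per partition. `StatefulFunctionModel`, `CountingFunction`, and the partition split are hypothetical; the split Spark actually chooses for this data is an assumption here, and the real output depends on it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (no Spark dependency) of a stateful function copied per task.
final class StatefulFunctionModel {

    // Same shape as the anonymous PairFunction: a counter field plus call().
    static final class CountingFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        int i = 0;

        int[] call(int value) {
            i++;
            return new int[] {value, i + value};
        }
    }

    // Serialize and deserialize, standing in for shipping the function to a task.
    static CountingFunction copyOf(CountingFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CountingFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Apply the function to each partition using a fresh copy per partition.
    static List<String> run(List<List<Integer>> partitions) {
        CountingFunction driverSide = new CountingFunction();
        List<String> out = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            CountingFunction taskCopy = copyOf(driverSide); // counter starts at 0 again
            for (int value : partition) {
                int[] pair = taskCopy.call(value);
                out.add("(" + pair[0] + "," + pair[1] + ")");
            }
        }
        return out;
    }

    // Assumed split of the article's sample data into three partitions.
    static List<List<Integer>> samplePartitions() {
        return Arrays.asList(
                Arrays.asList(5, 1),
                Arrays.asList(1, 4),
                Arrays.asList(4, 2, 2));
    }

    public static void main(String[] args) {
        System.out.println(run(samplePartitions()));
    }
}
```

Under the assumed split, this model prints `[(5,6), (1,3), (1,2), (4,6), (4,5), (2,4), (2,5)]`: the counter restarts at the beginning of every partition. If a position across the whole RDD is needed, an operation designed for that purpose, such as `zipWithIndex`, is a safer choice than a mutable field.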
    

    Article link: https://www.haomeiwen.com/subject/qwbpsttx.html