美文网首页
Java的WordCount实现

Java的WordCount实现

作者: 丸蛋蟹 | 来源:发表于2016-10-17 20:54 被阅读116次

    WordCount 是用来统计一个文件中相同单词出现次数的程序, 是一个可以用来描述Spark运行的经典问题:通过将单词拆分映射(map)和对映射进行统计(reduce)实现任务

     public final class JavaWordCount 
     {
        //定义一个空格的字符模式
        private static final Pattern SPACE = Pattern.compile(" ");
    
        public static void main(String[] args) throws Exception 
        {
            // 0. 判断是否输入参数
             if (args.length < 1) 
            {
                System.err.println("Usage: JavaWordCount <file>");
                System.exit(1);
            }
    
            // 1. 创建一个 Spark 的上下文; 在创建上下文的过程中,程序会向集群申请资源以及构建相应的运行环境。
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
            JavaSparkContext jSparkContext = new JavaSparkContext(sparkConf);
            
            // 2. 内存中的集合或者外部文件系统作为输入源
             JavaRDD<String> lines = ctx.textFile(args[0], 1);
    
            // 3. flatMap 函数类似于 map 函数,但是每一个输入元素,会被映射为 0 到多个输出元素,因此,func 函数的返回值是一个 Seq,而不是单一元素
            // 本例中将文本按空格拆分成一个List
             JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() 
             {
                 @Override
                 public Iterable<String> call(String s) 
                 {
                    return Arrays.asList(SPACE.split(s));
                 }
             });
    
            // 4. MapToPair 函数类类似于map函数, 对于每一个输入元素, 映射为有特殊Key-Value的函数;
            // 本例中将每一个单词对应计数1
             JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() 
             {
                @Override
                public Tuple2<String, Integer> call(String s)
                {
                    return new Tuple2<String, Integer>(s, 1);
                }
             });
    
            // 5. reduceByKey函数在一个(K,V) 对的数据集上使用,返回一个(K,V)对的数据集,key 相同的值,都被使用指定的 reduce 函数聚合到一起
            // 本例中将相同单词的计数合并(i1+i2),达到合计总数的作用
            // reduceByKey是一个Action, 将transformation懒操作(Map)中的操作一并进行实现
             JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() 
             {
                @Override
                public Integer call(Integer i1, Integer i2)
                {
                    return i1 + i2;
                }
             });
    
            // 6. collect() 以数组的形式返回数据集的所有元素
             List<Tuple2<String, Integer>> output = counts.collect();
             for (Tuple2<?,?> tuple : output) 
            {
               System.out.println(tuple._1() + ": " + tuple._2());
            }
            
            // 7. JVM一次只能启动一个JavaSparkContext, 所以运行完成后需要关闭当前SparkContext
             jSparkContext.stop();
         }
    }
    

    参考资料:
    http://spark.apache.org/docs/latest/api/java/index.html
    http://www.ibm.com/developerworks/cn/opensource/os-cn-spark-deploy1/index.html

    相关文章

      网友评论

          本文标题:Java的WordCount实现

          本文链接:https://www.haomeiwen.com/subject/wimoettx.html