
Java Spark Simple Examples (Part 1)

Author: 憨人Zoe | Published 2018-06-22 19:31

    The previous article gave a brief introduction to some basic Spark concepts. It reads rather abstractly, and much of it was drawn from material found online; if you are interested, you can read it here: 初识Apache Spark(附示例).

    I have prepared two introductory examples. Both appear on the official site, just explained separately; I combined them and ran them on my local machine.

    The two examples are very simple; the main point is to distinguish the two ways of creating an RDD. I will share more complex Spark applications another time (once I have learned them myself).

    Maven dependency (the _2.11 suffix in the artifactId is the Scala version the artifact was built against):

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    

    Method 1: parallelized collections (see the official Spark documentation for details).

    The example in the previous article created its RDD this way; here is another one that stays close to the official example.

    Example 1: sum a list of integers

    package com.yzy.spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class demo {
        private static String appName = "spark.demo";
        private static String master = "local[*]";
    
        public static void main(String[] args) {
            JavaSparkContext sc = null;
            try {
                // Initialize the JavaSparkContext
                SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
                sc = new JavaSparkContext(conf);
    
                // Build the data source
                List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    
                // Create an RDD by parallelizing the collection
                JavaRDD<Integer> rdd = sc.parallelize(data);
    
                // map (identity, for illustration only) and then reduce to sum the values
                Integer result = rdd.map(new Function<Integer, Integer>() {
                    public Integer call(Integer integer) throws Exception {
                        return integer;
                    }
                }).reduce(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer o, Integer o2) throws Exception {
                        return o + o2;
                    }
                });
    
                System.out.println("执行结果:" + result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (sc != null) {
                    sc.close();
                }
            }
        }
    }
    
    

    Execution output:

    // ... lines omitted ...
    18/06/22 19:15:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 81 ms on localhost (executor driver) (1/4)
    18/06/22 19:15:33 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 70 ms on localhost (executor driver) (2/4)
    18/06/22 19:15:33 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 72 ms on localhost (executor driver) (3/4)
    18/06/22 19:15:33 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 72 ms on localhost (executor driver) (4/4)
    18/06/22 19:15:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    18/06/22 19:15:33 INFO DAGScheduler: ResultStage 0 (reduce at demo.java:37) finished in 0.256 s
    18/06/22 19:15:33 INFO DAGScheduler: Job 0 finished: reduce at demo.java:37, took 0.319634 s
    执行结果:15
    
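    As a side note, on Java 8 the anonymous classes above can be replaced with lambdas, and the identity map is not strictly needed because reduce alone already sums the values. A minimal sketch (the class name DemoLambda is made up for illustration; it assumes Java 8 and the same spark-core 2.3.1 dependency):

    package com.yzy.spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;
    import java.util.List;

    public class DemoLambda {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("spark.demo").setMaster("local[*]");
            // JavaSparkContext implements Closeable, so try-with-resources closes it for us
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
                JavaRDD<Integer> rdd = sc.parallelize(data);
                // reduce sums the elements pairwise: 1 + 2 + 3 + 4 + 5 = 15
                Integer result = rdd.reduce((a, b) -> a + b);
                System.out.println("result = " + result);
            }
        }
    }
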

    Method 2: external datasets (see the official Spark documentation for details).

    This example uses a plain text file read with textFile.

    Example 2: read a txt file and sum the character lengths of the lines that contain "spark"

    // Contents of test.txt
    spark demo
    this is a spark demo file
    hello world 
    
    package com.yzy.spark;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    
    public class demo2 {
        private static String appName = "spark.demo";
        private static String master = "local[*]";
    
        public static void main(String[] args) {
            JavaSparkContext sc = null;
            try {
                // Initialize the JavaSparkContext
                SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
                sc = new JavaSparkContext(conf);
    
                // Build an RDD from test.txt, which sits in the project root directory
                JavaRDD<String> rdd = sc.textFile("test.txt");
    
                // Keep only the lines that contain "spark"
                rdd = rdd.filter(new Function<String, Boolean>() {
                    public Boolean call(String s) throws Exception {
                        return s.contains("spark");
                    }
                });
    
                // map each line to its length, then reduce to the total
                Integer result = rdd.map(new Function<String, Integer>() {
                    public Integer call(String s) throws Exception {
                        return s.length();
                    }
                }).reduce(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                });
    
                System.out.println("执行结果:" + result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (sc != null) {
                    sc.close();
                }
            }
        }
    }
    
    

    Execution output:

    // ... lines omitted ...
    18/06/22 19:27:33 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 78 ms on localhost (executor driver) (1/2)
    18/06/22 19:27:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 94 ms on localhost (executor driver) (2/2)
    18/06/22 19:27:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    18/06/22 19:27:33 INFO DAGScheduler: ResultStage 0 (reduce at demo2.java:38) finished in 0.203 s
    18/06/22 19:27:33 INFO DAGScheduler: Job 0 finished: reduce at demo2.java:38, took 0.243153 s
    执行结果:35
    
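    As a sanity check, the two lines containing "spark" are "spark demo" (10 characters) and "this is a spark demo file" (25 characters), so 10 + 25 = 35 matches the output. With Java 8 lambdas and method references, the same filter/map/reduce pipeline can be written more compactly; a minimal sketch (the class name Demo2Lambda is made up for illustration):

    package com.yzy.spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class Demo2Lambda {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("spark.demo").setMaster("local[*]");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // test.txt sits in the project root directory, as in the example above
                JavaRDD<String> lines = sc.textFile("test.txt");
                Integer total = lines
                        .filter(line -> line.contains("spark")) // keep lines containing "spark"
                        .map(String::length)                    // each line -> its character count
                        .reduce(Integer::sum);                  // 10 + 25 = 35 for the sample file
                System.out.println("total = " + total);
            }
        }
    }
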
