The previous article gave a brief introduction to some basic Spark concepts. It was fairly abstract, and much of it was drawn from the web; interested readers can find it here: 初识Apache Spark(附示例)
I have prepared two introductory examples. Both appear on the official site, just explained separately; here I combine them and run them on my local machine.
The two examples are very simple. Their main purpose is to contrast the two ways of creating an RDD. More complex Spark applications will have to wait for a future post (once I have learned them myself).
Maven dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
Method 1: parallelized collections. See the official docs for details.
The example in the previous article was created this way; here I show it again with an example close to the official one.
Example 1: sum a list of integers
package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.util.Arrays;
import java.util.List;

public class demo {
    private static String appName = "spark.demo";
    private static String master = "local[*]";

    public static void main(String[] args) {
        JavaSparkContext sc = null;
        try {
            // initialize the JavaSparkContext
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
            sc = new JavaSparkContext(conf);
            // build the data source
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            // create the RDD by parallelizing the in-memory collection
            JavaRDD<Integer> rdd = sc.parallelize(data);
            // map && reduce
            Integer result = rdd.map(new Function<Integer, Integer>() {
                public Integer call(Integer integer) throws Exception {
                    return integer; // identity map
                }
            }).reduce(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer o, Integer o2) throws Exception {
                    return o + o2; // pairwise sum
                }
            });
            System.out.println("result: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}
Execution output:
// some lines omitted
18/06/22 19:15:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 81 ms on localhost (executor driver) (1/4)
18/06/22 19:15:33 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 70 ms on localhost (executor driver) (2/4)
18/06/22 19:15:33 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 72 ms on localhost (executor driver) (3/4)
18/06/22 19:15:33 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 72 ms on localhost (executor driver) (4/4)
18/06/22 19:15:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/06/22 19:15:33 INFO DAGScheduler: ResultStage 0 (reduce at demo.java:37) finished in 0.256 s
18/06/22 19:15:33 INFO DAGScheduler: Job 0 finished: reduce at demo.java:37, took 0.319634 s
result: 15
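To make the map/reduce semantics concrete without spinning up Spark, here is a sketch of the same pipeline (identity map, then a pairwise-sum reduce) written with plain Java 8 streams. This is only an analogy for how Spark folds the partial results together; the class name is my own:

```java
import java.util.Arrays;
import java.util.List;

public class StreamSum {
    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        int result = data.stream()
                .map(i -> i)                  // identity map, like the Function above
                .reduce(0, (a, b) -> a + b);  // pairwise sum, like the Function2
        System.out.println("result: " + result); // result: 15
    }
}
```

The key difference is that Spark evaluates this lazily across partitions on a cluster, while the stream runs eagerly in one JVM.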
Method 2: external datasets. See the official docs for details.
This example uses textFile as the data source.
Example 2: read a txt file and sum the lengths of every line containing "spark"
// contents of test.txt
spark demo
this is a spark demo file
hello world
package com.yzy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class demo2 {
    private static String appName = "spark.demo";
    private static String master = "local[*]";

    public static void main(String[] args) {
        JavaSparkContext sc = null;
        try {
            // initialize the JavaSparkContext
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
            sc = new JavaSparkContext(conf);
            // build the RDD from test.txt, which sits in the project root
            JavaRDD<String> rdd = sc.textFile("test.txt");
            // filter: keep only the lines containing "spark"
            rdd = rdd.filter(new Function<String, Boolean>() {
                public Boolean call(String s) throws Exception {
                    return s.contains("spark");
                }
            });
            // map && reduce
            Integer result = rdd.map(new Function<String, Integer>() {
                public Integer call(String s) throws Exception {
                    return s.length(); // length of each remaining line
                }
            }).reduce(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2; // sum the lengths
                }
            });
            System.out.println("result: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}
Execution output:
// some lines omitted
18/06/22 19:27:33 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 78 ms on localhost (executor driver) (1/2)
18/06/22 19:27:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 94 ms on localhost (executor driver) (2/2)
18/06/22 19:27:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/06/22 19:27:33 INFO DAGScheduler: ResultStage 0 (reduce at demo2.java:38) finished in 0.203 s
18/06/22 19:27:33 INFO DAGScheduler: Job 0 finished: reduce at demo2.java:38, took 0.243153 s
result: 35
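You can sanity-check the 35 by hand: only "spark demo" (10 characters) and "this is a spark demo file" (25 characters) survive the filter. The same filter → map → reduce pipeline can be sketched with plain Java 8 streams over the three lines of test.txt (again just an analogy, not Spark; the class name is my own):

```java
import java.util.Arrays;
import java.util.List;

public class StreamLineLength {
    public static void main(String[] args) {
        // the three lines of test.txt from the article
        List<String> lines = Arrays.asList(
                "spark demo",
                "this is a spark demo file",
                "hello world");
        int result = lines.stream()
                .filter(s -> s.contains("spark"))  // keep lines containing "spark"
                .mapToInt(String::length)          // length of each kept line
                .sum();                            // 10 + 25 = 35
        System.out.println("result: " + result); // result: 35
    }
}
```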