前几天写完了MapReduce的小例子之后,今天再来学习Spark的例子代码就通透了。
MapReduce分为Map和Reduce部分,而Spark实际在上边写代码方面就简单一些,实际上就是RDD的处理了,那么RDD是啥?
Spark的核心数据模型是RDD, Spark将常用的大数据操作都转化成为RDD的子类(RDD是抽象类,具体操作由各子类实现,如MappedRDD、Shuffled RDD)。
说人话就是Spark对数据的操作都是通过RDD来进行的,例如读取文件,文件处理,统计文字个数这一系列的操作都是RDD完成。
咱们接下来看看java如何写Spark的代码的。
一、Spark例子代码
通过以下代码可以很容易的看出来,没有那么多的Map,Reduce以及输入输出的格式指定,代码逻辑简单了,但是难点是在于lambda表达式的写法,写的很容易,能读懂,但是再让我写一次,我可能还是不会写。。。以后有机会重点学习下这部分。
引入maven
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
主方法
package com.sparkwordcount;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.util.Arrays;
public class SparkMain {
public static void main(String[] args) throws Exception {
// 设定appName(为本脚本取个名字)
String appName = "testSpark";
// 设定spark master(默认支持local)
String master = "local";
// 处理的源文件,输出的结果,这个文件是咱们前几天在MapReduce中的文件
String filePath = "/test/input/testFile.txt";
String outputPath = "/test/output/testSpartResult";
// 初始化Spark环境,为后边运行读取环境配置
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取文件并处理
JavaRDD<String> lines = sc.textFile(filePath);
// 将每一行通过空格截取成新的RDD
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
// 将所有的文字组成键值对,并对不同的key进行计数
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
// 循环输出每一个字的出现次数
counts.foreach(s -> System.out.println(s._1()+","+s._2()));
// 持久化到内存和硬盘中,能够为后期新的程序方便读取
counts.persist(StorageLevel.MEMORY_AND_DISK());
// 输出成文本到指定目录
counts.saveAsTextFile(outputPath);
}
}
二、打包
设定Artifacts打包
v2-8b90aee391d1ae8efd69c21e8a145b1f_720w.jpg执行Build Artifaces打包
v2-6cf90796f5e8744cc18eed083113ffc9_720w.jpg会在指定目录生成jar包SparkWordCount.jar
三、上传到docker并运行
#复制文件到docker中
docker cp /Users/XuesongBu/Documents/git_code/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar master:/usr/local
#进入docker
docker exec -it master bash
#进入Spark目录
cd /usr/local/spark-3.0.3-bin-hadoop2.7
#提交到Spark执行
./bin/spark-submit \
--class com.sparkwordcount.SparkMain \
--master local \
../SparkWordCount.jar \
100
#输出,其实还有别的很多数据,咱们忽视吧,太多了
tFile,1
Hello,1
dd,2
ddd,1
242343,1
123,1
tes,1
sdfs,1
43252,1
world,1
df,2
3434s,1
dfdsf,1
#通过hadoop查看文件命令查看本次执行的输出的文件结果
hadoop fs -cat /test/output/testSpartResult/*
(tFile,1)
(Hello,1)
(dd,2)
(ddd,1)
(242343,1)
(123,1)
(tes,1)
(sdfs,1)
(43252,1)
(world,1)
(df,2)
(3434s,1)
(dfdsf,1)
四、总结
这就是一个简单的Spark的小例子,这只是个入门,其实更复杂的是针对大数据统计的算法,我写出来的一切实际都是CRUD,都是利用工具进行的简单的操作,算法才是最重要的。
大家有什么不懂得可以在评论回复我,我来给大家详细解答。
谢各位的阅读,谢谢您动动手指点赞,万分感谢各位。另外以下是我之前写过的文章,感兴趣的可以点进去继续阅读。
历史文章
Hadoop系列-入门安装
Hadoop系列-HDFS命令
Hadoop系列-Hive安装
Hadoop系列-Hive数据库常见SQL命令
Hadoop系列-HBase数据库
Hadoop系列-HBase数据库(二)
Hadoop系列-HBase数据库JAVA篇
Hadoop系列-Spark安装以及HelloWorld
Hadoop系列-MapReduce小例子
Hadoop系列-Spark小例子
JAVA面试汇总(五)数据库(一)
JAVA面试汇总(五)数据库(二)
JAVA面试汇总(五)数据库(三)
JAVA面试汇总(四)JVM(一)
JAVA面试汇总(四)JVM(二)
JAVA面试汇总(四)JVM(三)
JAVA面试汇总(三)集合(一)
JAVA面试汇总(三)集合(二)
JAVA面试汇总(三)集合(三)
JAVA面试汇总(三)集合(四)
JAVA面试汇总(二)多线程(一)
JAVA面试汇总(二)多线程(二)
JAVA面试汇总(二)多线程(三)
JAVA面试汇总(二)多线程(四)
JAVA面试汇总(二)多线程(五)
JAVA面试汇总(二)多线程(六)
JAVA面试汇总(二)多线程(七)
JAVA面试汇总(一)Java基础知识
网友评论