- 每当第一次学习一门语言时,都会写个helloWorld程序,spark也不例外,让我们从spark的helloWorld(word count)开始。在github中,spark有完整的word count源码,代码清晰整洁,是用RDD完成编码的,但上一节提到过spark 2.0之后推荐使用Dataset进行编码,所以本节笔者试着使用Dataset进行word count程序编写。大家可以点击代码连接 查看源码。废话不多说,先看看代码,感受一下。
代码中统计了10遍 The sound of silence的歌词,结果如下,为减少篇幅省略了一些词数统计:import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.junit.BeforeClass; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Objects; /** * @program: sparkstudy * @description: * @author: lin wang * @create: 2019-10-29 **/ public class WordsCount { private static SparkSession sparkSession; @Data @AllArgsConstructor @NoArgsConstructor public static class Words { private String word; private Long count; } @BeforeClass public static void before() { sparkSession = SparkSession .builder() .appName("words count") .master("local[*]") .getOrCreate(); } @Test public void wordsCountInvoke() { // read file contents from sound_of_silence.txt Dataset<String> dataset = sparkSession .read() .textFile(Objects.requireNonNull(this.getClass().getClassLoader().getResource("sound_of_silence.txt")).toString()); // calculator words List<Words> wordsList = dataset .flatMap((line) -> { List<Words> words = new ArrayList<>(); for (String word : line.split(" ")) { words.add(new Words(word, 1L)); } return words.iterator(); }, Encoders.bean(Words.class)) .groupByKey(Words::getWord, Encoders.STRING()) .reduceGroups((x, y) -> { x.setCount(x.getCount() + y.getCount()); return x; }) .map(x -> x._2, Encoders.bean(Words.class)) .collectAsList(); System.out.println("The results are :"); wordsList.forEach(words -> System.out.println(String.join(":", words.getWord(), String.valueOf(words.getCount())))); } }
The results are : naked:10 But:10 speaking:10 wells:10 not:10 softly:10 Left:10 sleeping:10 you:40 raindrops:10 more:10 was:20 ...
后面章节讲解各个算子的使用,本节只感受下spark的运行流程。
- web UI
- 当提交spark任务后就可以在UI界面看到spark执行情况,这使得spark调优和debug非常的方便,UI主界面如下,
-
image.png
UI界面有很多spark的基本概念,需要弄懂spark中各个术语的含义才能真正利用UI界面解决实际问题。在后面的章节中UI会帮助我们更好的理解spark。
-
image.png
- 当提交spark任务后就可以在UI界面看到spark执行情况,这使得spark调优和debug非常的方便,UI主界面如下,
- spark基本概念
- 上面UI界面包含很多的标签页和术语,我们先了解下spark的基本概念吧。
- Application:用户提交的spark应用程序;
- Driver:执行main函数,进行创建管理sparkContext,进行资源申请,任务分配等;
- Executor:运行在worker节点上的一个进程,该进程负责运行某些task,负责数据存到内存或者磁盘上。并行运行task的数据取决于分配的cpu数量;
- Worker:集群中可以运行application代码的节点。spark on yarn模式中指的node manager;
- Task:在executor进程中执行任务的工作单元,多个task组成一个stage;
- Job:由行动操作划分的一组并行计算;
- Stage:由shuffle算子划分的一组并行计算;
- DAGScheduler:根据job构建的基于stage的DAG(有向无环图);
- TaskScheduler:由taskset提交给woker,决定每个executor运行哪些task;
-
集群上部署spark程序
- image.png
- spark application 在集群上做为独立的进程运行,在主程序中通过SparkContext进行工作协调;
- SparkContext可以连接不同种类的Cluster Manager,Cluster Manger负责资源申请;
- SparkContext申请到集群中的资源后(Executor),会在此Executor上进行数据的计算和存储;
- SparkContext发送应用程序代码到Executor上;
- SparkContext发送任务到Executor上执行;
- 关于spark运行的一些知识:
- 每个应用都有独立的Executor进程,它存在于整个应用程序,并且使用多线程计算执行任务计算;
- spark不关心Cluster Manager的运行流程;
- driver端在整个应用程序中一直监听Executor运行;
- driver端需要和各个Executor通信,所以需要靠近worker nodes;
本节就到这里吧,下节会进行RDD相关知识的学习。
网友评论