美文网首页spark学习
spark学习(二)从hello world开始

spark学习(二)从hello world开始

作者: mumu_cola | 来源:发表于2019-11-03 10:53 被阅读0次
    • 每当第一次学习一门语言时,都会写个helloWorld程序,spark也不例外,让我们从spark的helloWorld(word count)开始。在github中,spark有完整的word count源码,代码清晰整洁,是用RDD完成编码的,但上一节提到过spark 2.0之后推荐使用Dataset进行编码,所以本节笔者试着使用Dataset进行word count程序编写。大家可以点击代码连接 查看源码。废话不多说,先看看代码,感受一下。
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Encoders;
      import org.apache.spark.sql.SparkSession;
      import org.junit.BeforeClass;
      import org.junit.Test;
      
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Objects;
      
      /**
       * @program: sparkstudy
       * @description:
       * @author: lin wang
       * @create: 2019-10-29
       **/
      
      public class WordsCount {
          private static SparkSession sparkSession;
      
          @Data
          @AllArgsConstructor
          @NoArgsConstructor
          public static class Words {
              private String word;
              private Long count;
          }
      
          @BeforeClass
          public static void before() {
              sparkSession = SparkSession
                      .builder()
                      .appName("words count")
                      .master("local[*]")
                      .getOrCreate();
          }
      
          @Test
          public void wordsCountInvoke() {
              // read file contents from sound_of_silence.txt
              Dataset<String> dataset = sparkSession
                      .read()
                      .textFile(Objects.requireNonNull(this.getClass().getClassLoader().getResource("sound_of_silence.txt")).toString());
              // calculator words
              List<Words> wordsList = dataset
                      .flatMap((line) -> {
                          List<Words> words = new ArrayList<>();
                          for (String word : line.split(" ")) {
                              words.add(new Words(word, 1L));
                          }
                          return words.iterator();
                      }, Encoders.bean(Words.class))
                      .groupByKey(Words::getWord, Encoders.STRING())
                      .reduceGroups((x, y) -> {
                          x.setCount(x.getCount() + y.getCount());
                          return x;
                      })
                      .map(x -> x._2, Encoders.bean(Words.class))
                      .collectAsList();
              System.out.println("The results are :");
              wordsList.forEach(words -> System.out.println(String.join(":", words.getWord(), String.valueOf(words.getCount()))));
          }
      }
      
      代码中统计了10遍 The sound of silence的歌词,结果如下,为减少篇幅省略了一些词数统计:
      The results are :
      naked:10
      But:10
      speaking:10
      wells:10
      not:10
      softly:10
      Left:10
      sleeping:10
      you:40
      raindrops:10
      more:10
      was:20
      ...
      

    后面章节讲解各个算子的使用,本节只感受下spark的运行流程。

    • web UI
      • 当提交spark任务后就可以在UI界面看到spark执行情况,这使得spark调优和debug非常的方便,UI主界面如下,
        • image.png

          UI界面有很多spark的基本概念,需要弄懂spark中各个术语的含义才能真正利用UI界面解决实际问题。在后面的章节中UI会帮助我们更好的理解spark。

    • spark基本概念
      • 上面UI界面包含很多的标签页和术语,我们先了解下spark的基本概念吧。
      • Application:用户提交的spark应用程序;
      • Driver:执行main函数,进行创建管理sparkContext,进行资源申请,任务分配等;
      • Executor:运行在worker节点上的一个进程,该进程负责运行某些task,负责数据存到内存或者磁盘上。并行运行task的数据取决于分配的cpu数量;
      • Worker:集群中可以运行application代码的节点。spark on yarn模式中指的node manager;
      • Task:在executor进程中执行任务的工作单元,多个task组成一个stage;
      • Job:由行动操作划分的一组并行计算;
      • Stage:由shuffle算子划分的一组并行计算;
      • DAGScheduler:根据job构建的基于stage的DAG(有向无环图);
      • TaskScheduler:由taskset提交给woker,决定每个executor运行哪些task;
    • 集群上部署spark程序
      • image.png
      • spark application 在集群上做为独立的进程运行,在主程序中通过SparkContext进行工作协调;
      • SparkContext可以连接不同种类的Cluster Manager,Cluster Manger负责资源申请;
      • SparkContext申请到集群中的资源后(Executor),会在此Executor上进行数据的计算和存储;
      • SparkContext发送应用程序代码到Executor上;
      • SparkContext发送任务到Executor上执行;
      • 关于spark运行的一些知识:
        • 每个应用都有独立的Executor进程,它存在于整个应用程序,并且使用多线程计算执行任务计算;
        • spark不关心Cluster Manager的运行流程;
        • driver端在整个应用程序中一直监听Executor运行;
        • driver端需要和各个Executor通信,所以需要靠近worker nodes;

    本节就到这里吧,下节会进行RDD相关知识的学习。

    相关文章

      网友评论

        本文标题:spark学习(二)从hello world开始

        本文链接:https://www.haomeiwen.com/subject/njmvvctx.html