美文网首页
Apache Spark入门 - Java架构和应用程序

Apache Spark入门 - Java架构和应用程序

作者: 李穆 | 来源:发表于2019-08-19 17:54 被阅读0次

    https://medium.com/@atul94/getting-started-with-apache-spark-ad9d59e71f6f
    Apache Spark被解释为“用于大规模数据处理的快速通用引擎”。然而,这甚至没有开始包含它成为大数据空间中如此突出的参与者的原因。Apache Spark是一个分布式计算平台,大数据公司的采用率一直在以惊人的速度增长。

    Spark Architecture

    spark的架构如下:


    Spark生态系统

    Spark是一个分布式处理引擎,但它没有自己的资源分布式存储和集群管理器。它运行在开箱即用的集群资源管理器和分布式存储之上。

    Spark核心有两个部分:

    • 核心API:非结构化API(RDD),结构化API(DataFrame,数据集)。可用于Scala,Python,Java和R.
    • 计算引擎:内存管理,任务调度,故障恢复,与Cluster Manager交互。

    注意:我们将在本文末尾看到Java中的Core API实现。

    在核心API之外,Spark提供:

    • Spark SQL:通过类似查询的SQL与结构化数据交互。
    • 流:消费和处理连续的数据流。
    • MLlib:机器学习库。但是,我不建议在这里培训深度学习模型。
    • GraphX:典型的图处理算法。

    以上四种都直接依赖于用于分布式计算的spark核心API。

    Spark的优点

    • Spark为批处理,结构化数据处理,流媒体等提供了统一的平台。
    • 与Hadoop的map-reduce相比,spark代码更易于编写和使用。
    • Spark最重要的特性,它抽象了并行编程方面。Spark核心抽象了分布式存储,计算和并行编程的复杂性。

    Apache Spark的主要用例之一是大规模数据处理。我们创建程序并在spark集群上执行它们。

    计划在集群上的执行

    主要有两种方法在spark集群上执行程序:

    1. 互动客户,如spark-shellpy-spark,笔记本等。
    2. 提交一份工作。

    大多数开发过程都发生在交互式客户端上,但是当我们必须将我们的应用程序投入生产时,我们使用提交作业方法。

    新闻Feed - 流媒体工作。每日YouTube分析 - 批量作业

    对于长时间运行的流作业或定期批处理作业,我们打包应用程序并将其提交给Spark集群以供执行。

    1.png

    Spark是一种分布式处理引擎,遵循主从架构。在spark术语中,master是* Driver(驱动程序)*,slave是执行者(executors)

    1.png

    Driver负责:

    1. 分析
    2. 分布。
    3. 监测。
    4. 调度。
    5. 在Spark过程的生命周期内保持所有必要的信息。
      执行程序(executors)仅负责执行驱动程序(Driver)分配给它们的部分代码,并将状态报告给驱动程序。
      每个Spark过程都有一个单独的Driver和独有的执行程序(executors)。

    执行方式

    客户端模式:驱动程序是您提交应用程序的本地VM。默认情况下,spark以客户端模式提交所有应用程序。由于驱动程序是整个Spark过程中的主节点,因此在生产设置中,建议不要这样做。对于调试,使用客户端模式更有意义。
    群集模式:驱动程序是群集中的执行程序之一。在spark-submit中,您可以按如下方式传递参数:

    --deploy-mode cluster

    群集资源管理器

    1.png

    Yarn和Mesos是常用的集群管理器。
    Kubernetes是一个通用的容器协调器。

    注意:Kubernetes上的Spark不是生产就绪的。

    Yarn是最受欢迎的Spark资源管理器,让我们看看它的内在工作:
    在客户端模式应用程序中,驱动程序(Driver)是我们的本地VM,用于启动spark应用程序:
    步骤1:一旦Driver启动Spark会话请求就转到Yarn以创建Yarn应用程序。
    第2步: Yarn Resource Manager创建一个Application Master。对于客户端模式,AM充当执行程序(executor)启动器。
    步骤3: AM将联系Yarn Resource经理以请求进一步的容器。
    步骤4:资源管理器将分配新容器,AM将在每个容器中启动执行程序(executor)。之后,执行程序(executor)直接与驱动程序(Driver)通信。


    1.png

    注意:在群集模式下,驱动程序(Driver)在AM中启动。

    执行程序和内存调整

    硬件 - 6个节点,每个节点16个内核,64 GB RAM
    让我们从核心数量开始。核心数表示执行程序可以运行的并发任务。研究表明,任何具有超过5个并发任务的应用程序都会导致糟糕的表现。因此,我建议坚持5。

    注意:上述数字来自执行程序的性能,而不是系统具有的核心数。因此,32核系统也将保持不变。
    1核心操作系统和Hadoop守护进程需要1 GB RAM。因此我们留下了63 GB Ram和15 Core。
    对于15个核心,每个节点可以有3个executors。这总共给了我们18个executors。AM Container需要1个executors。因此我们可以得到17位executors。
    到内存了,每个executors得到63/3 = 21 GB。但是,在计算完整内存请求时需要考虑很小的开销。
    Formula for that over head = max(384, .07 * spark.executor.memory)
    Calculating that overhead = .07 * 21 = 1.47

    因此内存降至约19 GB。
    因此系统变成:

    --num-executors 17 --executor- memory 19G --executor-cores 5

    注意:如果我们需要更少的内存,我们可以减少内核数量以增加执行程序(executor)的数量。

    Spark Core

    现在我们来看一下Spark提供的一些核心API。Spark需要一个数据结构来保存数据。我们有三个备选方案RDD,DataFrame和Dataset。从Spark 2.0开始,建议仅使用Dataset和DataFrame。这两个内部编译到RDD本身。
    这三个是弹性,分布式,分区和不可变的数据集合。


    1.png

    任务: Spark中最小的工作单元,由执行程序(executor)执行。
    数据集提供两种类型的操作:
    转换:从现有数据集创建新数据集。它很懒惰,数据仍然是分布式的。
    操作: Action将数据返回给驱动程序(driver),本质上是非分布式的。对数据集的操作会触发作业。
    随机和排序:重新分区数据集以对其执行操作。它是spark的抽象,我们不需要为它编写代码。这项活动需要一个新阶段。

    通用行为和转换

    1)lit,geq,leq,gt,lt

    lit:创建一个文字值的列。可用于与其他列进行比较。
    geq(大于等于),leq(小于等于),gt(大于),lt(小于):用于与其他列值进行比较。例如:

    // [https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java](https://gist.githubusercontent.com/atul94/b1c473a3e9ad748776119707c5fec741/raw/b24eb6f00438c244a6919d19e3749f90a0cdb14e/spark_ds_opt_1.java)
    
    // Filter dataset with GENERIC_COL value >= 0
    Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).geq(0));
    // Filter dataset with GENERIC_COL value != "qwerty"
    Dataset<Row> newRowDataset = rowDataset.filter(col(GENERIC_COL).notEqual(lit("QWERTY")));
    

    2)join(加入)

    Spark让我们以各种方式加入数据集。将尝试用示例示例进行解释

    // [https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java](https://gist.githubusercontent.com/atul94/5451ee7a40c9f5199eb9f4905d754318/raw/d8b2450e23e9a87e5793e9b87852c6a0b3dd09d1/spark_ds_opt_2.java)
    
            new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
            new StructField("value", DataTypes.StringType, true, Metadata.empty()),
        };
    StructType structType = new StructType(structFields);
    List<Row> rowListA = new ArrayList<>();List<Row> rowListB = new ArrayList<>();
    
    rowListA.add(RowFactory.create(1, "A1"));rowListA.add(RowFactory.create(2, "A2"));
    rowListA.add(RowFactory.create(3, "A3"));rowListA.add(RowFactory.create(4, "A4"));
    
    rowListB.add(RowFactory.create(3, "A3"));rowListB.add(RowFactory.create(4, "A4"));
    rowListB.add(RowFactory.create(4, "A4_1"));rowListB.add(RowFactory.create(5, "A5"));
    rowListB.add(RowFactory.create(6, "A6"));
    
    // Create 2 sample dataset
    dsA = sparkSession.createDataFrame(rowListA, structType);
    dsB = sparkSession.createDataFrame(rowListB, structType);
    
    String[] typesOfJoins = {"inner", "outer", "full", "full_outer", "left",
            "left_outer", "right", "right_outer", "left_semi", "left_anti"};
    
    //Print both the dataset         
    System.out.println("Dataset A");
    dsA.show();
    System.out.println("Dataset B");
    dsB.show();
    
    //Print all possible types of join
    for (int i = 0; i < typesOfJoins.length; i++) {
      System.out.println(typesOfJoins[i].toUpperCase());
      dsA.join(dsB, dsA.col("id").equalTo(dsB.col("id")), typesOfJoins[i]).drop(dsB.col("id")).show();
    }
    

    结果如下:

    Dataset A                       Dataset B
    +---+-----+                     +---+-----+
    | id|value|                     | id|value|
    +---+-----+                     +---+-----+
    |  1|   A1|                     |  3|   A3|
    |  2|   A2|                     |  4|   A4|
    |  3|   A3|                     |  4| A4_1|
    |  4|   A4|                     |  5|   A5|
    +---+-----+                     |  6|   A6|
                                +---+-----+
    ----------------------------------------------------------------------------------------------------------------------------
    
    INNER JOIN                  OUTER JOIN                  FULL JOIN
    +---+-----+-----+               +---+-----+-----+               +---+-----+-----+
    | id|value|value|               | id|value|value|               | id|value|value|
    +---+-----+-----+               +---+-----+-----+               +---+-----+-----+
    |  3|   A3|   A3|               |  1|   A1| null|               |  1|   A1| null|           
    |  4|   A4| A4_1|               |  2|   A2| null|               |  2|   A2| null|
    |  4|   A4|   A4|               |  3|   A3|   A3|               |  3|   A3|   A3|
    +---+-----+-----+               |  4|   A4|   A4|               |  4|   A4| A4_1|
                                |  4|   A4| A4_1|               |  4|   A4|   A4|
                                |  5| null|   A5|               |  5| null|   A5|
                                |  6| null|   A6|               |  6| null|   A6|
                            +---+-----+-----+               +---+-----+-----+
    
    FULL_OUTER JOIN                 LEFT JOIN                   LEFT_OUTER JOIN 
    +---+-----+-----+               +---+-----+-----+               +---+-----+-----+
    | id|value|value|               | id|value|value|               | id|value|value|
    +---+-----+-----+               +---+-----+-----+               +---+-----+-----+
    |  1|   A1| null|               |  1|   A1| null|               |  1|   A1| null|
    |  2|   A2| null|               |  2|   A2| null|               |  2|   A2| null|
    |  3|   A3|   A3|               |  3|   A3|   A3|               |  3|   A3|   A3|
    |  4|   A4| A4_1|               |  4|   A4| A4_1|               |  4|   A4| A4_1|
    |  4|   A4|   A4|               |  4|   A4|   A4|               |  4|   A4|   A4|
    |  5| null|   A5|               +---+-----+-----+               +---+-----+-----+
    |  6| null|   A6|
    +---+-----+-----+
    
    RIGHT JOIN                  RIGHT_OUTER JOIN                LEFT_SEMI JOIN
    +---+-----+-----+               +---+-----+-----+               +---+-----+
    | id|value|value|               | id|value|value|               | id|value|
    +---+-----+-----+               +---+-----+-----+               +---+-----+
    |  3|   A3|   A3|               |  3|   A3|   A3|               |  3|   A3|
    |  4|   A4|   A4|               |  4|   A4| A4_1|               |  4|   A4|
    |  4|   A4| A4_1|               |  4|   A4|   A4|               +---+-----+
    |  5| null|   A5|               |  5| null|   A5|
    |  6| null|   A6|               |  6| null|   A6|
    +---+-----+-----+               +---+-----+-----+
    
    LEFT_ANTI JOIN
    +---+-----+
    | id|value|
    +---+-----+
    |  1|   A1|
    |  2|   A2|
    +---+-----+
    

    3)union(联合)

    Spark联合函数允许我们在两个数据集之间建立联合。数据集应该具有相同的模式。

    4)window(窗口)

    Spark中的基本功能之一。它允许您基于一组称为Frame的行计算表的每个输入行的返回值。
    Spark为翻滚窗口,希望窗口,滑动窗口和延迟窗口提供API。
    我们将它用于排名,总和,普通旧窗口等。一些用例是:

    // Window Function different use cases
    
    //Ranking
    //Filter top SOME_INTEGER_VAL_COL
    WindowSpec w1 = Window.orderBy(col(SOME_COUNT_COL).desc());
    inputDataset = inputDataset.select(col(SOME_COL), rank().over(w1).as(RANK_COL)).filter(col(RANK_COL).leq(SOME_INTEGER_VAL));
    
    //Get values over a moving Window
    //Helpful for calculating Directed Graphs in data based on time, moving average etc.
    mainDataset = mainDataset.groupBy(col(SOME_COL), window(col(DATE_TIME_COL), INTERVAL_STRING))
            .agg(collect_list(struct(col(SOME_COL), col(DATE_TIME_COL))).as(SOME_OTHER_COL));
    // Above program creates a directed graph based on window
    
    //Getting sum over some col
    WindowSpec w2 = Window.groupBy(col(SOME_COL));
    rowDataset = rowDataset.select(col(SOME_COL_A), sum(col(SOME_COL_B).over(w2)).as(SOME_COL_C));
    

    其他功能(如lag,lead等)允许您执行其他操作,使您可以对数据集执行复杂的分析。
    但是,如果仍需要对数据集执行更复杂的操作,则可以使用UDF。UDF的使用示例:

    //UDF
    //Some class to calculate similarity between two Wrapped Array with Schema
    public class SimilarityCalculator implements AnalyticsUDF {
    
      private Double varACoff;
      private Double varBCoff;
      private Double varCCoff;
      private Double varDCoff;
    
      public SimilarityCalculator(Double varACoff, Double varBCoff, Double varCCoff, Double varDCoff) {
        this.varACoff = varACoff;
        this.varBCoff = varBCoff;
        this.varCCoff = varCCoff;
        this.varDCoff = varDCoff;
      }
    
      public Double doCal(WrappedArray<GenericRowWithSchema> varA,
          WrappedArray<GenericRowWithSchema> varB) {
        int lenA = varA.length();
        int lenB = varB.length();
        double ans = 0;
        double temp;
        Map<String, Double> mapA = new HashMap<>();
        Map<String, Double> mapB = new HashMap<>();
        for (int i = 0; i < lenA; i++) {
          mapA.put(varA.apply(i).getString(0), varA.apply(i).getDouble(1));
        }
        for (int i = 0; i < lenB; i++) {
          mapB.put(varB.apply(i).getString(0), varB.apply(i).getDouble(1));
        }
        for (int i = 0; i < lenA; i++) {
          if (mapB.containsKey(varA.apply(i).getString(0))) {
            temp = varA.apply(i).getDouble(1) - mapB.get(varA.apply(i).getString(0));
            ans += temp * temp;
          } else {
            ans += varA.apply(i).getDouble(1) * varA.apply(i).getDouble(1);
          }
        }
        for (int i = 0; i < lenB; i++) {
          if (!mapA.containsKey(varB.apply(i).getString(0))) {
            ans += varB.apply(i).getDouble(1) * varB.apply(i).getDouble(1);
          }
        }
        return ans;
      }
    
      public Double doCal(Double varA, Double varB, Double varC, Double varD) {
        return Math.sqrt(varA * varACoff + varB * varBCoff + varC * varCCoff + varD * varDCoff);
      }
      
      public String getName() {
        return "L2DistanceScore";
      }
      
      public String getSecondName() {
        return "MergeScore";
      }
    }
    
    ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
    
    /// At the Spark Session Level
    sparkSession.udf().register(similarityCalc.getName(),
            (WrappedArray<GenericRowWithSchema> colA, WrappedArray<GenericRowWithSchema> colB) -> similarityCalc
                .doCal(colA, colB), DataTypes.DoubleType);
    sparkSession.udf().register(similarityCalc.getSecondName(),
            (Double a, Double b, Double c, Double d) -> similarityCalc.doCal(a, b, c, d),
            DataTypes.DoubleType);
    
    

    注意:使用UDF应该是最后的手段,因为它们没有针对Spark进行优化; 他们可能需要更长的时间来执行死刑。建议在UDF上使用本机spark函数。

    这只是Apache Spark的冰山一角。它的实用程序扩展到各种领域,不仅限于数据分析。观看这个空间了解更多。

    参考

    相关文章

      网友评论

          本文标题:Apache Spark入门 - Java架构和应用程序

          本文链接:https://www.haomeiwen.com/subject/zpyesctx.html