美文网首页
Spark2.0 Programming Guide(Spark

Spark2.0 Programming Guide(Spark

作者: 4762d2980c91 | 来源:发表于2016-09-26 20:23 被阅读972次

    1. Overview-概览

    ​ 每一个Spark应用都是由包含一个main方法的driver program组成,并且能够在一个集群上执行一系列的并行操作。Spark的第一个主要抽象概念是RDD(Resilient distributed dataset)-分布在集群的各个节点上能够被并行操作的被分割的数据集。RDD开始可以是由在hdfs(或其他hadoop支持的文件系统)上的文件或者是driver program中的一个集合通过转换来创建,用户可以在内存中persist一个RDD来允许它被高效的重复使用,RDD具备自动恢复能力。

    ​ Spark的第二个抽象概念是:共享变量。共享变量可以在并行操作中被使用。默认情况,Spark通过在不同的节点以任务集的方式来运行并行操作函数,spark会把在并行操作中用到的变量传递到每个节点上。有时,一个变量需要在不同的任务之间共享,或者在任务与主程序driver program之间共享。Spark支持两种类型的共享变量:广播变量(broadcast variables)-用来在所有的节点上缓存一个值;accumulators-可进行叠加操作的变量,比如计数和求和变量。

    2. Resilient Distributed Datasets(RDDs)

    ​ RDD的概念贯穿于Spark的整个生态系统理论中,RDD是一个以并行方式运行具有容错性的元素集合。在Spark中有两种方式来创建RDD数据集:并行化集合- parallelizing一个在driver program中定义的数据集合;外部数据集-指向引用一个外部存储系统中的数据集,比如一个共享文件系统上的文件、HDFS、HBase或者其他提供了Hadoop InputFormat特性接口的任意数据源。

    2.1 并行化集合-Parallelized Collections

    ​ 并行化集合通过在一个存在的java或者scala集合上调用JavaSparkContextparallelize方法来创建。集合的元素被复制来生成一个可并行操作的分布式数据集。以下是一个创建并行化集合的样例:

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);
    

    创建完成,分布式数据集distData就可以被并行操作。比如,我们可以调用distData.reduce((a,b) -> a + b)来计算集合的元素和。

    ​ 并行化集合还有一个重要的参数是把一个集合切分成多少个partitions。Spark会在每个partition上运行一个任务。典型的在集群的每个CPU上会分配2-4个partitions。Spark会根据配置自动把一个集合切分成多少个partition,我们也可以自己通过调用parallelize(data, 10)这个方法来手动设置你想切分的partition数。

    2.2 外部数据集-External Datasets

    ​ Spark可以从任何Hadoop支持的存储源创建分布式数据集;包括本地文件系统、HDFS、HBase、Cassandra、Amazon S3等等。Spark支持文本文件、序列化文件和其他任何Hadoop支持的InputFormat格式。

    ​ 文本文件的RDD可以使用SparkContext的textFile方法来创建。这个方法根据提供的文件URI(可以是一个本地路径或者是hdfs://, s3n://等形式的URI)将文件内容读取为文件中每个行的集合。下面也是一个样例:

    JavaRDD<String> distFile = sc.textFile("data.txt");
    

    创建完成,distFile就可以执行数据集的操作。比如:我们可以计算所有行的sizes:distFile.map(s -> s.length()).reduce((a, b) -> a + b)。

    Spark读取文件需要注意的:

    • 如果使用本地文件系统路径,那么这个文件必须是要所有节点可访问的。拷贝这个文件到所有的节点或者是通过网络挂载方式挂到一个共享文件系统上。
    • Spark支持的文件输入方式:文本文件,目录文件,压缩文件,以及通配符文件。例如:你可以使用textFile("/my/directory"), textFile("/my/directory/*.txt"), txtFile("/my/directory/**.gz")。
    • textFile方法同样也支持一个可选的第二个参数来控制partitions的数目。默认的,Spark给每个文件块(HDFS中的文件分块)创建一个partition,当然你也可以通过传递一个更大的值来要求更多的partitions。但是partitions的数量不能够比blocks的数量少。

    3. RDD操作-RDD Operations

    ​ RDDs支持两种类型的操作:transformations(转换) - 从一个存在的RDD上创建一个新的RDD;actions(动作) - 在数据集上执行一个计算操作之后返回一个值给driver program。例如,map是一个转换操作,将数据集传递给一个函数并返回一个新的RDD结果;reduce是一个动作,使用某些函数集合RDD的所有元素并返回一个最终的结构给driver program

    ​ Spark所有的transformations操作时懒惰的,也就是说它们不会立刻计算它们的结果,它们只会记住这些转换。transformations操作只有当一个action动作执行并需要某个transformations操作的结果时,这个transformation才会被计算。这种设计模式使得Spark运行更加高效。

    ​ 默认情况下,每个transformed RDD在你每次在它上面运行一个action时都会被重新计算。然而,Spark提供了持久化方式,可以让你把第一次transformation后的结果RDD保存在内存或者磁盘上,这样如果下次有需要这个transformed RDD的时候就不用再次计算从而加快整个计算的速度。

    3.1 基本操作-Basic

    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
    int totalLength = lineLengths.reduce((a, b) -> a + b);
    

    第一行从一个外部文件创建了一个RDD。这个RDD并不会被加载到内存中,lines只是引用了这个文件而已。第二行的lineLengths是map转换操作的结果,由于懒惰性这个结果不会马上被计算。最后一行,当执行reduce操作时,由于这是一个action,在这个时候,Spark会把这个计算分成多个任务分发到集群中的不同机器上,每个机器会执行它本地的map和reduce操作,然后返回它的结果值到driver program

    如果我们要多次用到lineLengths的值,那么我们可以添加下面这一行代码:

    lineLengths.persist(StorageLevel.MEMORY_ONLY());
    

    在执行reduce操作前,上面这句代码会在lineLengths第一次被计算出来后保存到内存中。

    3.2 函数传递-Passing Functions to Spark

    ​ Spark提供的API对于函数的传递具有严重的依赖性。在java里面,传递函数只能通过类来展现。有两种方式来创建这样的函数:

    • 实现org.apache.spark.api.java.function.Function接口,或者是匿名内部类;
    • 在Java 8,使用lambda表达式来简化这个实现。

    lanbda表达式的方式上面有样例。下面是匿名内部类和实现接口的方式来实现通上面代码一样的功能:

    // 匿名内部类
    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
      public Integer call(String s) { return s.length(); }
    });
    int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });
    
    // 实现接口方式
    class GetLength implements Function<String, Integer> {
      public Integer call(String s) { return s.length(); }
    }
    class Sum implements Function2<Integer, Integer, Integer> {
      public Integer call(Integer a, Integer b) { return a + b; }
    }
    
    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaRDD<Integer> lineLengths = lines.map(new GetLength());
    int totalLength = lineLengths.reduce(new Sum());
    

    4. 理解闭合-Understanding closures

    ​ 在Spark中最难理解的一件事:当在集群中执行代码时,变量和函数的生命周期和作用域的问题。RDD操作在变量的作用域外能够修改他们的值(注意对这一点的理解:是夸机器导致的这个问题出现)是导致这件事发生的主要原因。

    4.1 例子

    ​ 考虑下面的RDD操作,可能在不同的环境下执行会有不同的结果(取决与是否在同一个jvm上运行)。一种常见情况是在Spark的local模式和Spark的cluster模式运行时:

    int counter = 0;
    JavaRDD<Integer> rdd = sc.parallelize(data);
    
    // Wrong: Don't do this!!
    rdd.foreach(x -> counter += x);
    
    println("Counter value: " + counter);
    

    Local vs. cluster modes

    ​ 上面代码的行为是不确定的。为了执行这个作业,Spark会把RDD操作分配成不同的多个任务进程,每个任务进程都由每个Worker node上的executor执行器来执行。在被每个executor执行器执行之前,Spark会计算每个任务的closure。这个closure是只那些变量和方法-为了执行在RDD上的计算必须让executor可见的变量和方法。这些closure会被序列化并被发送到每个executor上面。

    ​ 在closure中的变量现在被发送到了每个executor上,executor中有了这些变量的副本,当counter变量在foreach函数中被引用的时候,这个counter变量不再是driver program所运行节点上的counter变量了,虽然在driver program节点上任然存在counter这个变量,但是它的变量对所有的executors是不可见。executor只能够访问到从closure上复制过来的在本地机器上的counter。所以,counter的最终结果还是零。

    ​ 在local模式,某些条件下,foreach函数将会在一个相同的jvm虚拟机上运行,可能会引用的同一个counter变量,在这种情况下counter的值可能会被更新。

    ​ 在上面的场景中为了确保确定的行为发生,我们应该使用Accumulator。在Spark中Accumulator提供了一种机制来保证在集群中的夸节点并行任务能够安全的更新变量。Accumulator会在稍后讨论。

    Printing elements of an RDD

    ​ 一种另外的场景是使用rdd.foreach(println)来打印一个RDD中的所有元素。在单机上,这个可以打印出RDD上的元素。然而在集群中,executor的标准输出是写到executor上的标准输出而不是driver program节点上的标准输出,所以并不会在显示相要的结果。为了打印RDD上的所有元素,我们可以使用collect()方法来将RDD数据带到driver program节点上:rdd.collect().foreach(println)。这个操作可能会造成driver program节点内存溢出,因为collect()会把RDD的所有数据抓到driver program单个节点上。如果你需要打印少量元素,一个安全的方式是使用:rdd.take(100).foreach(println)。

    5. 键值对的RDD-Working with Key-Value Pairs

    ​ Spark的大多数操作可以在任何类型的RDD上工作,但是有少部分特殊的操作只能运行在key-value形式的RDD上。最常见的一个是“shuffle”操作,比如说:通过键来分组和聚合的操作。

    ​ key-value形式的RDD通过JavaPairRDD类来表示。我们可以使用mapToPair和flatMapToPair操作来从JavaRDD来构建JavaPairRDD。例如,下面的代码使用reduceByKey操作来计算一个文件中每一行文本出现的次数:

    JavaRDD<String> lines = sc.textFile("data.txt");
    JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
    

    Shuffle Performance Impact

    ​ Shuffle操作时一个非常昂贵的操作,因为它涉及到磁盘I/O,数据序列化,网络I/O。同时也会浪费很多堆内存,还会产生好多中间文件。这个部分简化了,看得不是很懂

    6. RDD持久化-RDD Persistance

    ​ Spark的一个重要能力是持久化或者缓存一个dataset在内存中。当我们持久化一个RDD,每个节点会存储属于这个RDD中的partitions,并且这个持久化的RDD能被多个需要它的action重复使用。这个特点使得在以后执行的action能够更加快速。

    ​ 我们可以使用persist()和cache()方法来持久化一个RDD,这个RDD第一次被计算之后将会被保存到节点的内存中。Spark的持久化是可容错的-如果这个持久化RDD的任何partition丢失了,那么Spark会自动重新去计算。

    ​ 此外,每个持久化RDD可以允许你存储为不同的级别。这些存储级别可以通过StorageLevel得到。

    存储级别 描述
    MEMORY_ONLY
    MEMORY_AND_DISK
    MEMORY_ONLY_SER(Java and Scala)
    MEMORY_AND_DISK_SER(Java and Scala)
    DISK_ONLY
    MEMORY_ONLY_2,MEMORY_AND_DISK_2
    OFF_HEAP(experimental)

    数据删除

    ​ Spark会自动监控缓存信息并且删除老的数据(使用的LRU least-recently-used算法)。如果要手动删除,可以调用RDD.unpersist()方法。

    7. 共享变量-Shared Variables

    ​ 当一个函数被传递给在远程集群节点运行的Spark的操作(比如map或者reduce),函数所用到的变量都是一个独立的副本。这些变量被复制到每个节点,而且在每个节点上的更新不会反馈到driver program上。Spark提供两种方式来限制共享变量:broadcast variablesaccumulators

    7.1 广播变量-Broadcast Variables

    ​ 广播变量程序员缓存一个只读变量在每个机器上,而不是传递副本到每个任务上。他们能被用来以一种有效方式给每个节点传递一个大数据集的拷贝。Spark也通过高效的广播算法来降低广播变量带来的通信消耗。

    ​ 广播变量通过SparkContext.broadcast(v)的方式来创建,广播变量的值可以通过value()方法获得。代码如下:

    Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
    
    broadcastVar.value();
    // returns [1, 2, 3]
    

    ​ 在一个广播变量被创建以后,应该使用broadcastVar而不要继续使用v来操作。此外,为了确保所有的节点得到相同的广播变量值,v的值在广播之后不应该再被修改。

    7.2 Accumulators

    ​ Accumulators变量只能通过联想和交换操作(associative and commutative operation)来执行added操作。Accumulators变量能够用了实现计数和求和。Spark本身只支持数据类型的Accumulators变量,程序员可以自己增加新的实现类型。

    ​ 如果一个Accumulatos变量被创建,那么它能够在Spark的UI中查看到。 Accumulators in the Spark UI

    ​ 一个Accumulator变量可以通过SparkContext.accumulator(v)的方式来创建。然后每个任务可以通过add方法或者+=(这个操作只在Scala和Python中)操作来对他进行操作。但是,每个任务不能都读取Accumulator的值,只有driver program能够读取Accumulator变量的值。

    下面代码用通过Accumulator变量来计算一个数组中所有元素的和:

    LongAccumulator accum = sc.sc().longAccumulator();
    
    sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
    // ...
    // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    
    accum.value();
    // returns 10
    

    ​ Accumulator变量原生只支持数值类型,程序员可以创建我们自己的Accumulator变量的数据类型,通过实现AccumulatorParam接口。例如:

    class VectorAccumulatorParam implements AccumulatorParam<Vector> {
      public Vector zero(Vector initialValue) {
        return Vector.zeros(initialValue.size());
      }
      public Vector addInPlace(Vector v1, Vector v2) {
        v1.addInPlace(v2); return v1;
      }
    }
    
    // Then, create an Accumulator of this type:
    Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());
    

    5. 结语

    ​ 我也是刚刚接触Spark,这篇文章也是基于官方文档写的。所以可能有很多细节和概念没有写清楚,但是对于Spark的一个基本理解入门,我觉得是可以的。这篇文章中有什么写的不好和不到位的地方,还请大家多多指出来。

    相关文章

      网友评论

          本文标题:Spark2.0 Programming Guide(Spark

          本文链接:https://www.haomeiwen.com/subject/ofgqyttx.html