美文网首页spark学习
Spark之RDD基础学习

Spark之RDD基础学习

作者: 风之舟 | 来源:发表于2019-05-26 20:06 被阅读0次

    一、RDD概念

    RDD(Resilient Distributed Dataset):弹性分布式数据集,Spark计算的基石,为用户屏蔽了底层对数据的复杂抽象和处理,为用户提供了一组方便的数据转换与求值方法。

    1、RDD是不可变的,如果需要在一个RDD上进行转换操作,则会生成一个新的RDD

    2、RDD是分区的,RDD里面的具体数据是分布在多台机器上的Executor里面的。堆内内存和堆外内存+磁盘。

    3、RDD是弹性的:

    a、存储:Spark会根据用户的配置或者当前Spark的应用运行情况去自动将RDD的数据缓存到内存或者磁盘。它是一个对用户不可见的封装的功能。

    b、容错:当你的RDD数据被删除或者丢失的时候,可以通过血统或者检查点机制恢复数据,这个对用户透明的。

    c、计算:计算是分层的,有应用->Job->Stage->TaskSet->Task每一层都有对应的计算的保障与重复机制,保障你的计算不会由于一些突发因素而终止。

    d、分片:可以根据业务需求或者一些算子来重新调整RDD中的数据分布。

    其中Spark Core就是在操作RDD

    RDD的创建->RDD的转换->RDD的缓存->RDD的行动->RDD的输出

    二、RDD的创建

    创建RDD有三种方式:

    1、可以从一个Scala集合里面创建:

    sc.parallelize(seq):把seq这个数据并行化分片到节点

    sc.makeRDD(seq):把seq这个数据并行化分片到节点,它的实现就是parallelize

    sc.makeRDD(seq[(T,seq)]):这种方式可以指定RDD的存放位置

    2、从外部存储来创建,比如sc.textFile("path")

    3、从另外一个RDD转换过来

    三、RDD的操作

    RDD中操作分为两大类型:转换(transformation)和行动(action)

    转换:通过操作将RDD转换成另外一个RDD

    行动:将一个RDD进行求值或者输出

    所有这些操作主要针对两种类型的RDD:数值RDD和键值对RDD

    RDD的所有转换操作都是懒执行的,只有当行动操作出现的时候spark才会真的去运行


    常见的转换操作:

    1、def map[U: ClassTag](f: T => U): RDD[U]   将函数应用于RDD的每一元素,并返回一个新的RDD

    2、def filter(f: T => Boolean): RDD[T]        通过提供的产生boolean条件的表达式来返回符合结果为True的新的RDD

    3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]   将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。

    4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]    将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。

    5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning:Boolean = false): RDD[U]  将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。

    6、def sample(withReplacement:Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。

    7、def union(other: RDD[T]): RDD[T]  将两个RDD中的元素进行合并,返回一个新的RDD

    8、def intersection(other:RDD[T]): RDD[T]  将两个RDD做交集,返回一个新的RDD

    9、def distinct(): RDD[T]  将当前RDD进行去重后,返回一个新的RDD

    10、def partitionBy(partitioner:Partitioner): RDD[(K, V)]  根据设置的分区器重新将RDD进行分区,返回新的RDD。

    11、def reduceByKey(func: (V, V) => V): RDD[(K, V)]   根据Key值将相同Key的元组的值用func进行计算,返回新的RDD

    12、def groupByKey(): RDD[(K, Iterable[V])]   将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD

    13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]   根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。

    14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner:Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]   通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。

    15、def foldByKey(zeroValue:V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]  aggregateByKey的简化操作,seqop和combop相同,

    16、def sortByKey(ascending:Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]   在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

    17、def sortBy[K](f: (T) => K,ascending: Boolean =true,numPartitions: Int =this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]   底层实现还是使用sortByKey,只不过使用fun生成的新key进行排序。

    18、def join[W](other: RDD[(K, W)], partitioner: Partitioner):RDD[(K, (V, W))]  在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,但是需要注意的是,他只会返回key在两个RDD中都存在的情况。

    19、def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner):RDD[(K, (Iterable[V], Iterable[W]))]  在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD,注意,如果V和W的类型相同,也不放在一块,还是单独存放。

    20、def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]  做两个RDD的笛卡尔积,返回对偶的RDD

    21、def pipe(command: String): RDD[String]   对于每个分区,都执行一个perl或者shell脚本,返回输出的RDD,注意,如果你是本地文件系统中,需要将脚本放置到每个节点上。

    22、def coalesce(numPartitions:Int, shuffle: Boolean = false,partitionCoalescer:Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null) : RDD[T]  缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

    23、def repartition(numPartitions:Int)(implicit ord: Ordering[T] = null): RDD[T] 根据你传入的分区数重新通过网络分区所有数据,重型操作。

    24、def repartitionAndSortWithinPartitions(partitioner:Partitioner): RDD[(K, V)]  性能要比repartition要高。在给定的partitioner内部进行排序

    25、def glom(): RDD[Array[T]] 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

    26、def mapValues[U](f: V => U): RDD[(K, U)]   将函数应用于(k,v)结果中的v,返回新的RDD

    27、def subtract(other: RDD[T]): RDD[T] 计算差的一种函数去除两个RDD中相同的元素,不同的RDD将保留下来。

    常见的行动操作:

    1、def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] 抽样但是返回一个scala集合。

    2、def reduce(f: (T, T) => T): T   通过func函数聚集RDD中的所有元素

    3、def collect(): Array[T]  在驱动程序中,以数组的形式返回数据集的所有元素

    4、def count():Long   返回RDD中的元素个数

    5、def first():T   返回RDD中的第一个元素

    6、def take(num: Int): Array[T]  返回RDD中的前n个元素

    7、def takeOrdered(num:Int)(implicit ord: Ordering[T])   返回前几个的排序

    8、def aggregate[U: ClassTag](zeroValue:U)(seqOp: (U,T) =>U, combOp: (U,U) =>U):U  aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

    9、def fold(zeroValue:T)(op: (T,T) =>T):T  折叠操作,aggregate的简化操作,seqop和combop一样。

    10、def saveAsTextFile(path:String):Unit   将RDD以文本文件的方式保存到本地或者HDFS中

    11、def saveAsObjectFile(path: String):Unit  将RDD中的元素以序列化后对象形式保存到本地或者HDFS中。

    12、def countByKey(): Map[K, Long]  针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

    13、def foreach(f: T =>Unit):Unit  在数据集的每一个元素上,运行函数func进行更新。

    注意当你在RDD中使用到了class的方法或者属性的时候class需要继承java.io.Serializable接口或者可以将属性赋值为本地变量来防止整个对象的传输


    四、Spark应用提交与调试

    1、进入到spark安装目录的bin,调用Spark-submit脚本

    2、在脚本后面传入参数(部分)

    --class:应用的启动类

    --master:集群master的URL(应用运行的模式)

    --deploy-mode:是否发布你的驱动到worker节点(cluster)或者作为一个本地客户端(client)(default:client)(提示:client:在master上面新建一个jvm(driver运行其中负责调度回收结果等功能),在worker中申请executor,是jvm独有的;cluster:在其中一个worker上申请excutor和jvm,在其他的worker申请的executor)

    --conf:任意的spark配置属性,格式key=vlaue,如果值包含空格,可以加引号“key=value”

    application-jar:打包好的应用jar,包含依赖,这个URL在集群中全局可见。比如hdfs://共享存储系统,如果是file://path,那么所有节点的path都包含同样的jar

    application-arguments:传给main()方法的参数

    应用运行的模式

    五、Spark调试

    1、本地调试

    【以单节点的方式来运行整个Spark应用】

        a、写好你的程序

        b、将master设置为local或者local[n]

        c、如果你用到了HDFS,可能会遇到winuntils找不到的问题,需要将HADOOP_HOME环境变量加入到IDEA中。

        d、打断点

        e、Debug模式运行你的程序

    2、远程调试

    【把IDEA当做你的Driver来运行,保持和整个Spark集群的连接关系】

        前提:你的本机和Spark集群是在同一网段。

        a、写好你的程序

        b、修改Master地址为你的Spark集群的地址

        c、将最终需要运行的jar包加入到setJars方法中

        d、设置你的本地地址到spark.driver.host 这个变量里面

        e、打断点

        f、Debug模式运行你的程序

    相关文章

      网友评论

        本文标题:Spark之RDD基础学习

        本文链接:https://www.haomeiwen.com/subject/rxiazqtx.html