1.spark基础-RDD

作者: 水墨点滴 | 来源:发表于2019-07-18 20:51 被阅读0次

    1.RDD创建

    Spark是以RDD概念为中心运行的。RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合;从外部存储系统中引用一个数据集,这个存储系统可以是一个共享文件系统,比如HDFS、HBase或任意提供了Hadoop输入格式的数据来源。

    (1) RDD的创建—— 并行化集合
    并行化集合是通过在驱动程序中一个现有的迭代器或集合上调用SparkContext的parallelize方法建立的。为了创建一个能够并行操作的分布数据集,集合中的元素都会被拷贝

    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)    #建立了分布数据集,可以进行一些并行的操作
    

    并行化中可以自己设置数据集划分成分片的数量(一般是spark集群自动进行设定的),比如sc.parallelize(data, 10)

    (2)外部数据集
    PySpark可以通过Hadoop支持的外部数据源(包括本地文件系统、HDFS、 Cassandra、HBase、亚马逊S3等等)建立分布数据集。Spark支持文本文件、序列文件以及其他任何Hadoop输入格式文件.

    (1)通过文本文件创建RDD要使用SparkContext的textfile方法

    from pyspark import SparkContext
    
    if __name__ == "__main__":
        sc = SparkContext(appName="zzz_KMeans")
        #调用文件的url/本地文件路径等
        lines = sc.textFile("your_hdfs_path")
    

    注意

    • 包括textFile在内的所有基于文件的Spark读入方法,都支持将文件夹、压缩文件、包含通配符的路径作为参数
    • textFile方法也可以传入第二个可选参数来控制文件的分片数量。默认情况下,Spark会为文件的每一个块(在HDFS中块的大小默认是64MB)创建一个分片。但是你也可以通过传入一个更大的值来要求Spark建立更多的分片。注意,分片的数量绝不能小于文件块的数量。

    (3)其他

    除了文本文件之外,pyspark还支持一些其他的数据格式

    • SparkContext.wholeTextFiles能够读入包含多个小文本文件的目录,然后为每一个文件返回一个(文件名,内容)对。这是与textFile方法为每一个文本行返回一条记录相对应的。
    • RDD.saveAsPickleFile和SparkContext.pickleFile支持将RDD以串行化的Python对象格式存储起来。串行化的过程中会以默认10个一批的数量批量处理。
    • 序列文件和其他Hadoop输入输出格式。

    数据库

    2.RDD基本操作

    RDD的操作,整体上分为两类: 转化操作和启动操作

    转化操作

    • 都是惰性求值的,就是说它们并不会立刻真的计算出结果。相反,它们仅仅是记录下了转换操作的操作对象(比如:一个文件)。只有当一个启动操作被执行,要向驱动程序返回结果时,转化操作才会真的开始计算
    • 每一个由转化操作得到的RDD都会在每次执行启动操作时重新计算生成。也可以调用persist或者cache方法将RDD永久化到内存中。

    启动操作

    常见的转化操作:

    转化操作 作用
    map(func) 返回一个新的分布数据集,由原数据集元素经func处理后的结果组成
    filter(func) 返回一个新的数据集,由传给func返回True的原数据集元素组成
    flatMap(func) 与map类似,但是每个传入元素可能有0或多个返回值,func可以返回一个序列而不是一个值
    mapParitions(func) 类似map,但是RDD的每个分片都会分开独立运行,所以func的参数和返回值必须都是迭代器
    mapParitionsWithIndex(func) 类似mapParitions,但是func有两个参数,第一个是分片的序号,第二个是迭代器。返回值还是迭代器
    sample(withReplacement, fraction, seed) 使用提供的随机数种子取样,然后替换或不替换
    union(otherDataset) 返回新的数据集,包括原数据集和参数数据集的所有元素
    intersection(otherDataset) 返回新数据集,是两个集的交集
    distinct([numTasks]) 返回新的集,包括原集中的不重复元素
    groupByKey([numTasks]) 当用于键值对RDD时返回(键,值迭代器)对的数据集
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 用于键值对RDD时返回(K,U)对集,对每一个Key的value进行聚集计算sortByKey([ascending], [numTasks])用于键值对RDD时会返回RDD按键的顺序排序,升降序由第一个参数决定
    join(otherDataset, [numTasks]) 用于键值对(K, V)和(K, W)RDD时返回(K, (V, W))对RDD
    cogroup(otherDataset, [numTasks]) 用于两个键值对RDD时返回(K, (V迭代器, W迭代器))RDD
    cartesian(otherDataset) 用于T和U类型RDD时返回(T, U)对类型键值对RDD
    pipe(command, [envVars]) 通过shell命令管道处理每个RDD分片
    coalesce(numPartitions) 把RDD的分片数量降低到参数大小
    repartition(numPartitions) 重新打乱RDD中元素顺序并重新分片,数量由参数决定
    repartitionAndSortWithinPartitions(partitioner) 按照参数给定的分片器重新分片,同时每个分片内部按照键排序

    常见的启动操作:

    启动操作 作用
    reduce(func) 使用func进行聚集计算,func的参数是两个,返回值一个,两次func运行应当是完全解耦的,这样才能正确地并行运算
    collect() 向驱动程序返回数据集的元素组成的数组
    count() 返回数据集元素的数量
    first() 返回数据集的第一个元素
    take(n) 返回前n个元素组成的数组
    takeSample(withReplacement, num, [seed]) 返回一个由原数据集中任意num个元素的suzuki,并且替换之
    takeOrder(n, [ordering]) 返回排序后的前n个元素
    saveAsTextFile(path) 将数据集的元素写成文本文件
    saveAsSequenceFile(path) 将数据集的元素写成序列文件,这个API只能用于Java和Scala程序
    saveAsObjectFile(path) 将数据集的元素使用Java的序列化特性写到文件中,这个API只能用于Java和Scala程序
    countByCount() 只能用于键值对RDD,返回一个(K, int) hashmap,返回每个key的出现次数
    foreach(func) 对数据集的每个元素执行func, 通常用于完成一些带有副作用的函数,比如更新累加器(见下文)或与外部存储交互等

    RDD持久化

    主要用的两个方法persistcache

    • Spark的一个重要功能就是在将数据集持久化(或缓存)到内存中以便在多个操作中重复使用。当我们持久化一个RDD是,每一个节点将这个RDD的每一个分片计算并保存到内存中以便在下次对这个数据集(或者这个数据集衍生的数据集)的计算中可以复用。这使得接下来的计算过程速度能够加快(经常能加快超过十倍的速度).

    • 每一个持久化的RDD都有一个可变的存储级别,这个级别使得用户可以改变RDD持久化的储存位置.

    • Spark会自动监视每个节点的缓存使用同时使用LRU算法丢弃旧数据分片。如果你想手动删除某个RDD而不是等待它被自动删除,调用RDD.unpersist()方法。

    共享变量

    • 广播变量
    • 累加器

    3.RDD分区

    有时候需要重新设置Rdd的分区数量:

    • 比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大。
    • 还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。

    有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition()

    spark中的数据是分布式的

    4.数据输出/保存

    rdd.saveAsTextFile()

    【参考资料】

    相关文章

      网友评论

        本文标题:1.spark基础-RDD

        本文链接:https://www.haomeiwen.com/subject/mzvrlctx.html