美文网首页
弹性分布式数据集(RDD)

弹性分布式数据集(RDD)

作者: 竞媒体 | 来源:发表于2020-05-20 18:12 被阅读0次

    RDD 不仅是一组不可变的JVM对象的分布集,可以让你执行高速运算。改数据集是分布式的。基于某种关键字,该数据集被划分成块,同时分发到执行器节点。RDD将跟踪(记入日志)应用于每个块的所有转换,以加快计算速度,并在发生错误和部分数据丢失时提供回退。

    内部运行方式:RDD并行操作,每个转换并行执行,从而大大提高速度。数据集转换通常是惰性的。这就意味着任何转换仅在调用数据集上的操作时才执行。

    一个示例数据集:

    1.统计出某一列中不同值出现的次数

    2.选出以字母 t 开头的

    3.将结果打印到屏幕上

    import findspark

    findspark.init()

    from pyspark import SparkContext,SparkConf

    conf = SparkConf().setAppName("wordcount")

    sc =SparkContext(conf=conf)

    text_example = " Hooray! It's snowing! It's time to make a snowman.James runs out. He makes a big pile of sn

    ow. He puts a big snowball on top. He adds a scarf and a hat. He adds an orange for the nose. He adds coal f

    or the eyes and buttons.In the evening, James opens the door. What does he see? The snowman is moving! James

    invites him in. The snowman has never been inside a house. He says hello to the cat. He plays with paper to

    wels.A moment later, the snowman takes James's hand and goes out.They go up, up, up into the air! They are f

    lying! What a wonderful night!The next morning, James jumps out of bed. He runs to the door.He wants to than

    k the snowman. But he's gone."

    wordCount= sc.parallelize(text_example.split(" ")).map(lambda word:(word,1)).filter(lambda val: val[0].startswith('t')).reduceByKey(lambda a, b : a + b

    )

    print(wordCount.collect())

    输出:[('time', 1), ('to', 4), ('top.', 1), ('the', 9), ('towels.A', 1), ('takes', 1), ('thank', 1)]

    创建RDD:

    使用.parallelize(...) 集合(元素list 或 array)

    data = sc.parallelize([('time', 1), ('to', 4), ('top.', 1), ('the', 9), ('towels.A', 1), ('takes', 1), ('thank', 1)])

    引用位于本地或外部的某个文件(或者多个文件)

    text_file = sc.textFile("/root/workdir/charlotte.txt")

    wordCount= text_file.flatMap(lambda line: line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a, b : a + b)

    Schema

    RDD是无schema的数据结构,可以使用任何类型的数据结构:tuple、dict、list。

    data_heterogenous = sc.parallelize([('Ferrari','fast'),{'Porsche':1000000},['Spain','visited',4504]]).collect()

    可以访问对象中的数据:data_heterogeous[1]['Porsche']

    .collect()方法把RDD的所有元素返回给驱动程序,驱动程序将其序列化成了一个列表。

    转换

    转换可以调整数据集。包括映射、筛选、连接、转换数据集中的值。

    .map(...)转换

    该方法应用在每个RDD元素上

    .filter(...)转换

    该方法可以让你从数据集中选择元素,该元素符合特定的标准。

    .flatMap(...)转换

    .flatMap(...)返回一个扁平的结果,而不是一个列表。

    .distinct(...)转换

    该方法返回指定列中不同值的列表。

    .sample(...)转换

    该方法返回数据集中的随机样本。

    .leftOuterJoin(...)转换

    根据两个数据集中都有的值来连接两个RDD,并返回左侧的RDD记录,而右边的记录附加在两个RDD匹配的地方。

    .repartition(...)转换

    重新对数据集进行分区,改变了数据集分区的数量。

    操作

    和转换不同,操作执行数据集上的计划任务。一旦完成数据转换,则可以执行相应转换。

    .take(...)方法

    返回单个数据分区的前n行

    .takeSample(...)

    返回随机记录

    .collect(...)

    返回所有RDD的元素给驱动程序。

    .reduce(...)方法

    使用指定的方法减少RDD中的元素。

    .reduceByKey(...)方法

    和.reduce(...)类似,但只在键-键基础上进行。

    .count(...)方法

    统计RDD里的元素数量

    .saveAsTextFile(...)方法

    对RDD执行.saveAsTextFile(...)可以让RDD保存为文本文件:每个文件一个分区。

    .foreach(...)方法

    对RDD里的每个元素,用迭代的方法应用相同的函数。

    总结:RDD是无schema的数据结构,是Spark的核心。Spark中的转换是惰性的,它们只在操作被调用是执行。

    相关文章

      网友评论

          本文标题:弹性分布式数据集(RDD)

          本文链接:https://www.haomeiwen.com/subject/vsenohtx.html