美文网首页
RDD的几种创建方式

RDD的几种创建方式

作者: ryancao_b9b9 | 来源:发表于2020-05-04 14:36 被阅读0次

    一、RDD概念
    RDD(Resillient Distributed Dataset):弹性分布式数据集,为抽象对象
    RDD可分为多个分区,每个分区分布在集群中的不同节点上(分区即partition),从而让RDD中的数据可以被并行操作
    RDD提供了容错性,可以自动从节点失败中恢复过来。(如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算partition。)
    RDD的数据默认的情况下是存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性的特性)

    二、RDD创建方式
    1、使用程序中的集合创建RDD
    主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程
    2、使用本地文件创建RDD
    主要用于的场景为:在本地临时性地处理一些存储了大量数据的文件
    3、使用HDFS文件创建RDD
    主要可以针对HDFS上存储的大数据,进行离线批处理操作
    4、通过消息源(例如kafka)创建RDD
    主要用于流式处理应用

    三、示例代码
    1、通过parallelize()方法创建
    针对程序中的集合,调用SparkContext中的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的RDD(数据集合)。

    // 并行化创建RDD部分代码 
    // 实现1到5的累加求和
    val arr = Array(1,2,3,4,5)
    val rdd = sc.parallelize(arr)
    val sum = rdd.reduce(_ + _)
    

    注意点:
    在调用parallelize()方法时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2-4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如,parallelize(arr, 10)

    2、通过textFile方法创建
    Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。

    // 实现文件字数统计
    // textFile()方法中,输入本地文件路径或是HDFS路径
    // HDFS:hdfs://spark1:9000/data.txt
    // local:/home/hadoop/data.txt
    val rdd = sc.textFile(“/home/hadoop/data.txt”)
    val wordCount = rdd.map(line => line.length).reduce(_ + _)
    

    一些来创建RDD的特例方法:
    (a)SparkContext的wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回由(fileName,fileContent)组成的pair,即pairRDD,而不是普通的RDD。该方法返回的是文件名字和文件中的具体内容;而普通的textFile()方法返回的RDD中,每个元素就是文本中一行文本。
    (b)SparkContext的sequenceFile<K,V>方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化机制,比如IntWritable、Text等。
    (c)SparkContext的hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。
    (d)SparkContext的objectFile()方法,可以针对之前调用的RDD的saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。

    注意点:
    (a)如果是针对本地文件的话:

    • 如果是在Windows上进行本地测试,windows上有一份文件即可;
    • 如果是在Spark集群上针对Linux本地文件,那么需要将文件拷贝到所有worker节点上(就是在spark-submit上使用—master指定了master节点,使用standlone模式进行运行,而textFile()方法内仍然使用的是Linux本地文件,在这种情况下,是需要将文件拷贝到所有worker节点上的);
      (b)Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建
      (c)Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少

    3、通过DStream对象的foreachRDD创建
    通过DStream对象将流数据按时间窗口进行切分,每个窗口数据为一个Rdd
    部分示例代码:

    kafkaStream.foreachRDD(
            rdd => {
              this.processRdd(rdd)
              val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
              kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
            }
          )
    

    四、参考资料
    1、https://blog.csdn.net/lemonZhaoTao/article/details/77923337

    相关文章

      网友评论

          本文标题:RDD的几种创建方式

          本文链接:https://www.haomeiwen.com/subject/evvlghtx.html