美文网首页
spark学习笔记-RDD(win+java)

spark学习笔记-RDD(win+java)

作者: Legents | 来源:发表于2020-03-09 00:16 被阅读0次

1、def:

​ RDD,弹性分布式数据集(Resilient Distributed Datasets),表示一个只读、分区且不变的数据集合,是spark应用中最核心的部分。spark处理数据时会将一整块数据分割成由多个分块数据组成的数据集(RDD),然后找到多于数据集分块个数的执行器进行数据处理,最终将计算的结果进行汇总。

主要优势 :RDD中的批量错做会根据数据存放的位置来调度任务;对于扫描类型的操作,如果内存不足以缓存整个RDD,就进行部分缓存,避免内存溢出。

2、创建

1)通过已存在的并行集合创建(调用SparkContext的parallelize方法将一个已存在的集合变成RDD):

        //初始化
        private static JavaSparkContext sc;
        //初始化本地配置
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDDDemo");
        //初始化sparkContext对象
        sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        // 原始数据转换成RDD
        List<Integer> list1 = Arrays.asList(5, 4, 3, 2, 1, 5);
        JavaRDD<Integer> rdd1 = sc.parallelize(list1);
        List<Integer> list2 = Arrays.asList(3,4,5,6,7,8);
        JavaRDD<Integer> rdd2 = sc.parallelize(list2);
        System.out.println("rdd1原始数据:" + rdd1.collect());
        System.out.println("rdd2原始数据:" + rdd2.collect());

运行结果:

rdd1原始数据:[5, 4, 3, 2, 1, 5]
rdd2原始数据:[3, 4, 5, 6, 7, 8]

​ parallelize方法还有一个参数是分区(Partitions)数量,它可以用来指定数据集的分区个数,例如上述代码中sc.parallelize(list)可以写成sc.parallelize(list,10),其中10就是数据集分区的个数。集群中的每一个分区对应一个spark任务,每一个cpu计算2-4个分区时较好,若不设置spark就会根据集群的 情况来自动设置分区数量,一般默认与cpu核心数相同。

2)从外部数据集(Dataset)创建

​ spark可以从本地文件系统、文本文件、sequenceFiles、HDFS、Cassandra、HBase、Amazon S3以及Hadoop所支持的任何存储源中创建RDD。通过SparkContext的textFile方法将数据源文件转换成RDD,此方法的参数为文件地址。转换后的数据将会以行集合的方式进行存储,例如:

 JavaRDD<String> rdd = sc.textFile("wordCount");
 System.out.println("原始数据:" + rdd.collect());

运行结果:

原始数据:[here, where, my, your, hello, world, test, file, jump, you, can, you, jump]

注:当使用本地文件系统进行读取操作转换时,必须保证所有工作节点在相同路径下能够访问该文件,可以将文件复制到所有工作节点的相同目录下,或者使用共享文件系统。

3、操作

  1. 转换(transformations):在一个已存在的RDD上创建一个新的RDD,但实际的计算并没有执行,仅仅记录操作规程,所有的计算都发生在actions环节。

    • map转换

      依次取出RDD中的每一个元素,传给表达式进行转换,返回转换后的结果。

         /**
           * 对每个元素进行操作(+10),返回一个新的RDD
           */
          public static void map(JavaRDD<Integer> rdd) {
              System.out.println("RDD每个元素加10:" + rdd.map(v -> v + 10).collect());
          }System.out.println("RDD每个元素乘10:" + rdd.map(v -> v + 10).collect());
      

      运行结果:

      RDD每个元素加10:[15, 14, 13, 12, 11, 15]
      
  • filter转换

    返回符合指定过滤条件的元素的列表

       /**
         * 最每个元素进行筛选,返回符合条件的元素组成的一个新RDD
         */
        public static void filter(JavaRDD<Integer> rdd) {
            System.out.println("RDD去掉不为5的元素:" + rdd.filter(v -> v != 5).collect());
        }
    

    运行结果:

    RDD去掉不为5的元素:[4, 3, 2, 1]
    
  • union转换

    求给定RDD的并集∪

       /**
         * union
         */
        public static void union(JavaRDD<Integer> rdd1,JavaRDD<Integer> rdd2){
            System.out.println("rdd1并rdd2:"+rdd1.union(rdd2).collect());
        }
    

    运行结果

    rdd1并rdd2:[5, 4, 3, 2, 1, 5, 3, 4, 5, 6, 7, 8]
    
  • intersection转换

    求交集

       /**
         * 交
         */
        public static void intersection(JavaRDD<Integer> rdd1,JavaRDD<Integer> rdd2){
            System.out.println("rdd1交rdd2:"+rdd1.intersection(rdd2).collect());
        }
    

    运行结果

    rdd1交rdd2:[4, 3, 5]
    
  • distinct转换

    去掉重复值

       /**
         * 去重操作
         */
        public static void distinct(JavaRDD<Integer> rdd) {
            System.out.println("RDD去重操作:" + rdd.distinct().collect());
        }
    

    运行结果

    RDD去重操作:[4, 1, 3, 5, 2]
    
  1. 动作(actions):执行记录的所有transformations操作并计算结果,结果可返回到driver程序,也可保存到相关存储系统中。
  • reduce(f)动作

    对所给的RDD进行聚合

     /**
         * 并行整合RDD中所有数据
         */
        public static void reduce(JavaRDD<Integer> rdd) {
            System.out.println("整合RDD中所有数据(sum):" + rdd.reduce((v1, v2) -> v1 + v2));
        }
    

    运行结果:

    整合RDD中所有数据(sum):20
    
  • collect()动作

    返回一个包含RDD所有元素的list

  • take(num)动作

    返回给定RDD的前n个元素的list

     /**
         * 取出rdd返回num个元素 返回一个list
         */
        public static void take(JavaRDD<Integer> rdd) {
            System.out.println("取出rdd返回2个元素:" + rdd.take(2));
        }
    

    运行结果:

    取出rdd返回2个元素:[5, 4]
    
  • first()动作

    返回RDD中第一个元素的值

     public static void getFirst(JavaRDD<Integer> rdd){
            System.out.println("第一个元素:"+rdd.first());
        }
    

    运行结果

    第一个元素:5
    
  • top(num)动作

    取出RDD中前num个最大值

    public static void getTop(JavaRDD<Integer> rdd){
            System.out.println("前三个"+rdd.top(3));
        }
    

    运行结果

    前三个[5, 5, 4]
    
  • foreach()动作

    遍历RDD

    public static void foreach(JavaRDD<Integer> rdd) {
            System.out.print("foreach:");
            rdd.foreach(t -> System.out.print(t+" "));
        }
    

    运行结果:

    foreach:5 4 3 2 1 5      
    
  • count()动作

    对RDD中所有元素进行计数

    public static void count(JavaRDD<Integer> rdd) {
            System.out.println("统计RDD的所有元素:" + rdd.count());
        }        
    

    运行结果:

    统计RDD的所有元素:6
    
  • countByValue()动作

    按值计数

    /**
         * 每个元素出现的次数
         */
        public static void countByValue(JavaRDD<Integer> rdd) {
            System.out.println("每个元素出现的次数:" + rdd.countByValue());
        }
    

    运行结果

    每个元素出现的次数:{5=2, 1=1, 2=1, 3=1, 4=1}
    

PS :lambda表达式不支持解决方法:

case1:maven配置文件setting.xml中JDK改为8以上:

    <profile>
      <id>jdk-1.8</id>

      <activation>
        <activeByDefault>true</activeByDefault>
        <jdk>1.8</jdk>
      </activation>

       <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    </properties>

      <repositories>
        <repository>
          <id>jdk18</id>
          <name>Repository for JDK 1.8 builds</name>
          <url>http://www.myhost.com/maven/jdk18</url>
          <layout>default</layout>
          <snapshotPolicy>always</snapshotPolicy>
        </repository>
      </repositories>
    </profile>

case2:

file --> Project Structure -->modules

将language level改为8以上

apply-->ok

感谢:

https://www.cnblogs.com/diaozhaojian/p/9152530.html

https://www.jianshu.com/p/d573573dd97f

https://www.cnblogs.com/dhrwawa/p/10981167.html

https://blog.csdn.net/wolf2s/article/details/78958275?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

相关文章

  • spark学习笔记-RDD(win+java)

    1、def: ​ RDD,弹性分布式数据集(Resilient Distributed Dataset...

  • Spark Architecture

    OReilly.Learning.Spark 学习笔记 Spark里所有操作都是对RDD来的。分为两种 1. Tr...

  • Spark RDD学习笔记

    一、学习Spark RDD RDD是Spark中的核心数据模型,一个RDD代表着一个被分区(partition)的...

  • spark任务执行过程

    ​ 在学习了Spark RDD和RDD操作之后,是不是很想快点写个Spark程序来巩固一下所学的知识。学习大数...

  • Spark入门(Python)--1.1 RDD基础

    该系列spark学习笔记基于Python Spark. RDD(弹性分布式数据集)是一个不可变的分布式对象集合,可...

  • Spark RDD 笔记

    本文内容是 是学习 范东来《Spark 课程》 笔记 RDD 不可变, 只读,经过变化会生成新的对象 弹性 表...

  • 【Spark学习笔记】RDD篇

    每个Spark应用都由一个驱动器程序(driver program)(例如Spark Shell本身)来发起集群上...

  • 【Spark学习笔记】详解RDD

    1.Driver program 包含程序的main()方法,RDDs的定义和操作。它管理很多节点,我们称为exe...

  • Spark学习笔记(1)RDD

    RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最...

  • Spark RDD Api使用指南

    ​ 在Spark快速入门-RDD文章中学了spark的RDD。spark包含转换和行动操作。在进行spark程...

网友评论

      本文标题:spark学习笔记-RDD(win+java)

      本文链接:https://www.haomeiwen.com/subject/epasdhtx.html