1、def:
RDD,弹性分布式数据集(Resilient Distributed Datasets),表示一个只读、分区且不变的数据集合,是spark应用中最核心的部分。spark处理数据时会将一整块数据分割成由多个分块数据组成的数据集(RDD),然后找到多于数据集分块个数的执行器进行数据处理,最终将计算的结果进行汇总。
主要优势 :RDD中的批量错做会根据数据存放的位置来调度任务;对于扫描类型的操作,如果内存不足以缓存整个RDD,就进行部分缓存,避免内存溢出。
2、创建
1)通过已存在的并行集合创建(调用SparkContext的parallelize方法将一个已存在的集合变成RDD):
//初始化
private static JavaSparkContext sc;
//初始化本地配置
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDDDemo");
//初始化sparkContext对象
sc = new JavaSparkContext(conf);
sc.setLogLevel("ERROR");
// 原始数据转换成RDD
List<Integer> list1 = Arrays.asList(5, 4, 3, 2, 1, 5);
JavaRDD<Integer> rdd1 = sc.parallelize(list1);
List<Integer> list2 = Arrays.asList(3,4,5,6,7,8);
JavaRDD<Integer> rdd2 = sc.parallelize(list2);
System.out.println("rdd1原始数据:" + rdd1.collect());
System.out.println("rdd2原始数据:" + rdd2.collect());
运行结果:
rdd1原始数据:[5, 4, 3, 2, 1, 5]
rdd2原始数据:[3, 4, 5, 6, 7, 8]
parallelize方法还有一个参数是分区(Partitions)数量,它可以用来指定数据集的分区个数,例如上述代码中sc.parallelize(list)可以写成sc.parallelize(list,10),其中10就是数据集分区的个数。集群中的每一个分区对应一个spark任务,每一个cpu计算2-4个分区时较好,若不设置spark就会根据集群的 情况来自动设置分区数量,一般默认与cpu核心数相同。
2)从外部数据集(Dataset)创建
spark可以从本地文件系统、文本文件、sequenceFiles、HDFS、Cassandra、HBase、Amazon S3以及Hadoop所支持的任何存储源中创建RDD。通过SparkContext的textFile方法将数据源文件转换成RDD,此方法的参数为文件地址。转换后的数据将会以行集合的方式进行存储,例如:
JavaRDD<String> rdd = sc.textFile("wordCount");
System.out.println("原始数据:" + rdd.collect());
运行结果:
原始数据:[here, where, my, your, hello, world, test, file, jump, you, can, you, jump]
注:当使用本地文件系统进行读取操作转换时,必须保证所有工作节点在相同路径下能够访问该文件,可以将文件复制到所有工作节点的相同目录下,或者使用共享文件系统。
3、操作
-
转换(transformations):在一个已存在的RDD上创建一个新的RDD,但实际的计算并没有执行,仅仅记录操作规程,所有的计算都发生在actions环节。
-
map转换
依次取出RDD中的每一个元素,传给表达式进行转换,返回转换后的结果。
/** * 对每个元素进行操作(+10),返回一个新的RDD */ public static void map(JavaRDD<Integer> rdd) { System.out.println("RDD每个元素加10:" + rdd.map(v -> v + 10).collect()); }System.out.println("RDD每个元素乘10:" + rdd.map(v -> v + 10).collect());
运行结果:
RDD每个元素加10:[15, 14, 13, 12, 11, 15]
-
-
filter转换
返回符合指定过滤条件的元素的列表
/** * 最每个元素进行筛选,返回符合条件的元素组成的一个新RDD */ public static void filter(JavaRDD<Integer> rdd) { System.out.println("RDD去掉不为5的元素:" + rdd.filter(v -> v != 5).collect()); }
运行结果:
RDD去掉不为5的元素:[4, 3, 2, 1]
-
union转换
求给定RDD的并集∪
/** * union */ public static void union(JavaRDD<Integer> rdd1,JavaRDD<Integer> rdd2){ System.out.println("rdd1并rdd2:"+rdd1.union(rdd2).collect()); }
运行结果
rdd1并rdd2:[5, 4, 3, 2, 1, 5, 3, 4, 5, 6, 7, 8]
-
intersection转换
求交集
/** * 交 */ public static void intersection(JavaRDD<Integer> rdd1,JavaRDD<Integer> rdd2){ System.out.println("rdd1交rdd2:"+rdd1.intersection(rdd2).collect()); }
运行结果
rdd1交rdd2:[4, 3, 5]
-
distinct转换
去掉重复值
/** * 去重操作 */ public static void distinct(JavaRDD<Integer> rdd) { System.out.println("RDD去重操作:" + rdd.distinct().collect()); }
运行结果
RDD去重操作:[4, 1, 3, 5, 2]
- 动作(actions):执行记录的所有transformations操作并计算结果,结果可返回到driver程序,也可保存到相关存储系统中。
-
reduce(f)动作
对所给的RDD进行聚合
/** * 并行整合RDD中所有数据 */ public static void reduce(JavaRDD<Integer> rdd) { System.out.println("整合RDD中所有数据(sum):" + rdd.reduce((v1, v2) -> v1 + v2)); }
运行结果:
整合RDD中所有数据(sum):20
-
collect()动作
返回一个包含RDD所有元素的list
-
take(num)动作
返回给定RDD的前n个元素的list
/** * 取出rdd返回num个元素 返回一个list */ public static void take(JavaRDD<Integer> rdd) { System.out.println("取出rdd返回2个元素:" + rdd.take(2)); }
运行结果:
取出rdd返回2个元素:[5, 4]
-
first()动作
返回RDD中第一个元素的值
public static void getFirst(JavaRDD<Integer> rdd){ System.out.println("第一个元素:"+rdd.first()); }
运行结果
第一个元素:5
-
top(num)动作
取出RDD中前num个最大值
public static void getTop(JavaRDD<Integer> rdd){ System.out.println("前三个"+rdd.top(3)); }
运行结果
前三个[5, 5, 4]
-
foreach()动作
遍历RDD
public static void foreach(JavaRDD<Integer> rdd) { System.out.print("foreach:"); rdd.foreach(t -> System.out.print(t+" ")); }
运行结果:
foreach:5 4 3 2 1 5
-
count()动作
对RDD中所有元素进行计数
public static void count(JavaRDD<Integer> rdd) { System.out.println("统计RDD的所有元素:" + rdd.count()); }
运行结果:
统计RDD的所有元素:6
-
countByValue()动作
按值计数
/** * 每个元素出现的次数 */ public static void countByValue(JavaRDD<Integer> rdd) { System.out.println("每个元素出现的次数:" + rdd.countByValue()); }
运行结果
每个元素出现的次数:{5=2, 1=1, 2=1, 3=1, 4=1}
PS :lambda表达式不支持解决方法:
case1:maven配置文件setting.xml中JDK改为8以上:
<profile>
<id>jdk-1.8</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
<repositories>
<repository>
<id>jdk18</id>
<name>Repository for JDK 1.8 builds</name>
<url>http://www.myhost.com/maven/jdk18</url>
<layout>default</layout>
<snapshotPolicy>always</snapshotPolicy>
</repository>
</repositories>
</profile>
case2:
file --> Project Structure -->modules
将language level改为8以上
apply-->ok
感谢:
https://www.cnblogs.com/diaozhaojian/p/9152530.html
https://www.jianshu.com/p/d573573dd97f
网友评论