美文网首页
二、spark之RDD

二、spark之RDD

作者: 阿亮私语 | 来源:发表于2019-05-24 09:40 被阅读0次
image.png

引导

我们在前一篇已经学习了spark的相关概念,并写了一个简单的demo,那么我们本篇开始深入的学习spark其中的最核心的一个概念RDD

2.1、什么是RDD?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,一个可并行操作的有容错机制的数据集合,是Spark中最基本的数据抽象,我们就把它当成简单的数据集合。

2.2、怎么使用RDD?

有两种方式创建RDD,

  • 1、第二种是初始化一个集合,这种叫做并行集合。
  • 2、第一种是读取外部数据集,如:共享的文件系统,HDFS,HBase,或者其他的Hadoop数据格式的数据源

两种创建方式都需要一样sc对象,sc同上文为JavaSparkContext
创建方法如下图:

//并行集合
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
//外部文件
JavaRDD<String> lines = sc.textFile("/Users/sunliangliang/Documents/personal/csv/000002.csv");

2.3、RDD操作

spark算子:英文Operator,其实就是操作的意思,我们这里面是指操作运算等等,其实就是指函数化,通过调用函数处理,一个函数可以称之为算子。

RDD的操作分为两种

2.3.1、转化操作

这类称之为Transformation(转换/变换算子),这类操作是延迟计算,即从一个RDD转换成另外一个并不会马上执行,需要等待行动操作的时候才执行。
常见的Transformation算子如下

  • map()
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("WordCountLocal")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> datas = sc.parallelize(Arrays.asList(1,2,3,4));

    /**通过map算子实现平方**/
    JavaRDD<Integer> result = datas.map(new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer x) throws Exception {
            return x*x; }
    });
    System.out.println(StringUtils.join(result.collect(),","));

}

运行结果如下图:

1,4,9,16
  • fliter()
JavaRDD<Integer> filter = datas.filter(new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer x) throws Exception {
        return x>2;
    }
});


我们可以猜想输出为>2的值

3,4
image.png
  • flatMap()
    这是将每个输入元素生成多个输出元素,拍扁的意思,也就是将每个元素按照格式拆分成一行如下图
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("WordCountLocal")
            .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> datas = sc.textFile("/Users/sunliangliang/Documents/personal/spark.txt");


    JavaRDD<String> flatMap = datas.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            return Arrays.asList(s.split("\\s")).iterator();
        }
    });


    JavaRDD<List<String>> map = datas.map(new Function<String, List<String>>() {
        @Override
        public List<String> call(String s) throws Exception {
            return Arrays.asList(s.split("\\s"));
        }
    });

    System.out.println(StringUtils.join(map.collect(),","));
    System.out.println(StringUtils.join(flatMap.collect(),","));

}

输出结果

[hello, world],[a, new, line],[hello],[the, end]

hello,world,a,new,line,hello,the,end

其中spark.txt中的内容如下

hello world
a new line
hello
the end

我们看到将map是按行拆分,而flapMap 拆成了一个个单词,如下图

image.png

其他常见操作如下图

image.png
2.3.2、行动操作

行动操作主要包含,collect(),reduce(),aggregate()等

  • reduce()
    接收一个函数作为参数,这个函数操作两个相同类型的元素,并返回一个同样类型的数据。最常见的就是叠加等。
public static void reduce(){
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
    int sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer x, Integer y) throws Exception {
            return x+y;
        }
    });
    System.out.println(sum);
}

输出结果如下

15
  • aggregate()
    TODO
    求平均值
2.4、不同类型间的转换
JavaRDD<Integer> datas = sc.parallelize(Arrays.asList(1,2,3,4,5));
JavaDoubleRDD result = datas.mapToDouble(new DoubleFunction<Integer>() {
    @Override
    public double call(Integer x) throws Exception {
        return (double) x+x;
    }
});
2.5、持久化

通过以下两种方式持久化一个rdd,然后将其保存在美国节点内存。该缓存是一个容错技术。
也就是缓存,有两种方式

  • cache():只是缓存到默认的缓存级别:只使用内存
  • persist():可以自定义缓存级别
    使用方式如下
rdd.persist(StorageLevel.DISK_ONLY());
rdd.cache();

RDDS特性

相关文章

网友评论

      本文标题:二、spark之RDD

      本文链接:https://www.haomeiwen.com/subject/simwzqtx.html