美文网首页
Spark学习笔记

Spark学习笔记

作者: 锦绣拾年 | 来源:发表于2020-03-29 21:38 被阅读0次

内容来自《Spark快速大数据分析》一本很好的书

分布式和集群

集群:多台机器处理相同工作。

分布式:多台机器处理同一大工作的不同流程。

大数据核心问题

https://www.zhihu.com/question/27974418

大数据具有“4V”特性,这4V即数据量大、类型多、价值密度低、速度快时效高这样四个特点 需要处理四个核心问题。

存储,海量的数据怎样有效的存储?主要包括hdfs、Kafka;

计算,海量的数据怎样快速计算?主要包括MapReduce、Spark、Flink等;

查询,海量数据怎样快速查询?主要为Nosql和Olap,Nosql主要包括Hbase、 Cassandra 等,其中olap包括kylin、impla等,其中Nosql主要解决随机查询,Olap技术主要解决关联查询;

挖掘,海量数据怎样挖掘出隐藏的知识?也就是当前火热的机器学习和深度学习等技术,包括TensorFlow、caffe、mahout等

spark

从上层来看,每个 Spark 应用都由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的 main 函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。在前面的例子里,实际的驱动器程序就是 Spark shell 本身,你只需要输入想要运行的操作就可以了。
驱动器程序通过一个 SparkContext 对象来访问 Spark。这个对象代表对计算集群的一个连接。shell 启动时已经自动创建了一个 SparkContext 对象,是一个叫作 sc 的变量。我们可以通过例 2-3 中的方法尝试输出 sc 来查看它的类型。

一旦有了 SparkContext,你就可以用它来创建 RDD。在例 2-1 和例 2-2 中,我们调用了sc.textFile() 来创建一个代表文件中各行文本的 RDD。我们可以在这些行上进行各种操作,比如 count() 。

spark-RDD

本章介绍 Spark 对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)。RDD 其实就是分布式的元素集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值。

创建出来后,RDD 支持两种类型的操作:转化操作(transformation)和行动操作(action)。

转化操作会由一个 RDD 生成一个新的 RDD。转化操作会由一个 RDD 生成一个新的 RDD。例如,根据谓词匹配情况筛选数据就是一个常见的转化操作。在我们的文本文件示例中,我们可以用筛选来生成一个只存储包含单词 Python 的字符串的新的 RDD。

比如 map() 和 filter()

行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。 first() 就是我们之前调用的一个行动操作,它会返回 RDD 的第一个元素。【向驱动器程序返回结果或
把结果写入外部系统的操作】

比如count() 和 first()

转化操作和行动操作的区别在于 Spark 计算 RDD 的方式不同。虽然你可以在任何时候定义新的 RDD,但 Spark 只会惰性计算这些 RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。

最后,默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。我们可以让 Spark 把数据持久化到许多不同的地方,可用的选项会在表 3-6 中列出。在第一次对持久化的 RDD 计算之后,Spark 会把 RDD 的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了。我们也可以把 RDD 缓存到磁盘上而不是内存中。

总的来说,每个 Spark 程序或 shell 会话都按如下方式工作。
(1) 从外部数据创建出输入 RDD。
(2) 使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。
(3) 告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。
(4) 使用行动操作(例如 count() 和 first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。

创建RDD

Spark 提供了两种创建 RDD 的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。

创建 RDD 最简单的方式就是把程序中一个已有的集合传给 SparkContext 的 parallelize()方法,学习时常用,可以快速生成自己的RDD,但是在开发是用得不多,因为需要把数据集放在机器内存里。

例 3-8:Python 中的 textFile() 方法
lines = sc.textFile("/path/to/README.md")
例 3-9:Scala 中的 textFile() 方法
val lines = sc.textFile("/path/to/README.md")
例 3-10:Java 中的 textFile() 方法
JavaRDD<String> lines = sc.textFile("/path/to/README.md");

RDD操作

如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。

转化操作

许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作 RDD 中的一个元素。不过并不是所有的转化操作都是这样的。
举个例子,假定我们有一个日志文件 log.txt,内含有若干消息,希望选出其中的错误消息。我们可以使用前面说过的转化操作 filter() 。

注意, filter() 操作不会改变已有的 inputRDD 中的数据。实际上,该操作会返回一个全新的 RDD。 inputRDD 在后面的程序中还可以继续使用,比如我们还可以从中搜索别的单词。事实上,要再从 inputRDD 中找出所有包含单词 warning 的行。接下来,我们使用另一个转化操作 union() 来打印出包含 error 或 warning 的行数。

union起合并的作用。

union() 与 filter() 的不同点在于它操作两个 RDD 而不是一个。转化操作可以操作任意数量的输入 RDD。

行动操作

行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的 RDD 的转化操作。

RDD take,获取少量数据。还有一个 collect() 函数,可以用来获取整个 RDD 中的数据。

记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用 collect() ,因此, collect() 不能用在大规模数据集上。

在大多数情况下,RDD 不能通过 collect() 收集到驱动器进程中,因为它们一般都很大。此时,我们通常要把数据写到诸如 HDFS 或 Amazon S3 这样的分布式的存储系统中。你可以使用 saveAsTextFile() 、saveAsSequenceFile() ,或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存起来。

中间结果可以持久化。

惰性求值

RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前 Spark 不会开始计算。

RDD使用

传递函数

Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。

转化操作

map :用于RDD中每个元素。

filter:将RDD中满足条件的元素放入新RDD中。

有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap() 。和 map() 类似,我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器(使用迭代器进行访问得到想要的结果)。

flatMap() 的一个简单用途是把输入的字符串切分为单词。

尽管 RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作。


1.png
行动操作

最常见的行动操作 reduce() 。它接收一个函数作为参数,这个函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就
是函数 + ,可以用它来对我们的 RDD 进行累加。

2.png

键值对操作

3.png 4.png
5.png

Pair RDD 也还是 RDD(元素为 Java 或 Scala 中的 Tuple2 对象或 Python 中的元组),因此同样支持 RDD 所支持的函数。

pairRDD转化操作

聚合操作

reduceByKey() 与 reduce() 相当类似;它们都接收一个函数,并使用该函数对值进行合并。
reduceByKey() 会为数据集中的每个键进行并行的归约操作,每个归约操作会将键相同的值合并起来。因为数据集中可能有大量的键,所reduceByKey() 没有被实现为向用户程序返回一个值的行动操作。实际上,它会返回一个由各键和对应键归约出来的结果值组成的新的 RDD。
foldByKey() 则与 fold() 相当类似;它们都使用一个与 RDD 和合并函数中的数据类型相同的零值作为初始值。与 fold() 一样, foldByKey() 操作所使用的合并函数对零值与另一个元素进行合并,结果仍为该元素。

并行度调优

每个 RDD 都有固定数目的分区,分区数决定了在 RDD 上执行操作时的并行度。本章讨论的大多数操作符都能接收第二个参数,这个参数用来指定分组结果或聚合结果的RDD 的分区数。

sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # 自定义并行度

对于这样的情况,Spark 提供了 repartition() 函数。它会把数据通过网络进行混洗,并创建出新的分区集合。切记,对数据进行重新分区是代价相对比较大的操作。Spark 中也有一个优化版的 repartition() ,叫作 coalesce() 。你可以使用 Java 或 Scala 中的 rdd.partitions.size() 以及 Python 中的 rdd.getNumPartitions 查看 RDD 的分区数,并确保调用 coalesce() 时将 RDD 合并到比现在的分区数更少的分区中。

分组&连接&排序

Pair RDD的行动操作

和转化操作一样,所有基础 RDD 支持的传统行动操作也都在 pair RDD 上可用。Pair RDD提供了一些额外的行动操作,可以让我们充分利用数据的键值对特性。

6.png

数据分区

本章要讨论的最后一个 Spark 特性是对数据集在节点间的分区进行控制。Spark 程序可以通过控制RDD 分区方式来减少通信开销。

举例:

举个简单的例子,我们分析这样一个应用,它在内存中保存着一张很大的用户信息表——也就是一个由 (UserID, UserInfo) 对组成的 RDD,其中 UserInfo 包含一个该用户所订阅的主题的列表。该应用会周期性地将这张表与一个小文件进行组合,这个小文件中存着过去五分钟内发生的事件——其实就是一个由 (UserID, LinkInfo) 对组成的表,存放着过去五分钟内某网站各用户的访问情况。例如,我们可能需要对用户访问其未订阅主题的页面
的情况进行统计。我们可以使用 Spark 的 join() 操作来实现这个组合操作,其中需要把UserInfo 和 LinkInfo 的有序对根据 UserID 进行分组。

// 初始化代码;从HDFS商的一个Hadoop SequenceFile中读取用户信息
// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布
// Spark此时无法获知某个特定的UserID对应的记录位于哪个节点上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// 周期性调用函数来处理过去五分钟产生的事件日志
// 假设这是一个包含(UserID, LinkInfo)对的SequenceFile
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

代码练习

collect,显示rdd变量的内容

理解:parallelize,一堆需要处理的数据集和,默认可进行分区处理,也可主动分区处理。

rdd.partitions可以看到被分到几个分区,分区的概念之后细说。

转化操作-》1.对value RDD进行转化。2.对pair RDD进行转化。-》

对一个RDD进行变换,对两个RDD进行变换。

有很多函数,可参考JavaRDD网站

去重运算-》没有改变分区数目-》但是这其中难道不应有一个合并对比操作??或者类似操作。

单元素value-map

对于数值型rdd,很像一个列表。

filter函数,以某种方法过滤出列表出需要的元素。

flatmap函数,可以把一维列表转化为二维列表。[1,2,3]->[[1,1,1],[2,2,2],[3,3,3]]

pipe 对分区中的rdd数值进行操作。

sample函数:抽样生成rdd

sortBy函数 排序并可确定升序和降序。

双元素value-map

双元素-》两个列表

可以求笛卡尔积,交集,并集,补集

可以联结两个列表 zip [1,2,3],[2,2,2]->[(1,2),(2,2),(3,2)]

单元素pair-map

用map可以轻易生成键值对 map ->(x=>(x(0),x))

keyby(_) key的名,以什么生成键

combinebykey 依据key聚合元素。(可考虑最后汇总)不去重。

同一个键的组成一个pair对。

groupbykey 同一个键的键值对组合在一起,一个集合。

key 提取key成为新rdd

mapvalues value进行变换

flatmapvalue 进行flatmap

处理嵌套json

https://blog.csdn.net/qq_21439395/article/details/80710180

http://www.lubinsu.com/index.php/archives/493

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import scala.util.parsing.json.JSON

object jsonex1 {
  def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("JSONApp");
        //通过conf来创建sparkcontext
        val sc = new SparkContext(conf);
 
        val inputFile = "tmp.json"//读取json文件
        val jsonStr = sc.textFile(inputFile);
        val result = jsonStr.map(s => JSON.parseFull(s))
            val ss = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    ss.read.json("tmp.json").createOrReplaceTempView("user")
    val rs = ss.sql("select user.hits from user")//返回dataframe
    rs.printSchema()
    rs.show()
  }
}

UDF使用

可以使用自定义方法,在SQL语句和列。

写的比较好的博客:

https://www.cnblogs.com/cc11001100/p/9463909.html

https://www.jianshu.com/p/b1e9d5cc6193

Scala匿名函数

var inc =(x:Int)=>x+1
def add2 = new Function1[Int,Int]{
    def apply(x:Int):Int = x+1
}

HDFS

读文件 分布式文件存储系统。

MapReduce 框架的核心步骤主要分两部分:Map 和 Reduce。当你向 MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个 Map 任务,然后分配到不同的节点上去执行,每一个 Map 任务处理输入数据中的一部分,当 Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为 Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个 Map 的输出汇总到一起并输出。

下载气象数据集部分数据,写一个 Map-Reduce 作业,求每年的最低温度。

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinTemperature {

    public static void main(String[] args) throws Exception {
        if(args.length != 2) {
            System.err.println("Usage: MinTemperature<input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MinTemperature.class);
        job.setJobName("Min temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MinTemperatureMapper.class);
        job.setReducerClass(MinTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String year = line.substring(15, 19);

        int airTemperature;
        if(line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }

        String quality = line.substring(92, 93);
        if(airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int minValue = Integer.MAX_VALUE;
        for(IntWritable value : values) {
            minValue = Math.min(minValue, value.get());
        }
        context.write(key, new IntWritable(minValue));
    }
}

相关文章

  • spark 学习笔记

    Spark学习笔记 Data Source->Kafka->Spark Streaming->Parquet->S...

  • spark

    *Spark Spark 函数Spark (Python版) 零基础学习笔记(一)—— 快速入门 1.map与fl...

  • 2020-03-17

    spark学习笔记centos安装Oracle VirtualBox: Centos安装Vagrant

  • spark核心编程

    Spark 学习笔记 Spark 架构及组件 client:客户端进程,负责提交job到master Driver...

  • Spark Core 学习笔记

    Spark Core 学习笔记 1、Spark 简介 ​ Spark 是一种用于大规模数据处理的统一计算引擎...

  • Spark Architecture

    OReilly.Learning.Spark 学习笔记 Spark里所有操作都是对RDD来的。分为两种 1. Tr...

  • 《架构师训练营》之大数据应用

    极客时间《架构师训练营》第十三周学习笔记 Spark 架构 Spark 则是 UC Berkeley AMP la...

  • 【Spark学习笔记】初识spark

    1.Spark简介 快速且通用的集群计算平台 1.1.快速性: Spark扩充了流行的mapreduce计算模型 ...

  • Spark学习笔记

    Scala语法 至于scala语法而言,大致上和Java的语法类似,增加了一些函数式编程,具体语法可以参考Scal...

  • Spark 学习笔记

    归档至github 前言 Spark 作为目前最火的技术栈或许 大概 应该 maybe 没有之一了吧,看上去很厉害...

网友评论

      本文标题:Spark学习笔记

      本文链接:https://www.haomeiwen.com/subject/jtlpuhtx.html