美文网首页
spark入门

spark入门

作者: caster | 来源:发表于2021-05-20 17:58 被阅读0次

1. RDD(弹性分布式数据集):

1.1 构造方式:

  • 从定义的集合获取
  • 外部获取的数据集合

1.2 操作算子

transformations:转化操作,返回另一个RDD

转化操作 说明
map(Function<T,U>) 将每个RDD[T]以函数内方法操作转为RDD[U]返回
filter(Function<T,Boolean>) 将每个RDD[T]以函数内方法操作返回结果为true的数据
flatMap(FlatMapFunction[T, U]) 将每个RDD[T]以函数内方法操作得到多个RDD[U]的迭代器返回集合
RDD.distinct() 对RDD进行去重。开销较大(需要全部数据通过网络混洗(shuffle))
RDD1.union(RDD2) 两个RDD[T]进行求和
RDD1intersection(RDD2) 两个RDD[T]求交集。会去掉重复数据,开销较大
RDD1.subtract(RDD2) 返回只存在RDD1不存在RDD2中的数据,开销较大
RDD1.cartesian(RDD2) 返回所有可能的(a,b),a是RDD1中元素,b是RDD2中元素,开销巨大

actions:真正执行,返回其他数据集

行动操作 说明
reduce(Function<T, T, T>) 将两个RDD[T]处理并返回一个RDD[T],最终将RDD集全部合并为一个T
fold(T)(Function<T, T, T>) 提供初始值的reduce()
aggregate(U)(Function<U, T, U>)(Function<U, U, U>) 可以将RDD[T]转化为其他类型[U]数据。第一个参数为初始值;第二个参数为如何将RDD[T]与U计算并返回新的U的函数;第三个参数为整个多个U为一个U的函数(多节点计算结果合并)
collect() 将结果收集到一台机器内存中用于比较测试,内容不宜过大
take(n) 返回n个元素,尽量少的访问分区,数据不均衡
top(int,Comparator<T>) 返回前几个元素,可自定义比较函数
takeSample() 随机采样
foreach(VoidFunction[T]) 对RDD[T]每个元素进行操作,而不把RDD发回本地
count() RDD中元素个数
countByValue() 各元素出现次数

中间结果持久化
当一个rdd需要调用多个行动操作时,方式每次都重新计算rdd,可以将其先持久化(缓存)起来。

rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.unpersist()

累加器
广播变量

2. SQL数据集

DataSet:带有强数据结构的数据集。
可以用于sql查询转化
DataFrame:带有弱数据结构ROW的DataSet。

  type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

自定义函数:
UDF
UDAF

3.构造数据集进行简单demo

  1. 自己构造测试数据集并将RDD转为Dataset完整wordCount Demo
SparkSession spark = SparkSession
        .builder()
        .appName("test").master("local")
        .getOrCreate();
//用于rdd环境
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
//用于dataset环境
SQLContext sqlContext = new SQLContext(jsc);

//构造测试数据并注册进SparkContext
ArrayList<String> arr = new ArrayList<>();
arr.add("i love china");
arr.add("i love usa");
JavaRDD<String> lines = jsc.parallelize(arr);

//将所有行展平成一行,并在展平的过程中按照空格对每一行进行分词
JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
//Map操作:每个单词记一次数,即将word变为(word, 1)形式
JavaPairRDD<String, Integer> wordOne = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<String, Integer>(word, 1));
//Reduce操作:使用reduceByKey函数将相同的key的value相加
JavaPairRDD<String, Integer> count = wordOne.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);

//以上算子都是Transformation,不会触发计算;使用Action算子:collect开始计算
List<Tuple2<String, Integer>> result = count.collect();
//将结果输出到屏幕上
for (Tuple2<String, Integer> t : result) {
    System.out.println(t._1 + "\t" + t._2);
}

//构造RDD<ROW>转Dataset需要的schema
List<StructField> fields = new ArrayList<>();
StructField field1 = DataTypes.createStructField("word", DataTypes.StringType, true);
StructField field2 = DataTypes.createStructField("count", DataTypes.IntegerType, true);
fields.add(field1);
fields.add(field2);
StructType schema = DataTypes.createStructType(fields);

//RDD转为ROW类型
JavaRDD<Row> rowRDD = count.map(new Function<Tuple2<String, Integer>, Row>() {
    @Override
    public Row call(Tuple2<String, Integer> tuple) throws Exception {
        return RowFactory.create(tuple._1, tuple._2);
    }
});

//RDD<ROW>转Dataset
Dataset<Row> res = sqlContext.createDataFrame(rowRDD,schema);
//注册并查询
res.createOrReplaceTempView("res");
Dataset<Row> results = sqlContext.sql("SELECT word,count FROM res where count>1");
results.show();

//停止context对象
jsc.stop();

相比之下Scala版本编码简单很多,但可阅读性不如java:

val sc = SparkSession.builder().appName("test").master("local").getOrCreate()
val lines = sc.sparkContext.parallelize(Array("i love china","i love usa"))

//进行分词操作
//:RDD[String]
val words = lines.flatMap(_.split(" "))

//Map操作:每个单词记一次数
//:RDD[(String,Int)]
val wordPair = words.map((_,1))

//key相同单词计数合并
//:RDD[(String,Int)]
val wordCount = wordPair.reduceByKey(_+_)

//RDD转为Row类型
val rowRdd:RDD[Row] = wordCount.map(wc => Row(wc._1, wc._2))

val schema = StructType(Array(StructField("word", StringType, nullable = true),StructField("count", IntegerType, nullable = true)))
//RDD[Row]转为DataFrame
val frame:DataFrame = sc.createDataFrame(rowRdd, schema)

frame.createOrReplaceTempView("res")
val results = sc.sql("select word,count from res where count > 1");
results.show();
sc.stop()
  1. SparkSession通过驱动直接读取
SparkSession spark = SparkSession.builder()
        .appName("test").master("local").getOrCreate();
Dataset<Row> df = spark.read().format("jdbc")
        .option("driver","com.mysql.cj.jdbc.Driver")
        .option("user","root").option("password","123456")
        .jdbc("jdbc:mysql://192.168.1.1:3306/caster?characterEncoding=utf-8&serverTimezone=UTC","name_age",new Properties());
df.printSchema();
df.show();

4.SparkStreaming流处理

//TODO

相关文章

  • 2020-10-21

    spark 入门 课程目标: 了解spark概念 知道spark的特点(与hadoop对比) 独立实现spark ...

  • 关于python学习文档

    1.《Spark 官方文档》Spark快速入门 英文原文:http://spark.apache.org/docs...

  • Spark RDD Api使用指南

    ​ 在Spark快速入门-RDD文章中学了spark的RDD。spark包含转换和行动操作。在进行spark程...

  • Spark注意事项

    spark快速入门 要让spark在集群模式下运行,需要正确设置 注意master默认要用spark master...

  • spark

    *Spark Spark 函数Spark (Python版) 零基础学习笔记(一)—— 快速入门 1.map与fl...

  • YARN 原理简介

    YARN 组件 参考:Spark on Yarn | Spark,从入门到精通 YARN 采用 Master/Sl...

  • Spark快速入门

    Spark快速入门 本教程提供了如何使用 Spark 的简要介绍。首先通过运行 Spark 交互式的 shell(...

  • 大数据学习教程

    Hadoop生态 Hadoop相关内容 Spark Spark系列教程 Hive Hive快速入门 Elastic...

  • 从零开始学习Spark(二)Scala基础

    Scala基础 Spark的原生语言是Scala,因此入门一下Scala是学习Spark的第一步,下面就快速入门一...

  • Spark技术实战之基础篇

    Spark技术实战之基础篇 -Scala语言从入门到精通为什么要学习Scala?源于Spark的流行,Spark是...

网友评论

      本文标题:spark入门

      本文链接:https://www.haomeiwen.com/subject/niwdrltx.html