1. RDD(弹性分布式数据集):
1.1 构造方式:
- 从定义的集合获取
- 外部获取的数据集合
1.2 操作算子
transformations:转化操作,返回另一个RDD
转化操作 | 说明 |
---|---|
map(Function<T,U>) | 将每个RDD[T]以函数内方法操作转为RDD[U]返回 |
filter(Function<T,Boolean>) | 将每个RDD[T]以函数内方法操作返回结果为true的数据 |
flatMap(FlatMapFunction[T, U]) | 将每个RDD[T]以函数内方法操作得到多个RDD[U]的迭代器返回集合 |
RDD.distinct() | 对RDD进行去重。开销较大(需要全部数据通过网络混洗(shuffle)) |
RDD1.union(RDD2) | 两个RDD[T]进行求和 |
RDD1intersection(RDD2) | 两个RDD[T]求交集。会去掉重复数据,开销较大 |
RDD1.subtract(RDD2) | 返回只存在RDD1不存在RDD2中的数据,开销较大 |
RDD1.cartesian(RDD2) | 返回所有可能的(a,b),a是RDD1中元素,b是RDD2中元素,开销巨大 |
actions:真正执行,返回其他数据集
行动操作 | 说明 |
---|---|
reduce(Function<T, T, T>) | 将两个RDD[T]处理并返回一个RDD[T],最终将RDD集全部合并为一个T |
fold(T)(Function<T, T, T>) | 提供初始值的reduce() |
aggregate(U)(Function<U, T, U>)(Function<U, U, U>) | 可以将RDD[T]转化为其他类型[U]数据。第一个参数为初始值;第二个参数为如何将RDD[T]与U计算并返回新的U的函数;第三个参数为整个多个U为一个U的函数(多节点计算结果合并) |
collect() | 将结果收集到一台机器内存中用于比较测试,内容不宜过大 |
take(n) | 返回n个元素,尽量少的访问分区,数据不均衡 |
top(int,Comparator<T>) | 返回前几个元素,可自定义比较函数 |
takeSample() | 随机采样 |
foreach(VoidFunction[T]) | 对RDD[T]每个元素进行操作,而不把RDD发回本地 |
count() | RDD中元素个数 |
countByValue() | 各元素出现次数 |
中间结果持久化
当一个rdd需要调用多个行动操作时,方式每次都重新计算rdd,可以将其先持久化(缓存)起来。
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.unpersist()
累加器
广播变量
2. SQL数据集
DataSet:带有强数据结构的数据集。
可以用于sql查询转化
DataFrame:带有弱数据结构ROW的DataSet。
type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
自定义函数:
UDF
UDAF
3.构造数据集进行简单demo
- 自己构造测试数据集并将RDD转为Dataset完整wordCount Demo
SparkSession spark = SparkSession
.builder()
.appName("test").master("local")
.getOrCreate();
//用于rdd环境
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
//用于dataset环境
SQLContext sqlContext = new SQLContext(jsc);
//构造测试数据并注册进SparkContext
ArrayList<String> arr = new ArrayList<>();
arr.add("i love china");
arr.add("i love usa");
JavaRDD<String> lines = jsc.parallelize(arr);
//将所有行展平成一行,并在展平的过程中按照空格对每一行进行分词
JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
//Map操作:每个单词记一次数,即将word变为(word, 1)形式
JavaPairRDD<String, Integer> wordOne = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<String, Integer>(word, 1));
//Reduce操作:使用reduceByKey函数将相同的key的value相加
JavaPairRDD<String, Integer> count = wordOne.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
//以上算子都是Transformation,不会触发计算;使用Action算子:collect开始计算
List<Tuple2<String, Integer>> result = count.collect();
//将结果输出到屏幕上
for (Tuple2<String, Integer> t : result) {
System.out.println(t._1 + "\t" + t._2);
}
//构造RDD<ROW>转Dataset需要的schema
List<StructField> fields = new ArrayList<>();
StructField field1 = DataTypes.createStructField("word", DataTypes.StringType, true);
StructField field2 = DataTypes.createStructField("count", DataTypes.IntegerType, true);
fields.add(field1);
fields.add(field2);
StructType schema = DataTypes.createStructType(fields);
//RDD转为ROW类型
JavaRDD<Row> rowRDD = count.map(new Function<Tuple2<String, Integer>, Row>() {
@Override
public Row call(Tuple2<String, Integer> tuple) throws Exception {
return RowFactory.create(tuple._1, tuple._2);
}
});
//RDD<ROW>转Dataset
Dataset<Row> res = sqlContext.createDataFrame(rowRDD,schema);
//注册并查询
res.createOrReplaceTempView("res");
Dataset<Row> results = sqlContext.sql("SELECT word,count FROM res where count>1");
results.show();
//停止context对象
jsc.stop();
相比之下Scala版本编码简单很多,但可阅读性不如java:
val sc = SparkSession.builder().appName("test").master("local").getOrCreate()
val lines = sc.sparkContext.parallelize(Array("i love china","i love usa"))
//进行分词操作
//:RDD[String]
val words = lines.flatMap(_.split(" "))
//Map操作:每个单词记一次数
//:RDD[(String,Int)]
val wordPair = words.map((_,1))
//key相同单词计数合并
//:RDD[(String,Int)]
val wordCount = wordPair.reduceByKey(_+_)
//RDD转为Row类型
val rowRdd:RDD[Row] = wordCount.map(wc => Row(wc._1, wc._2))
val schema = StructType(Array(StructField("word", StringType, nullable = true),StructField("count", IntegerType, nullable = true)))
//RDD[Row]转为DataFrame
val frame:DataFrame = sc.createDataFrame(rowRdd, schema)
frame.createOrReplaceTempView("res")
val results = sc.sql("select word,count from res where count > 1");
results.show();
sc.stop()
- SparkSession通过驱动直接读取
SparkSession spark = SparkSession.builder()
.appName("test").master("local").getOrCreate();
Dataset<Row> df = spark.read().format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("user","root").option("password","123456")
.jdbc("jdbc:mysql://192.168.1.1:3306/caster?characterEncoding=utf-8&serverTimezone=UTC","name_age",new Properties());
df.printSchema();
df.show();
4.SparkStreaming流处理
//TODO
网友评论