1. coalesce and repartition operations
Both are used to redefine the number of partitions. The differences:
coalesce can only reduce the number of partitions, and can optionally avoid a shuffle
repartition can either increase or decrease the number of partitions, and always triggers a shuffle, i.e. a full repartitioning of the data
Practice:
Dataset:
employee.json:
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}
department.json:
{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}
Code:
package com.spark.ds

import org.apache.spark.sql.SparkSession

object TypedOperation {

  case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)
  case class Department(id: Long, name: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("TypedOperation")
      .master("local")
      .config("spark.sql.warehouse.dir", "D:/spark-warehouse")
      .getOrCreate()

    import spark.implicits._

    val employee = spark.read.json("inputData/employee.json")
    println(employee.getClass.getSimpleName)
    val employeeDS = employee.as[Employee]
    println(employeeDS.getClass.getSimpleName)

    val department = spark.read.json("inputData/department.json")
    val departmentDS = department.as[Department]

    // coalesce and repartition
    val employeeDSRepartitioned = employeeDS.repartition(7)
    // check its partition count
    println(employeeDSRepartitioned.rdd.partitions.size)
    val employeeDSCoalesced = employeeDSRepartitioned.coalesce(3)
    // check its partition count
    println(employeeDSCoalesced.rdd.partitions.size)
    employeeDSCoalesced.show()
  }
}
Output:
Dataset
Dataset
7
1
+---+-----+------+------+------+
|age|depId|gender| name|salary|
+---+-----+------+------+------+
| 25| 1| male| Leo| 20000|
| 30| 2|female| Marry| 25000|
| 35| 1| male| Jack| 15000|
| 42| 3| male| Tom| 18000|
| 21| 3|female|Kattie| 21000|
| 30| 2|female| Jen| 28000|
| 19| 2|female| Jen| 8000|
+---+-----+------+------+------+
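The two behaviours can be sketched with plain Scala collections (a conceptual model only, not Spark's actual implementation): repartition redistributes every record into exactly n new partitions, while coalesce only merges existing partitions and so can never end up with more partitions than it started with. (Incidentally, the coalesced count printing as 1 rather than 3 above is likely an optimizer quirk in some Spark 2.x versions, where a rule collapsing adjacent repartition operators folds coalesce(3) over repartition(7) back onto the original single-partition input.)

```scala
// Conceptual model: a partition is just a group of records (not Spark's real code).
type Partition[T] = Vector[T]

// repartition(n): shuffle every record round-robin into exactly n partitions
def repartition[T](parts: Vector[Partition[T]], n: Int): Vector[Partition[T]] = {
  val all = parts.flatten
  Vector.tabulate(n)(i => all.zipWithIndex.collect { case (v, j) if j % n == i => v })
}

// coalesce(n): only merge existing partitions; cannot exceed the current count
def coalesce[T](parts: Vector[Partition[T]], n: Int): Vector[Partition[T]] = {
  val target = math.min(n, parts.size)
  parts.zipWithIndex
    .groupBy { case (_, i) => i % target }
    .toVector.sortBy(_._1)
    .map { case (_, group) => group.flatMap(_._1) }
}

val data = repartition(Vector((1 to 7).toVector), 7)
println(data.size)               // 7 partitions after repartition(7)
println(coalesce(data, 3).size)  // 3 partitions after coalesce(3)
println(coalesce(data, 10).size) // still 7: coalesce cannot increase partitions
```

The last line is the key asymmetry: asking coalesce for more partitions than exist is a no-op, whereas repartition would happily shuffle into 10.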
2. distinct and dropDuplicates operations
Both are used for deduplication. The differences:
distinct compares and deduplicates rows by their full content
dropDuplicates deduplicates by the specified columns only
Practice:
val distinctEmployeeDS = employeeDS.distinct()
distinctEmployeeDS.show()
val dropDuplicatesEmployeeDS = employeeDS.dropDuplicates(Seq("name"))
dropDuplicatesEmployeeDS.show()
Output:
+---+-----+------+------+------+
|age|depId|gender| name|salary|
+---+-----+------+------+------+
| 30| 2|female| Marry| 25000|
| 21| 3|female|Kattie| 21000|
| 42| 3| male| Tom| 18000|
| 35| 1| male| Jack| 15000|
| 30| 2|female| Jen| 28000|
| 19| 2|female| Jen| 8000|
| 25| 1| male| Leo| 20000|
+---+-----+------+------+------+
+---+-----+------+------+------+
|age|depId|gender| name|salary|
+---+-----+------+------+------+
| 35| 1| male| Jack| 15000|
| 42| 3| male| Tom| 18000|
| 30| 2|female| Jen| 28000|
| 30| 2|female| Marry| 25000|
| 21| 3|female|Kattie| 21000|
| 25| 1| male| Leo| 20000|
+---+-----+------+------+------+
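The difference is easy to mimic on a plain Scala collection (a sketch reusing the Employee shape from above, not Spark code): distinct compares whole rows, while a dropDuplicates(Seq("name"))-style dedup keeps one row per name. Note that Spark does not guarantee which row survives per key; the sketch below keeps the first one seen.

```scala
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)

val employees = List(
  Employee("Jen", 30, 2, "female", 28000),
  Employee("Jen", 30, 2, "female", 28000), // exact duplicate row
  Employee("Jen", 19, 2, "female", 8000)   // same name, different row
)

// distinct: whole-row comparison, so only the exact duplicate is removed
val wholeRow = employees.distinct
println(wholeRow.size) // 2

// dropDuplicates(Seq("name")) analogue: keep the first row per name
val byName = employees.foldLeft(List.empty[Employee]) { (acc, e) =>
  if (acc.exists(_.name == e.name)) acc else acc :+ e
}
println(byName.size) // 1
```

This is why the second table above still contains only one "Jen" row while the first (distinct) table keeps both: the two Jen rows differ in age and salary.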
3. except, filter and intersect operations
except: returns the rows that exist in the current Dataset but not in the other Dataset
filter: keeps a row when our predicate returns true, and drops it otherwise
intersect: returns the intersection of the two Datasets
Continuing from the code above:
Input data, employee2.json:
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
Code:
val employee2 = spark.read.json("inputData/employee2.json")
val employeeDS2 = employee2.as[Employee]
employeeDS.except(employeeDS2).show()
employeeDS.filter { employee => employee.age > 30 }.show()
employeeDS.intersect(employeeDS2).show()
Output:
+---+-----+------+------+------+
|age|depId|gender| name|salary|
+---+-----+------+------+------+
| 21| 3|female|Kattie| 21000|
| 42| 3| male| Tom| 18000|
| 30| 2|female| Jen| 28000|
| 19| 2|female| Jen| 8000|
+---+-----+------+------+------+
+---+-----+------+----+------+
|age|depId|gender|name|salary|
+---+-----+------+----+------+
| 35| 1| male|Jack| 15000|
| 42| 3| male| Tom| 18000|
+---+-----+------+----+------+
+---+-----+------+-----+------+
|age|depId|gender| name|salary|
+---+-----+------+-----+------+
| 30| 2|female|Marry| 25000|
| 35| 1| male| Jack| 15000|
| 25| 1| male| Leo| 20000|
+---+-----+------+-----+------+
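On plain Scala collections the same semantics look like this (a sketch, not Spark's implementation): except corresponds to diff, intersect to intersect, and filter is the same predicate-based operation. One caveat: Spark's except and intersect additionally deduplicate their output (set semantics), which List.diff and List.intersect do not.

```scala
val ds  = List("Leo", "Marry", "Jack", "Tom", "Kattie", "Jen")
val ds2 = List("Leo", "Marry", "Jack")

// except analogue: elements in ds but not in ds2
println(ds.diff(ds2))      // List(Tom, Kattie, Jen)

// intersect analogue: elements present in both
println(ds.intersect(ds2)) // List(Leo, Marry, Jack)

// filter: keep elements for which the predicate returns true
println(ds.filter(_.length > 3)) // List(Marry, Jack, Kattie)
```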
4. map, flatMap and mapPartitions operations
map: applies a function to each element, producing exactly one output element per input
flatMap: maps each element to zero or more elements and flattens the results
mapPartitions: applies a function once per partition, receiving an iterator over that partition's elements
Practice:
employeeDS.map { employee => (employee.name, employee.salary + 1000) }.show()

departmentDS.flatMap { department =>
  Seq(
    Department(department.id + 1, department.name + "_1"),
    Department(department.id + 2, department.name + "_2"))
}.show()

employeeDS.mapPartitions { employees =>
  val result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
  while (employees.hasNext) {
    val emp = employees.next()
    result += ((emp.name, emp.salary + 1000))
  }
  result.iterator
}.show()
Output:
+------+-----+
| _1| _2|
+------+-----+
| Leo|21000|
| Marry|26000|
| Jack|16000|
| Tom|19000|
|Kattie|22000|
| Jen|29000|
| Jen| 9000|
+------+-----+
+---+--------------------+
| id| name|
+---+--------------------+
| 2|Technical Departm...|
| 3|Technical Departm...|
| 3|Financial Departm...|
| 4|Financial Departm...|
| 4| HR Department_1|
| 5| HR Department_2|
+---+--------------------+
+------+-----+
| _1| _2|
+------+-----+
| Leo|21000|
| Marry|26000|
| Jack|16000|
| Tom|19000|
|Kattie|22000|
| Jen|29000|
| Jen| 9000|
+------+-----+
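The same three transformations exist on ordinary Scala collections and iterators, which makes the mapPartitions pattern easier to see in isolation (a sketch: each "partition" is just an Iterator, and the function runs once per iterator rather than once per element, so any expensive setup is paid once per partition):

```scala
val salaries = List(("Leo", 20000L), ("Marry", 25000L))

// map: one output element per input element
println(salaries.map { case (name, s) => (name, s + 1000) })
// List((Leo,21000), (Marry,26000))

// flatMap: zero or more outputs per input, flattened into one sequence
println(List(1L, 2L).flatMap(id => Seq(id + 1, id + 2)))
// List(2, 3, 3, 4)

// mapPartitions analogue: the function receives a whole Iterator at once,
// so per-partition setup (a buffer, a DB connection, ...) happens only once
def mapPartition(rows: Iterator[(String, Long)]): Iterator[(String, Long)] = {
  val result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
  while (rows.hasNext) {
    val (name, s) = rows.next()
    result += ((name, s + 1000))
  }
  result.iterator
}
println(mapPartition(salaries.iterator).toList)
// List((Leo,21000), (Marry,26000)) -- same result as map
```

This is why map and mapPartitions produce identical tables above: they compute the same per-row result, and differ only in how often the function is invoked.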