Spark from Beginner to Expert 63: Typed Operations on Datasets

Author: 勇于自信 | Published 2020-07-19 14:32
    1. The coalesce and repartition operations

    Both are used to redefine the number of partitions. The difference:
    coalesce can only reduce the number of partitions, and it can do so without triggering a shuffle
    repartition can either increase or decrease the number of partitions, and it always triggers a shuffle, i.e. a full repartitioning
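    The behavioral difference can be sketched outside Spark with a plain-Python toy model of partitions (an illustration of the semantics only, not Spark's implementation): repartition scatters every element into n new partitions (a full shuffle), while coalesce only glues existing partitions together, so it never redistributes individual elements and cannot increase the partition count.

```python
# Toy model: a "dataset" is a list of partitions (lists of elements).
# Illustration of the semantics only -- not how Spark is implemented.

def repartition(partitions, n):
    """Full shuffle: scatter every element round-robin into n new partitions."""
    new_parts = [[] for _ in range(n)]
    for i, elem in enumerate(e for p in partitions for e in p):
        new_parts[i % n].append(elem)
    return new_parts

def coalesce(partitions, n):
    """No shuffle: only merge whole partitions; cannot grow beyond len(partitions)."""
    n = min(n, len(partitions))
    new_parts = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        new_parts[i % n].extend(part)  # whole partitions are glued together
    return new_parts

data = [[1, 2], [3], [4, 5], [6]]   # 4 partitions
print(len(repartition(data, 7)))    # 7 -> repartition can increase the count
print(len(coalesce(data, 2)))       # 2 -> merged down without moving elements
print(len(coalesce(data, 7)))       # 4 -> coalesce cannot increase the count
```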
    Hands-on example:
    Data set:
    employee.json:

    {"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
    {"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
    {"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
    {"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
    {"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
    {"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
    {"name": "Jen", "age": 19, "depId": 2, "gender": "female", "salary": 8000}
    

    department.json:

    {"id": 1, "name": "Technical Department"}
    {"id": 2, "name": "Financial Department"}
    {"id": 3, "name": "HR Department"}
    

    Code:

    package com.spark.ds
    
    import org.apache.spark.sql.SparkSession
    
    object TypedOperation {
      case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Long)
      case class Department(id: Long, name: String)
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("TypedOperation")
          .master("local")
          .config("spark.sql.warehouse.dir", "D:/spark-warehouse")
          .getOrCreate()
        import spark.implicits._
        val employee = spark.read.json("inputData/employee.json")
        println(employee.getClass.getSimpleName)
        val employeeDS = employee.as[Employee]
        println(employeeDS.getClass.getSimpleName)
        val department = spark.read.json("inputData/department.json")
        val departmentDS = department.as[Department]
     
        // coalesce and repartition
        val employeeDSRepartitioned = employeeDS.repartition(7)
        // check the number of partitions
        println(employeeDSRepartitioned.rdd.partitions.size)
        val employeeDSCoalesced = employeeDSRepartitioned.coalesce(3)
        // check the number of partitions again
        println(employeeDSCoalesced.rdd.partitions.size)
        employeeDSCoalesced.show()
      }
    
    }
    
    

    Output (note that coalesce(3) prints 1 below rather than 3: in some Spark versions the Catalyst optimizer collapses the adjacent repartition and coalesce operators, so the coalesce ends up applied to the original single-partition input):

    Dataset
    Dataset
    7
    1
    
    +---+-----+------+------+------+
    |age|depId|gender|  name|salary|
    +---+-----+------+------+------+
    | 25|    1|  male|   Leo| 20000|
    | 30|    2|female| Marry| 25000|
    | 35|    1|  male|  Jack| 15000|
    | 42|    3|  male|   Tom| 18000|
    | 21|    3|female|Kattie| 21000|
    | 30|    2|female|   Jen| 28000|
    | 19|    2|female|   Jen|  8000|
    +---+-----+------+------+------+
    
    2. The distinct and dropDuplicates operations

    Both are used to remove duplicates. The difference:
    distinct compares and deduplicates on the full content of each row
    dropDuplicates deduplicates on the specified columns only
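    The two deduplication rules can be sketched in plain Python (an illustrative toy model, not Spark code): distinct keys on the whole row, while dropDuplicates keys on the chosen columns only, keeping one representative row per key (the first seen, in this sketch; Spark does not guarantee which row survives).

```python
# Toy rows mirroring employee.json; illustration only, not Spark code.
rows = [
    {"name": "Jen", "age": 30, "salary": 28000},
    {"name": "Jen", "age": 19, "salary": 8000},
    {"name": "Jen", "age": 30, "salary": 28000},  # exact duplicate row
]

def distinct(rows):
    """Deduplicate on the full row content."""
    seen, out = set(), []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

def drop_duplicates(rows, cols):
    """Deduplicate on the given columns only, keeping the first row per key."""
    seen, out = set(), []
    for row in rows:
        key = tuple(row[c] for c in cols)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

print(len(distinct(rows)))                    # 2: only the exact duplicate is dropped
print(len(drop_duplicates(rows, ["name"])))   # 1: every row shares the name "Jen"
```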
    Hands-on example:

    val distinctEmployeeDS = employeeDS.distinct()
    distinctEmployeeDS.show()
    val dropDuplicatesEmployeeDS = employeeDS.dropDuplicates(Seq("name"))
    dropDuplicatesEmployeeDS.show()
    

    Output:

    +---+-----+------+------+------+
    |age|depId|gender|  name|salary|
    +---+-----+------+------+------+
    | 30|    2|female| Marry| 25000|
    | 21|    3|female|Kattie| 21000|
    | 42|    3|  male|   Tom| 18000|
    | 35|    1|  male|  Jack| 15000|
    | 30|    2|female|   Jen| 28000|
    | 19|    2|female|   Jen|  8000|
    | 25|    1|  male|   Leo| 20000|
    +---+-----+------+------+------+
    
    +---+-----+------+------+------+
    |age|depId|gender|  name|salary|
    +---+-----+------+------+------+
    | 35|    1|  male|  Jack| 15000|
    | 42|    3|  male|   Tom| 18000|
    | 30|    2|female|   Jen| 28000|
    | 30|    2|female| Marry| 25000|
    | 21|    3|female|Kattie| 21000|
    | 25|    1|  male|   Leo| 20000|
    +---+-----+------+------+------+
    
    3. The except, filter and intersect operations

    except: returns the elements that exist in the current Dataset but not in the other Dataset
    filter: keeps an element when our predicate returns true, and filters it out otherwise
    intersect: returns the intersection of the two Datasets
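    Treating each Dataset as a collection of rows, the three operations can be sketched in plain Python (illustration only; note that Spark's except and intersect follow SQL DISTINCT semantics and also deduplicate their output, which the sketch mirrors with sets):

```python
# Toy rows as (name, age) tuples; illustration only, not Spark code.
a = [("Leo", 25), ("Tom", 42), ("Jen", 30), ("Jen", 30)]
b = [("Leo", 25), ("Marry", 30)]

def except_(left, right):
    """Rows in left but not in right (SQL EXCEPT DISTINCT semantics)."""
    return sorted(set(left) - set(right))

def intersect(left, right):
    """Rows present in both sides (SQL INTERSECT DISTINCT semantics)."""
    return sorted(set(left) & set(right))

def filter_(rows, pred):
    """Keep the rows for which the predicate returns True."""
    return [r for r in rows if pred(r)]

print(except_(a, b))                    # [('Jen', 30), ('Tom', 42)]
print(intersect(a, b))                  # [('Leo', 25)]
print(filter_(a, lambda r: r[1] > 30))  # [('Tom', 42)]
```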
    Continuing from the code above.
    Input data employee2.json:

    {"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000}
    {"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
    {"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
    

    Code:

    val employeeDS2 = spark.read.json("inputData/employee2.json").as[Employee]
    employeeDS.except(employeeDS2).show()
    employeeDS.filter { employee => employee.age > 30 }.show()
    employeeDS.intersect(employeeDS2).show()
    

    Output:

    +---+-----+------+------+------+
    |age|depId|gender|  name|salary|
    +---+-----+------+------+------+
    | 21|    3|female|Kattie| 21000|
    | 42|    3|  male|   Tom| 18000|
    | 30|    2|female|   Jen| 28000|
    | 19|    2|female|   Jen|  8000|
    +---+-----+------+------+------+
    
    
    +---+-----+------+----+------+
    |age|depId|gender|name|salary|
    +---+-----+------+----+------+
    | 35|    1|  male|Jack| 15000|
    | 42|    3|  male| Tom| 18000|
    +---+-----+------+----+------+
    
    
    +---+-----+------+-----+------+
    |age|depId|gender| name|salary|
    +---+-----+------+-----+------+
    | 30|    2|female|Marry| 25000|
    | 35|    1|  male| Jack| 15000|
    | 25|    1|  male|  Leo| 20000|
    +---+-----+------+-----+------+
    
    4. The map, flatMap and mapPartitions operations

    map: transforms each element into exactly one output element
    flatMap: maps each element to zero or more output elements and flattens the result
    mapPartitions: transforms a whole partition at a time, receiving an iterator over its elements and returning an iterator
    Hands-on example:
    employeeDS.map { employee => (employee.name, employee.salary + 1000) }.show()
    
        departmentDS.flatMap {
          department => Seq(Department(department.id + 1, department.name + "_1"), Department(department.id + 2, department.name + "_2"))
        }.show()
    
    employeeDS.mapPartitions { employees =>
      val result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
      while (employees.hasNext) {
        val emp = employees.next()
        result += ((emp.name, emp.salary + 1000))
      }
      result.iterator
    }.show()
    

    Output:

    +------+-----+
    |    _1|   _2|
    +------+-----+
    |   Leo|21000|
    | Marry|26000|
    |  Jack|16000|
    |   Tom|19000|
    |Kattie|22000|
    |   Jen|29000|
    |   Jen| 9000|
    +------+-----+
    
    
    +---+--------------------+
    | id|                name|
    +---+--------------------+
    |  2|Technical Departm...|
    |  3|Technical Departm...|
    |  3|Financial Departm...|
    |  4|Financial Departm...|
    |  4|     HR Department_1|
    |  5|     HR Department_2|
    +---+--------------------+
    
    
    
    +------+-----+
    |    _1|   _2|
    +------+-----+
    |   Leo|21000|
    | Marry|26000|
    |  Jack|16000|
    |   Tom|19000|
    |Kattie|22000|
    |   Jen|29000|
    |   Jen| 9000|
    +------+-----+
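    The shape of the three transformations can be sketched over plain Python lists (an illustrative toy, not Spark code; in Spark, mapPartitions is mainly useful for doing per-partition setup, such as opening one database connection per partition instead of one per element):

```python
# Toy (name, salary) rows; illustration of the semantics only.
employees = [("Leo", 20000), ("Marry", 25000)]

# map: exactly one output element per input element
mapped = [(name, salary + 1000) for name, salary in employees]

# flatMap: zero or more outputs per input, then flatten
flat_mapped = [out
               for (name, salary) in employees
               for out in ((name + "_1", salary), (name + "_2", salary))]

# mapPartitions: the function sees a whole partition (an iterator) at once
def per_partition(rows):
    # per-partition setup would go here (e.g. open a connection once)
    for name, salary in rows:
        yield (name, salary + 1000)

partitions = [employees[:1], employees[1:]]  # pretend there are 2 partitions
map_partitioned = [r for part in partitions for r in per_partition(iter(part))]

print(mapped)            # [('Leo', 21000), ('Marry', 26000)]
print(len(flat_mapped))  # 4: two outputs per input element
print(map_partitioned)   # same rows as map, computed partition by partition
```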
    
