Kudu: Operating Kudu with Spark SQL

Author: xiaogp | Published 2020-12-16 17:53

    Keywords: Spark SQL, Kudu

    Reference: https://github.com/xieenze/SparkOnKudu/blob/master/src/main/scala/com/spark/test/KuduCRUD.scala

    Dependencies

    Add the spark-core_2.11, spark-sql_2.11, kudu-spark2_2.11, and hadoop-client dependencies:

    <!-- Spark dependencies -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-client</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                    </exclusion>
                </exclusions>
    <!--            <scope>provided</scope>-->
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
    <!-- kudu -->
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-spark2_2.11</artifactId>
                <version>1.6.0-cdh5.14.2</version>
            </dependency>
    
    <!-- Hadoop dependency -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
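
    The coordinates above reference the ${spark.version} and ${hadoop.version} properties, which must be defined in the POM. A minimal sketch, with versions assumed to match the CDH 5.14.2 Kudu artifact (adjust to your cluster):

    <properties>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.0-cdh5.14.2</hadoop.version>
    </properties>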
    

    Reading data

    Specify kudu.master and kudu.table; if reads time out, add the kudu.operation.timeout.ms parameter:

    import org.apache.kudu.spark.kudu._
    
    val df = spark.read.options(Map(
          "kudu.master" -> "cloudera01:7051,cloudera02:7051,cloudera03:7051",
          "kudu.table" -> "impala::default.students",
          "kudu.operation.timeout.ms" -> "1000000")).kudu
    

    Or equivalently:

    val df = spark.read.format("org.apache.kudu.spark.kudu")
        .option("kudu.table", "impala::default.students")
        .option("kudu.master","cloudera01:7051")
        .option("kudu.operation.timeout.ms", "100000").load
    

    Writing data

    Data can be written with the DataFrame write method, or with the KuduContext methods updateRows, insertRows, upsertRows, and insertIgnoreRows.

    (1) Insert when the key is absent, update when it exists

    Call the DataFrame write method directly, specifying kudu.master and kudu.table. Only append mode is supported; rows whose keys already exist are updated automatically:

    import spark.implicits._  // required for toDF

    val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
    df.write.options(Map(
          "kudu.master" -> "cloudera01:7051",
          "kudu.table" -> "impala::test_gp.student",
          "kudu.operation.timeout.ms" -> "1000000")).mode("append").kudu
    

    Calling the KuduContext upsertRows method has the same effect as the DataFrame write in append mode; the DataFrame's column names must match the Kudu table's columns:

        val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
        import spark.implicits._  // required for toDF
        // build the data to write
        val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
        val sc = spark.sparkContext
        val kuduMasters = "cloudera01:7051"
        val kuduContext = new KuduContext(kuduMasters, sc)
        kuduContext.upsertRows(df, "impala::test_gp.student")
    
    (2) Insert only

    Call the KuduContext insertRows or insertIgnoreRows methods: if a row's key already exists, insertRows throws an error, while insertIgnoreRows skips existing keys and inserts only the new ones.

        val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
        import spark.implicits._  // required for toDF
        // build the data to write
        val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
        val sc = spark.sparkContext
        val kuduMasters = "cloudera01:7051"
        val kuduContext = new KuduContext(kuduMasters, sc)
        kuduContext.insertRows(df, "impala::test_gp.student")
        // kuduContext.insertIgnoreRows(df, "impala::test_gp.student")
    
    (3) Update only

    Call the KuduContext updateRows method to update rows whose keys already exist; if a key does not exist, it throws an error:

        val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
        import spark.implicits._  // required for toDF
        // build the data to write
        val df = Seq(("张六", "男"), ("李四", "女")).toDF("name", "sex")
        val sc = spark.sparkContext
        val kuduMasters = "cloudera01:7051"
        val kuduContext = new KuduContext(kuduMasters, sc)
        kuduContext.updateRows(df, "impala::test_gp.student")
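
    For completeness, the KuduContext also provides a deleteRows method that deletes rows by primary key. A minimal sketch, assuming name is the table's primary key and the DataFrame carries only the key column:

        // assumes `import spark.implicits._` and the kuduContext from above are in scope
        val keys = Seq("张六").toDF("name")
        kuduContext.deleteRows(keys, "impala::test_gp.student")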
    

    Creating and deleting tables

    Create a table from the schema of an existing DataFrame:

    def createTable1(spark: SparkSession, kuduContext: KuduContext) = {
        import spark.implicits._
        import scala.collection.JavaConverters._
        import org.apache.kudu.client.CreateTableOptions
        val df = Seq(("王二", 3), ("李振", 4)).toDF("name", "age")
        val kuduTableSchema = df.schema
        val kuduTablePrimaryKey = Seq("name")  // define the primary key
        val kuduTableOptions = new CreateTableOptions()
        kuduTableOptions
          .setNumReplicas(1)                           // set the replica count
          .addHashPartitions(List("name").asJava, 20)  // hash-partition on "name" into 20 buckets
        // create the table
        kuduContext.createTable("impala::test_gp.student_info", kuduTableSchema,
          kuduTablePrimaryKey, kuduTableOptions)
      }
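
    Once the table exists, the same DataFrame can be written straight into it, e.g. kuduContext.insertRows(df, "impala::test_gp.student_info").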
    

    Define a custom schema with StructType:

    def createTable2(spark: SparkSession, kuduContext: KuduContext) = {
        import scala.collection.JavaConverters._
        import org.apache.kudu.client.CreateTableOptions
        import org.apache.spark.sql.types._
        // name is non-nullable because it is the primary key
        val schema = new StructType()
          .add(StructField("name", StringType, false))
          .add(StructField("age", IntegerType, true))
        val kuduTablePrimaryKey = Seq("name")  // define the primary key
        val kuduTableOptions = new CreateTableOptions()
        kuduTableOptions
          .setNumReplicas(1)                           // set the replica count
          .addHashPartitions(List("name").asJava, 20)  // hash-partition on "name" into 20 buckets
        // create the table
        kuduContext.createTable("impala::test_gp.student_info", schema,
          kuduTablePrimaryKey, kuduTableOptions)
      }
    
    

    Delete a table, first checking that it exists:

    def deleteTable(kuduContext: KuduContext): Unit = {
        val kuduTableName: String = "impala::test_gp.student_info"
        // deleting a table that does not exist throws an error, so check first
        if (kuduContext.tableExists(kuduTableName)) {
          kuduContext.deleteTable(kuduTableName)
        }
      }
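
    Putting it together, a minimal driver sketch that builds the SparkSession and KuduContext and calls the helpers above (assuming they are defined in the same object; the master addresses are placeholders):

    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.sql.SparkSession

    object KuduDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("spark_kudu").getOrCreate()
        val kuduContext = new KuduContext("cloudera01:7051", spark.sparkContext)
        createTable2(spark, kuduContext)  // create the table
        deleteTable(kuduContext)          // and drop it again
        spark.stop()
      }
    }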
    
