Quickly extracting data from Oracle into Kudu with Scala

Author: spark孙 | Published 2023-05-18 15:38

    High-speed data pull with Scala, suited to synchronizing historical data.

    package com.longi.util
    
    import com.longi.common.OracleTemplate
    import com.longi.common.base.{HDFSPathWrapper, PropertiesHelper}
    import com.longi.hadoop.{LoadKuduToDataFrame, SparkSessionPort}
    import org.apache.kudu.client.SessionConfiguration
    import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}
    import org.apache.spark.sql.functions.{coalesce, lit}
    import org.apache.spark.sql.types.{DataType, DecimalType, StructField, StructType}
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    /**
     * @date: 2021-12-3 9:55
     * @desc: Please fill in the remarks
     */
    object Oracle2KuduWMS {
    
      def main(args: Array[String]): Unit = {
        //Step 0: parameter initialization
        if (args.length < 6) {
          throw new IllegalArgumentException("You need to pass the following parameters  1:Oracle table name 2:Oracle date column for extraction 3:Begin time(e.g. 2020-06-01 00:00:00) 4:End time 5:Kudu table name 6:Null-replacement rules(field=value)")
        }
    
        //(Oracle Table, Oracle Column, Begin Dt, End Dt, Kudu Table, Null-replacement rules)
        //Oracle Table Name
        val oraTableName = args(0)
        //Oracle Incremental Sliding Column
        val oraColumnName = args(1)
        //The Begin time and End Time
        val datBeginDateTime = args(2)
        val datEndDateTime = args(3)
        //Kudu table name
        val kuduTableName = args(4)
        val message = s"Oracle Table $oraTableName -> Kudu Table : $kuduTableName ($datBeginDateTime - $datEndDateTime) "
        //Null-replacement rules in "field=value" form (only used by the commented-out block below)
        val fieldValueChange = args(5)
        println(message)
        val spark = SparkSessionPort(message)
    
        // Build the extraction predicate; the sentinel begin time "0000-00-00 00:00:00" means a full extraction
        val exeSQLCondition =
          if (datBeginDateTime == "0000-00-00 00:00:00") "1=1"
          else s"$oraColumnName >= to_date('$datBeginDateTime','yyyy-mm-dd hh24:mi:ss') AND $oraColumnName < to_date('$datEndDateTime','yyyy-mm-dd hh24:mi:ss')"
        println(s"exeSQLCondition = ${exeSQLCondition}")
    
        val exeSQL =
          s"""
             |     (
             |     SELECT rownum as rs
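             |           -- the "rs" (rownum) column above is used as Spark's JDBC partitionColumn for parallel reads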
             |           ,rowid as offsets  -- use this instead of cast(rowid as varchar(20)) as offsets
             |           ,'I' as op_type
             |           ,t.*
             |       FROM ${oraTableName} t
             |      WHERE ${exeSQLCondition}
             |     )
              """.stripMargin
        println(s"exeSQL = ${exeSQL}")
        val exeAggrSQL =
          s"""
             |     (
             |     SELECT count(1)
             |       FROM ${oraTableName} t
             |      WHERE ${exeSQLCondition}
             |     ) t
             |""".stripMargin
        println(exeAggrSQL)
    
        val aggrDF = spark
          .read
          .format("jdbc")
          .option("url", PropertiesHelper.getPropertiesValueFromKey("origi.wms.ora.uri"))
          .option("driver", "oracle.jdbc.driver.OracleDriver")
          .option("dbtable", exeAggrSQL)
          .option("user", PropertiesHelper.getPropertiesValueFromKey("origi.wms.ora.username"))
          .option("password", PropertiesHelper.getPropertiesValueFromKey("origi.wms.ora.password")).load()
        val upperBound = aggrDF.first().getDecimal(0).toBigInteger.intValue()
    
    
        // Scale the number of JDBC partitions with the row count
        val numPartitions = upperBound match {
          case n if n <= 100000 => 1
          case n if n <= 500000 => 3
          case n if n <= 1000000 => 10
          case n if n <= 5000000 => 15
          case _ => 20
        }
    
        val oracleDF = spark.read
          .format("jdbc")
          .option("url", "jdbc:oracle:thin:@xxx-st.longi.com:1521/prod")
          .option("driver", "oracle.jdbc.driver.OracleDriver")
          .option("dbtable", exeSQL)
          .option("user", "USER")
          .option("partitionColumn", "rs")
          .option("lowerBound", 1)
          .option("upperBound", upperBound)
          .option("numPartitions", numPartitions)
          .option("fetchsize", 3000)
          .option("password", "XXXXXXX").load()
        oracleDF.printSchema()
    
        val kuduSchema = LoadKuduToDataFrame(spark, kuduTableName).schema
        import org.apache.spark.sql.functions._
        //Cast each Oracle column to the corresponding Kudu column type (use the DataType itself so decimal precision is preserved)
        val colNames = kuduSchema.names.map(c => {
          col(c).cast(kuduSchema.fields(kuduSchema.fieldIndex(c)).dataType)
        })
        var oracleFinalDF = oracleDF.select(colNames: _*)
        //oracleFinalDF.show()
        //Ensure the fields that need conversion are not null
        /*val fields = fieldValueChange.split(",")
    
        fields.foreach(f => {
          /*oracleFinalDF.schema.fields(kuduSchema.fieldIndex(f.split("=")(0))).dataType.typeName match {
            case "Int" => coalesce(oracleFinalDF(f.split("=")(0)), lit(f.split("=")(1)))
            case "String" => (f.split("=")(0))
          }*/
          println(s"f={$fields}, f0=${f.split("=")(0)} f1=${f.split("=")(1)}")
          //coalesce(oracleFinalDF(f.split("=")(0)), lit(f.split("=")(1)))
          //nvl(oracleFinalDF.col(f.split("=")(0)), f.split("=")(1))
          //coalesce(col(f.split("=")(0)),lit(oracleFinalDF(f.split("=")(1))))
          val tf = oracleFinalDF.withColumn(f.split("=")(0).concat("_new"), nvl(oracleFinalDF.col(f.split("=")(0)), f.split("=")(1)))
          //tf.withColumn("a",oracleFinalDF.col("gl_sl_link_id"))
          tf.where("").show()
          val df = tf.drop(col(f.split("=")(0)))
          df.show()
          val rf = df.withColumnRenamed(f.split("=")(0).concat("_new"), f.split("=")(0))
          rf.where("gl_sl_link_id is null or gl_sl_link_id =-1").show()
          oracleFinalDF = rf
        })
    
        //Replace null values with a given default value
        def nvl(ColIn: Column, ReplaceVal: Any): Column = {
          println(ReplaceVal)
          (when(ColIn.isNull, lit(ReplaceVal)).otherwise(ColIn))
        }
    
        oracleFinalDF.select("gl_sl_link_id").where("gl_sl_link_id is null or gl_sl_link_id =-1").distinct().show(200)
    */
        //Create a new DataFrame with the Kudu schema, to be upserted into the Kudu table
        val newData = spark.createDataFrame(oracleFinalDF.rdd, kuduSchema)
    
    
        newData.printSchema()
    
        println(s"newData.rdd.partitions.size :${newData.rdd.partitions.size}")
    
        upsertKuduData(spark, newData, kuduTableName)
        spark.stop()
    
      }
    
      /**
       * @Description: Upsert the DataFrame into the Kudu table
       * @Param: [spark, dataFrame, tabName]
       * @return: void
       */
      private def upsertKuduData(spark: SparkSession, dataFrame: DataFrame, tabName: String): Unit = {
        val kuduContext = new KuduContext(
          HDFSPathWrapper.getKuduMasterPath(),
          spark.sparkContext
        )
        // Kudu session configured for synchronous flushes with a 10,000-operation mutation buffer
        val ks = kuduContext.syncClient.newSession()
        ks.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
        ks.setMutationBufferSpace(10000)
        kuduContext.upsertRows(dataFrame, tabName, new KuduWriteOptions(false, true))
      }
    
    }
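
    The null-replacement argument (args(5), e.g. 'a=1' in the invocation below) is only parsed inside the commented-out block above. If you want to actually apply such rules before the upsert, a minimal sketch of the same coalesce-based idea might look like the following. The helper name applyNullDefaults is hypothetical, and the comma-separated "field=value" rule format is an assumption carried over from the commented-out code:

    // Hypothetical helper (not part of the original job): replace NULLs in the listed
    // columns with a given literal before writing to Kudu. Assumes the same
    // comma-separated "field=value" format as the fieldValueChange argument above.
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{coalesce, col, lit}
    import org.apache.spark.sql.types.StructType

    def applyNullDefaults(df: DataFrame, rules: String, kuduSchema: StructType): DataFrame =
      rules.split(",").filter(_.contains("=")).foldLeft(df) { (acc, rule) =>
        val Array(field, default) = rule.split("=", 2)
        // Keep the column on its Kudu data type after filling in the default
        val targetType = kuduSchema.fields(kuduSchema.fieldIndex(field)).dataType
        acc.withColumn(field, coalesce(col(field), lit(default).cast(targetType)))
      }

    // Usage before createDataFrame / upsertKuduData:
    //   oracleFinalDF = applyNullDefaults(oracleFinalDF, fieldValueChange, kuduSchema)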
    

    Invocation

    spark-submit \
      --class com.longi.util.Oracle2KuduWMS \
      --master yarn \
      --deploy-mode cluster \
      --queue bdp \
      --driver-memory 2g \
      --num-executors 20 \
      --executor-memory 1g \
      --executor-cores 1 \
      --conf spark.sql.warehouse.dir=hdfs://longi/home/data/hive/warehouse \
      --conf spark.sql.session.timeZone=Asia/Shanghai \
      --conf spark.executor.memoryOverhead=10G \
      --jars /dfs/projects/etl-schedule-entry/lib/common/etl-functions-1.0.jar,/dfs/projects/etl-schedule-entry/lib/common/ojdbc6-11.2.0.3.jar,/dfs/projects/etl-schedule-entry/lib/common/kudu-spark2_2.11-1.10.0-cdh6.3.2.jar,/dfs/projects/etl-schedule-entry/lib/common/grizzled-slf4j_2.11-1.3.0.jar,/dfs/projects/etl-schedule-entry/lib/common/mysql-connector-java-5.1.44.jar \
      /dfs/projects-test/etl-schedule-entry/lib/common/etl-tools-1.0.jar \
      WMS_PROD.ACT_ALLOCATION_DETAILS addtime '2010-01-01 00:00:00' '2023-12-23 10:00:00' ods_po_wms.streaming_cemd_act_allocation_details 'a=1'
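
    For reference, the six positional arguments after the application jar in this example map to the values read from args() in main:

        WMS_PROD.ACT_ALLOCATION_DETAILS                    Oracle table name (args(0))
        addtime                                            Oracle date column for the extraction window (args(1))
        '2010-01-01 00:00:00'                              begin time (args(2))
        '2023-12-23 10:00:00'                              end time (args(3))
        ods_po_wms.streaming_cemd_act_allocation_details   Kudu table name (args(4))
        'a=1'                                              null-replacement rule (args(5), used only by the commented-out block)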
    
