
Completing the ETL Project with Spark SQL

Author: 喵星人ZC | Published 2019-06-20 20:42

Previous articles in the Hadoop MR ETL project series:
Hadoop MR ETL Offline Project 1
Refactoring the Offline ETL Project 2
Completing the ETL Project with Spark Core

1. Defining the Log and date parsing classes
Log parsing

package com.soul.bigdata.task.task04

import com.soul.bigdata.task.task04.DateUtils.parseToMinute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object LogParse {
  val schema = StructType(Array(
    StructField("cdn", StringType),
    StructField("region", StringType),
    StructField("level", StringType),
    StructField("time", StringType),
    StructField("ip", StringType),
    StructField("domain", StringType),
    StructField("url", StringType),
    StructField("traffic", LongType)
  ))

  // Placeholder returned for malformed lines. It matches the 8-field schema, so
  // createDataFrame will not fail at runtime; downstream jobs can filter on a null domain.
  private val errorRow = Row(null, null, null, null, null, null, null, 0L)

  def parseLog(log: String): Row = {
    //baidu CN  2   2019-01-21 17:17:56 123.235.248.216 v2.go2yd.com    http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4    785966
    val splits = log.split("\t")
    if (splits.length == 8) {
      try {
        val cdn = splits(0)
        val region = splits(1)
        val level = splits(2)
        val time = parseToMinute(splits(3))
        val ip = splits(4)
        val domain = splits(5)
        val url = splits(6)
        val traffic = splits(7).toLong

        Row(cdn, region, level, time, ip, domain, url, traffic)
      } catch {
        case e: Exception =>
          e.printStackTrace()
          errorRow
      }
    } else {
      errorRow
    }
  }

}
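
A quick local check of the parser (this small driver is my own addition, not part of the original project; it simply feeds the sample line from the comment above into parseLog):

package com.soul.bigdata.task.task04

// Hypothetical sanity check for LogParse; assumes the raw fields are tab-separated
object LogParseCheck {
  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "baidu", "CN", "2", "2019-01-21 17:17:56", "123.235.248.216",
      "v2.go2yd.com",
      "http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4",
      "785966").mkString("\t")

    // Expect the time normalized to 20190121171756 and traffic parsed as 785966L
    println(LogParse.parseLog(sample))
  }
}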


Date utility class

package com.soul.bigdata.task.task04

import java.util.Date

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging


object DateUtils extends Logging {

  val sourceFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  val targetFormat = FastDateFormat.getInstance("yyyyMMddHHmmss")

  // Convert a "yyyy-MM-dd HH:mm:ss" string to a millisecond timestamp; return 0 on parse failure
  def getTime(time: String): Long = {
    try {
      sourceFormat.parse(time).getTime
    } catch {
      case e: Exception =>
        logError(s"$time parse error : ${e.getMessage}")
        0L
    }
  }

  def parseToMinute(time: String): String = {
    targetFormat.format(new Date(getTime(time)))
  }

  def getDay(time: String): String = {
    time.substring(0, 8)

  }

  def getHour(time: String): String = {
    time.substring(8, 10)
  }

  def main(args: Array[String]): Unit = {
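    // Expected output: 20190121171756, 20190121, 17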
    println(parseToMinute("2019-01-21 17:17:56"))
    println(getDay(parseToMinute("2019-01-21 17:17:56")))
    println(getHour(parseToMinute("2019-01-21 17:17:56")))
  }


}

2. Completing the ETL with Spark SQL

package com.soul.bigdata.task.task04


import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object LogSQLETL extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      logWarning("Please input 2 params: input_path outputtmp_path")
      System.exit(0)
    }

    val input_path = args(0)
    val outputtmp_path = args(1)
//    val finally_path = args(2)

    val spark = SparkSession.builder()
//            .master("local[2]").appName("LogETL")
      .getOrCreate()

    //    val rowRDD = spark.sparkContext.textFile("file:///D:\\\\RZ-G6\\\\2019G6\\\\data\\\\access2.log")

    // Read the raw log as a single-column text DataFrame, then parse each line
    // into a structured Row and rebuild the DataFrame with the target schema
    var logDF = spark.read.format("text").load(input_path).coalesce(1)

    logDF = spark.createDataFrame(logDF.rdd.map(x => {
      LogParse.parseLog(x.getString(0))
    }), LogParse.schema)

    //    logDF.show(10,false)
    //since overwrite is used here, the output should afterwards be moved to another directory
    logDF.write.format("parquet").mode("overwrite").save(outputtmp_path)

    //    HDFSUtils.rename(outputtmp_path,finally_path)


    spark.stop()

  }

}
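
The commented-out HDFSUtils.rename call above assumes a small helper that moves the temporary ETL output to its final HDFS directory. That class is not shown in the post; a minimal sketch (class name and signature assumed from the call site) using the Hadoop FileSystem API could look like this:

package com.soul.bigdata.task.task04

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Assumed helper: move the temp output directory written with overwrite to its final location
object HDFSUtils {

  def rename(srcPath: String, destPath: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    val src = new Path(srcPath)
    val dest = new Path(destPath)

    // Design choice (assumption): drop any old final directory so the rename can succeed
    if (fs.exists(dest)) {
      fs.delete(dest, true)
    }
    fs.rename(src, dest)
  }
}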

3. Statistical analysis with Spark SQL

package com.soul.bigdata.task.task04


import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object LogAnalyseApp extends Logging {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      logWarning("Please input 2 parms: input_path outputtmp_path")
      System.exit(0)
    }

    val input_path = args(0)
    val outputtmp_path = args(1)

    val spark = SparkSession.builder()
      //            .master("yarn").appName("LogETL")
      .getOrCreate()

    // Register the ETL output (parquet) as a temp view and count requests per domain
    val df = spark.read.format("parquet").load(input_path).coalesce(1)
    df.createOrReplaceTempView("etl_log")

    val rsDF = spark.sql("select domain,count(*) as sum from etl_log group by domain")
    //    rsDF.show(false)

    rsDF.write.format("parquet").mode("overwrite").save(outputtmp_path)

    spark.stop()

  }

}
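
The same parquet output and temp-view pattern supports other aggregations as well. As one example (my own addition, not part of the original post), summing the traffic column per domain:

package com.soul.bigdata.task.task04

import org.apache.spark.sql.SparkSession

// Hypothetical follow-up job: total traffic per domain from the parquet produced by LogSQLETL.
// args(0) = ETL parquet input path, args(1) = output path
object TrafficAnalyseApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()

    val df = spark.read.format("parquet").load(args(0))
    df.createOrReplaceTempView("etl_log")

    spark.sql("select domain, sum(traffic) as total_traffic from etl_log group by domain")
      .write.format("parquet").mode("overwrite").save(args(1))

    spark.stop()
  }
}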

