Previous articles on the Hadoop MR ETL project:
Hadoop MR ETL offline project
Refactoring the ETL offline project
This post reworks that project, completing the ETL with Spark Core.
1. Define the log parser and date utility classes

Log parser:
package com.soul.bigdata.task.task04

import com.soul.bigdata.task.task04.DateUtils.parseToMinute
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object LogParse {

  // Target schema for one parsed access-log record
  val schema = StructType(Array(
    StructField("cdn", StringType),
    StructField("region", StringType),
    StructField("level", StringType),
    StructField("time", StringType),
    StructField("ip", StringType),
    StructField("domain", StringType),
    StructField("url", StringType),
    StructField("traffic", LongType)
  ))

  // Placeholder with the same arity as `schema`, returned for malformed records
  // so that createDataFrame does not fail on bad input lines
  private val emptyRow = Row("", "", "", "", "", "", "", 0L)

  def parseLog(log: String): Row = {
    // baidu CN 2 2019-01-21 17:17:56 123.235.248.216 v2.go2yd.com http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4 785966
    val splits = log.split("\t")
    var traffic = 0L
    if (splits.length == 8) {
      try {
        val cdn = splits(0)
        val region = splits(1)
        val level = splits(2)
        val time = parseToMinute(splits(3)) // normalize to yyyyMMddHHmmss
        val ip = splits(4)
        val domain = splits(5)
        val url = splits(6)
        traffic = splits(7).toLong
        Row(cdn, region, level, time, ip, domain, url, traffic)
      } catch {
        case e: Exception =>
          e.printStackTrace()
          emptyRow
      }
    } else {
      emptyRow
    }
  }
}
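As a quick sanity check, the sketch below (LogParseDemo is a hypothetical helper, not part of the original project) feeds the sample record from the comment above into parseLog; the timestamp should come back normalized to 20190121171756.

package com.soul.bigdata.task.task04

// Minimal sketch: exercise LogParse.parseLog on the sample record shown above
object LogParseDemo {
  def main(args: Array[String]): Unit = {
    val sample = Seq(
      "baidu", "CN", "2", "2019-01-21 17:17:56", "123.235.248.216",
      "v2.go2yd.com",
      "http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4",
      "785966"
    ).mkString("\t") // parseLog expects tab-separated fields

    println(LogParse.parseLog(sample))
    // expected: [baidu,CN,2,20190121171756,123.235.248.216,v2.go2yd.com,http://...,785966]
  }
}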
Date utility class:
package com.soul.bigdata.task.task04

import java.util.Date

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging

object DateUtils extends Logging {

  // Input format of the raw log time, e.g. "2019-01-21 17:17:56"
  val sourceFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  // Compact output format used by the ETL job, e.g. "20190121171756"
  val targetFormat = FastDateFormat.getInstance("yyyyMMddHHmmss")

  // Parse the source time string to epoch milliseconds; return 0 on failure
  def getTime(time: String): Long = {
    try {
      sourceFormat.parse(time).getTime
    } catch {
      case e: Exception =>
        logError(s"$time parse error : ${e.getMessage}")
        0L
    }
  }

  def parseToMinute(time: String): String = {
    targetFormat.format(new Date(getTime(time)))
  }

  // Day portion (yyyyMMdd) of an already reformatted time string
  def getDay(time: String): String = {
    time.substring(0, 8)
  }

  // Hour portion (HH) of an already reformatted time string
  def getHour(time: String): String = {
    time.substring(8, 10)
  }

  def main(args: Array[String]): Unit = {
    println(parseToMinute("2019-01-21 17:17:56"))
    println(getDay(parseToMinute("2019-01-21 17:17:56")))
    println(getHour(parseToMinute("2019-01-21 17:17:56")))
  }
}
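Running the main method above should print 20190121171756, 20190121, and 17: the input time reformatted to yyyyMMddHHmmss, followed by its day and hour portions.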
2. ETL with Spark SQL
package com.soul.bigdata.task.task04

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object LogSQLETL extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      logWarning("Please input 2 params: input_path outputtmp_path")
      System.exit(0)
    }
    val input_path = args(0)
    val outputtmp_path = args(1)
    // val finally_path = args(2)

    val spark = SparkSession.builder()
      // .master("local[2]").appName("LogETL")
      .getOrCreate()

    // val rowRDD = spark.sparkContext.textFile("file:///D:\\RZ-G6\\2019G6\\data\\access2.log")
    var logDF = spark.read.format("text").load(input_path).coalesce(1)

    // Parse each raw line into a Row and apply the ETL schema
    logDF = spark.createDataFrame(logDF.rdd.map(x => {
      LogParse.parseLog(x.getString(0))
    }), LogParse.schema)
    // logDF.show(10, false)

    // Because the write mode is overwrite, the output should then be moved to another directory
    logDF.write.format("parquet").mode("overwrite").save(outputtmp_path)
    // HDFSUtils.rename(outputtmp_path, finally_path)

    spark.stop()
  }
}
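The HDFSUtils.rename call above is commented out and its implementation is not shown in the original post. A minimal sketch of what such a helper could look like, using the Hadoop FileSystem API and assuming both paths live on the default filesystem configured in core-site.xml (e.g. HDFS):

package com.soul.bigdata.task.task04

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch of the HDFSUtils helper referenced above:
// move the temporary ETL output directory to its final location
object HDFSUtils {
  def rename(src: String, dst: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.rename(new Path(src), new Path(dst))
  }
}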
3. Statistical analysis with Spark SQL
package com.soul.bigdata.task.task04

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object LogAnalyseApp extends Logging {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      logWarning("Please input 2 params: input_path outputtmp_path")
      System.exit(0)
    }
    val input_path = args(0)
    val outputtmp_path = args(1)

    val spark = SparkSession.builder()
      // .master("yarn").appName("LogETL")
      .getOrCreate()

    // Read the parquet output of the ETL job and register it as a temp view
    val df = spark.read.format("parquet").load(input_path).coalesce(1)
    df.createOrReplaceTempView("etl_log")

    // Count records per domain
    val rsDF = spark.sql("select domain, count(*) as sum from etl_log group by domain")
    // rsDF.show(false)

    rsDF.write.format("parquet").mode("overwrite").save(outputtmp_path)

    spark.stop()
  }
}
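For reference, the same per-domain count can also be written with the DataFrame API instead of SQL. A minimal sketch, reusing the df loaded above (equivalent in result to the SQL query, not part of the original post):

import org.apache.spark.sql.functions.count

// Equivalent of: select domain, count(*) as sum from etl_log group by domain
val rsDF2 = df.groupBy("domain").agg(count("*").as("sum"))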