This post is only meant as a reference for the general approach; pointers and corrections from more experienced readers are welcome.
I. Physical architecture diagram
[Physical architecture diagram: test environment]
This is the test environment; a production deployment would more likely look like the following:
[Physical architecture diagram: production environment]
The rest of this post only covers setting up the test environment.
II. Data flow
1. Reporting method: HTTP GET
2. The query parameters carried in the request map one-to-one onto the columns of a Hive table (a sample request is sketched below)
3. Log cleaning is done with Spark
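Judging from the ETL code in step 4 below, a report is a plain GET request whose query string carries the fields as u, u1, u2, ... plus a trailing rd parameter (a cache-buster); the value of u selects the output directory (and hence the target Hive table), and the remaining uN values map, in order, to that table's columns. A hypothetical request, with a made-up host name and field values:
# hypothetical tracking request: u selects the table/event, u1..uN carry its columns, rd is a cache-buster
curl "http://Test-Nginx-001/?u=app_startup&u1=device-123&u2=android&u3=7.0.1&rd=1552468271"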
The code
The code breaks down into the following steps, which can be scheduled with Oozie or another workflow scheduler (a crontab-based sketch is given right after this overview).
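If Oozie is not available, plain cron is enough to drive the hourly pipeline. A minimal sketch, assuming wrapper scripts at the paths shown (paths and timings are assumptions, not part of the original setup):
# hypothetical hourly schedule: run the Spark ETL first, then load the results into Hive
10 * * * * /bin/bash /usr/local/cluster_app/crontab-spark/run_nginx_etl.sh >> /var/log/nginx_etl.log 2>&1
30 * * * * /bin/bash /usr/local/cluster_app/crontab-hive-load/load_hive.sh >> /var/log/hive_load.log 2>&1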
1. Nginx log rotation
#!/bin/bash
# rotate yesterday's nginx access/error logs into the history directory
YESTERDAY=$(date -d "yesterday" +"%Y%m%d")
LOGPATH=/usr/local/cluster_app/openresty-1.13.6.2/nginx/logs/
PID=${LOGPATH}nginx.pid
mv ${LOGPATH}access.log ${LOGPATH}history/access-${YESTERDAY}.log
mv ${LOGPATH}error.log ${LOGPATH}history/error-${YESTERDAY}.log
# USR1 makes the nginx master process reopen its log files, so fresh access.log/error.log are created
kill -USR1 `cat ${PID}`
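The rotation script is meant to run once per day; assuming it is saved as rotate_nginx_log.sh (file name, location, and timing below are assumptions), a crontab entry could look like:
# hypothetical crontab entry: rotate the nginx logs just after midnight every day
5 0 * * * /bin/bash /usr/local/cluster_app/crontab-nginx/rotate_nginx_log.sh >> /var/log/rotate_nginx_log.log 2>&1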
2. Flume load-balancing configuration
a1.sources = nginxLog
a1.channels = memoryChannel
a1.sinks = sink1 sink2
a1.sinkgroups = sinkGroup
a1.sinkgroups.sinkGroup.sinks = sink1 sink2
a1.sources.nginxLog.type = exec
a1.sources.nginxLog.command = tail -F /usr/local/cluster_app/openresty-1.13.6.2/nginx/logs/access.log
a1.sources.nginxLog.channels = memoryChannel
a1.sinks.sink1.type = avro
a1.sinks.sink1.channel = memoryChannel
a1.sinks.sink1.hostname = Test-Hadoop-003
a1.sinks.sink1.port = 4545
a1.sinks.sink2.type = avro
a1.sinks.sink2.channel = memoryChannel
a1.sinks.sink2.hostname = Test-Hadoop-004
a1.sinks.sink2.port = 4545
a1.channels.memoryChannel.type = memory
a1.sinkgroups.sinkGroup.processor.type = load_balance
a1.sinkgroups.sinkGroup.processor.backoff = true
a1.sinkgroups.sinkGroup.processor.selector = round_robin
a1.sinkgroups.sinkGroup.processor.selector.maxTimeOut = 30000
a1.channels.memoryChannel.capacity = 500000
a1.channels.memoryChannel.transactionCapacity = 10000
a1.channels.memoryChannel.byteCapacityBufferPercentage = 20
a1.channels.memoryChannel.byteCapacity = 800000
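Assuming this file is saved as nginx-loadbalance.conf and Flume lives under /usr/local/cluster_app/flume (both paths are assumptions), the agent on the Nginx host can be started with the standard flume-ng command:
# start the first-tier agent; the --name value must match the a1 prefix used in the config
/usr/local/cluster_app/flume/bin/flume-ng agent \
--conf /usr/local/cluster_app/flume/conf \
--conf-file /usr/local/cluster_app/flume/conf/nginx-loadbalance.conf \
--name a1 -Dflume.root.logger=INFO,console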
3. Flume sink configuration (identical on both downstream machines)
a1.sources = avroSrc
a1.channels = memoryChannel
a1.sinks = k1
# For each one of the sources, the type is defined
a1.sources.avroSrc.type = avro
a1.sources.avroSrc.bind = 0.0.0.0
a1.sources.avroSrc.port = 4545
# Each sink's type must be defined
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://36.251.250.6:8020/flume/kankan/app/ds=%Y%m%d/hour=%H/
a1.sinks.k1.hdfs.filePrefix = nginx-%Y%m%d%H
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 5120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.serializer = TEXT
a1.sinks.k1.serializer.appendNewline = true
# Each channel's type is defined.
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 500000
a1.channels.memoryChannel.transactionCapacity = 1000
a1.channels.memoryChannel.byteCapacityBufferPercentage = 20
a1.channels.memoryChannel.byteCapacity = 5000000000
a1.sinks.k1.channel = memoryChannel
a1.sources.avroSrc.channels = memoryChannel
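The two downstream agents are started the same way, pointed at this file. Once they are up, it is easy to confirm that events are landing in the hourly partitions, for example:
# list the current hour's partition written by the HDFS sink
hdfs dfs -ls /flume/kankan/app/ds=$(date +%Y%m%d)/hour=$(date +%H)/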
4. Spark cleaning code (modify to fit your own requirements)
package com.kankan.tongji
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Calendar
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.http.util.TextUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
// Writes each (key, value) record into a sub-directory named after the key and
// leaves the key itself out of the output line.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.toString + "/" + name
}
object RDDMultipleTextOutputFormat {
// Helper used by etl(): returns the previous hour as "yyyyMMdd HH",
// matching the ds=/hour= layout written by the Flume HDFS sink.
def getFrontHour(): String = {
val cal = Calendar.getInstance()
cal.add(Calendar.HOUR_OF_DAY, -1)
new SimpleDateFormat("yyyyMMdd HH").format(cal.getTime)
}
}
object NginxEtl {
private val APP_NAME = ConfigUtil.getValue("appName")
private val FLUME_LOG_DIR = ConfigUtil.getValue("flumeLog")
private val HDFS_URL = ConfigUtil.getValue("hdfs")
private val HDFS_URL_PORT = ConfigUtil.getValue("hdfsWithPort")
def etl(): Unit = {
// local[*] is hard-coded for the test environment; for a cluster run, drop
// setMaster here and pass --master to spark-submit instead
val conf = new SparkConf().setAppName(APP_NAME).setMaster("local[*]")
val sc = new SparkContext(conf)
val currentTime = RDDMultipleTextOutputFormat.getFrontHour().split(" ")
val date = currentTime(0)
val hour = currentTime(1)
val flumeLog = FLUME_LOG_DIR + "ds=%s/hour=%s/".format(date, hour)
val hdfs = FileSystem.get(
new URI(HDFS_URL_PORT), new Configuration())
if(!hdfs.exists(new Path(flumeLog))){
println("Hdfs directory is't exist,check it:%s.".format(flumeLog))
return
}
val log = sc.textFile(flumeLog)
log.filter(_.nonEmpty)                            // drop empty lines
.filter(_.split(" ")(6).startsWith("/?u="))       // keep only tracking requests
.mapPartitions(partition =>
partition.map {
lg =>
val url: String = lg.split(" ")(6)
// "/?u=a&u1=b&...&rd=123" -> "&u=a&u1=b&..." -> ["", "=a", "1=b", ...]
val arr: Array[String] = url.replace("/?", "&").split("&rd")(0).split("&u")
// drop the leading empty element produced by the split
val mid = arr.takeRight(arr.length - 1)
val result = mid.map {
x =>
var kv: Array[String] = x.split("=")
if (kv.length < 2)
(kv(0), "")
else
(kv(0), kv(1))
}.sortBy {
// sort by the numeric suffix of each u-parameter so the field order matches
// the Hive column order (the bare "u" parameter sorts first, as 0)
tuple =>
var comparable = -1
if (TextUtils.isEmpty(tuple._1))
comparable = 0
else
comparable = tuple._1.toInt
comparable
}
result
}
).mapPartitions(
iter =>
iter.map { arr =>
// the first field after sorting (the value of the bare "u" parameter) becomes
// the key, i.e. the output sub-directory / target table for this record
val k = arr(0)._2
val line = arr.map(_._2).mkString("\t")
// drop everything after the last tab (the trailing field)
(k, line.substring(0, line.lastIndexOf("\t")))
}
).saveAsHadoopFile(HDFS_URL, classOf[String], classOf[String],
classOf[RDDMultipleTextOutputFormat])
// outside of test runs, delete the raw Flume directory once the ETL has finished
val isTest = ConfigUtil.getValue("isTest")
if(!"1".equals(isTest)){
hdfs.delete(new Path(flumeLog),true)
println("Delete flume log dir : %s".format(flumeLog))
}
}
def main(args: Array[String]): Unit = {
etl()
}
}
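Because the code hard-codes local[*] for the test environment, a plain local spark-submit is enough to run the job; the jar path below is an assumption:
# hypothetical submit command for the hourly ETL (jar path is illustrative)
spark-submit \
--class com.kankan.tongji.NginxEtl \
/usr/local/cluster_app/spark-jobs/nginx-etl.jar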
5. Hive load script
#!/bin/bash
echo "================================================="
# previous hour, e.g. "20190313 09"
dt=$(date -d "-1 hour" +'%Y%m%d %H')
ds=${dt:0:8}
hour=${dt:9:2}
HIVE="/usr/local/cluster_app/hive/bin/hive -hiveconf mapred.job.name=kkstat_hive -hiveconf hive.groupby.skewindata=false -hiveconf hive.exec.compress.output=true -hiveconf hive.exec.compress.intermediate=true -hiveconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec -hiveconf hive.map.aggr=true -hiveconf hive.stats.autogather=false -hiveconf mapred.job.queue.name=kankan"
while read table
do
echo "use kankan_odl;LOAD DATA INPATH '/user/kankan/warehouse/kankan_odl/${table}/ds=${ds}/hour=${hour}' INTO TABLE ${table} PARTITION (ds=${ds},hour=${hour})"
$HIVE -e "use kankan_odl;LOAD DATA INPATH '/user/kankan/warehouse/kankan_odl/${table}/ds=${ds}/hour=${hour}' INTO TABLE ${table} PARTITION (ds=${ds},hour=${hour})"
done < /usr/local/cluster_app/crontab-hive-load/tables.conf
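The loop reads one Hive table name per line from tables.conf, and each listed table is expected to be partitioned by (ds, hour). A hypothetical tables.conf, with made-up table names:
# /usr/local/cluster_app/crontab-hive-load/tables.conf -- one Hive table per line
app_startup
app_pageview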