I. Background
1) Examine the existing Spark log-cleaning job for points that can be optimized, in order to reduce resource consumption.
2) Become familiar with the Spark development, testing, and tuning workflow.
II. Approach
1. RDD tuning
1) Change: keep all other parameters fixed and replace the map operator with mapPartitions.
2) Rationale: process the data partition by partition instead of pulling records one at a time.
3) Result: testing showed a performance regression (when system resources were scarce, elapsed time could grow severalfold). A sketch of the two variants follows.
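For reference, a minimal, self-contained sketch of the comparison that was tested; cleanRecord is a hypothetical stand-in for the job's actual per-record cleaning logic, not the real loadLog/convert code:
import org.apache.spark.{SparkConf, SparkContext}
object MapVsMapPartitions {
  // Hypothetical stand-in for the job's real per-record cleaning logic.
  def cleanRecord(line: String): String = line.trim
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MapVsMapPartitions"))
    val rawLogs = sc.textFile(args(0))
    // Original: the cleaning function is invoked once per record.
    val byMap = rawLogs.map(cleanRecord)
    // Tested variant: the function receives a whole partition's iterator at once,
    // trading per-record call overhead for higher memory pressure per task.
    val byPartitions = rawLogs.mapPartitions(iter => iter.map(cleanRecord))
    println(s"map: ${byMap.count()}, mapPartitions: ${byPartitions.count()}")
    sc.stop()
  }
}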
2. Partition optimization
Analysis showed that 5.8 GB of data in 6 files was handled by only 6 tasks, so parallelism was low and processing was slow. Repartitioning to 50 partitions raised the concurrency noticeably, as the sketch below illustrates.
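A minimal sketch of the change, assuming the sc and inputpath used in the appendix code below (the appendix itself repartitions to 100):
val line = sc.textFile(inputpath)
println(line.partitions.length)           // 6 input files -> roughly 6 partitions -> 6 tasks
val repartitioned = line.repartition(50)  // shuffle the data into 50 partitions
println(repartitioned.partitions.length)  // up to 50 tasks can now run in parallel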
3. Algorithm optimization
Cut down repeated passes over the RDD and lower the time complexity of the code: replace the Array traversal with a HashMap lookup and reduce the number of map transformations applied to the RDD (see the sketch below).
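A minimal, self-contained sketch of the lookup change; the sample line and field name are illustrative only, while the HashMap approach matches the one used in mkRow in the appendix:
object LookupSketch {
  def main(args: Array[String]): Unit = {
    // A toy "key=value" formatted log line (illustrative sample only).
    val logLine = "logid=42 logtype=grab city_id=10"
    // Before: each field lookup scans the token array linearly,
    // so building one row costs O(numFields * numTokens) comparisons.
    val tokens = logLine.split(" ").map(_.split("=")).filter(_.length == 2)
    val logidLinear = tokens.find(kv => kv(0) == "logid").map(kv => kv(1)).orNull
    // After: the tokens go into a HashMap once, and each field lookup is O(1),
    // so a single pass over the tokens serves all fields.
    val lookup = new java.util.HashMap[String, String]()
    tokens.foreach(kv => lookup.put(kv(0), kv(1)))
    val logidHashed = lookup.get("logid")
    println(s"linear: $logidLinear, hashed: $logidHashed")
  }
}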
III. Results
Overall performance improved by 29.4%.
Appendix (code):
// Original transformation chain: the log line is loaded in one map pass
// and converted in a second, separate map pass over the RDD.
line1.map { s => loadLog(s, broad_fields.value) }
  .map(p => convert(p, broad_fields.value, broad_types.value))
// Helper functions
// Map a column's declared type name to the corresponding Spark SQL StructField.
def makeField(field: String, data_type: String): StructField = {
  if (data_type.equals("BIGINT") || data_type.contains("LONG") || data_type.contains("INT") || data_type.contains("BYTE") || data_type.contains("TINYINT")) {
    return StructField(field, LongType, true)
  }
  else if (data_type.contains("DOUBLE") || data_type.contains("FLOAT")) {
    return StructField(field, DoubleType, true)
  }
  else if (data_type.contains("TIMESTAMP"))
    return StructField(field, TimestampType, true)
  else
    return StructField(field, StringType, true)
}
// Convert a raw string value to the Scala value matching the column's declared type.
def convert(field: String, types: String, data: String): Any = {
  var result: Any = null
  if (StringUtils.isNotBlank(data)) {
    if (types.equals("BIGINT") || types.contains("LONG") || types.contains("INT") || types.contains("BYTE") || types.contains("TINYINT"))
      if (data.contains(".")) {
        result = data.substring(0, data.indexOf('.')).toLong
      }
      else
        result = data.toLong
    else if (types.contains("FLOAT") || types.contains("DOUBLE"))
      result = data.toDouble
    else if (types.contains("TIMESTAMP")) {
      if (data.contains(":") || data.contains("-")) {
        val loc = new Locale("en")
        val dfm = new SimpleDateFormat("yyyyMMdd-HH:mm:ss", loc)
        val dt = dfm.parse(data)
        result = new Timestamp(dt.getTime)
      }
      else
        result = new Timestamp(0)
    }
    else
      result = data
  }
  result
}
Optimized version:
import java.io.IOException
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util
import java.util.Locale
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
object FlumeDataToHive3 {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("the number of parameters should be more than 1")
      System.exit(1)
    }
    try {
      val inputpath = args(0)
      val outputstr = args(1)
      // Target column names and their declared types, in schema order.
      val fields = Array("logid", "logtime", "logtype", "logip", "businessline", "application", "apptype", "driver_id", "coupling_order_num", "log_type", "current_grab_num", "must_grab", "isVieorderLimit", "coupling_rate", "driver_rank", "angle", "collection_driver", "dirver_str", "new_driver_order", "new_driver", "a", "no_actual_distance", "is_new_user", "order_id", "coupling_order", "city_id")
      val types = Array("STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "TINYINT", "TINYINT", "BOOLEAN", "DOUBLE", "INT", "DOUBLE", "TINYINT", "DOUBLE", "BOOLEAN", "BOOLEAN", "DOUBLE", "DOUBLE", "TINYINT", "BIGINT", "BIGINT", "BIGINT")
      // Pair each field name with its type.
      var num = 0
      var pairs: Array[(String, String)] = new Array[(String, String)](fields.length)
      while (num < fields.length) {
        pairs(num) = (fields(num), types(num))
        num += 1
      }
      val conf = new SparkConf()
      val sc = new SparkContext(conf)
      val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
      // Broadcast the (field, type) pairs so every task shares one copy.
      val broadmap = sc.broadcast(pairs)
      // Raise the parallelism: the input files alone would yield too few partitions.
      val line = sc.textFile(inputpath).repartition(100)
      val arrays = broadmap.value
      val schema = mkSchema(arrays)
      if (!line.isEmpty()) {
        val line1 = line.filter(s => StringUtils.isNotBlank(s) && s.contains("logid"))
        if (!line1.isEmpty()) {
          val splits = mkRow(line1, arrays)
          val df = hiveContext.createDataFrame(splits, schema)
          df.write.parquet(outputstr)
          df.show()
        }
      }
      sc.stop()
    }
    catch {
      case ex: InterruptedException => {
        println("InterruptedException")
      }
      case ex: IOException => {
        println("IOException")
      }
    }
  }
  // Build the DataFrame schema from the (field, type) pairs.
  def mkSchema(maps: Array[(String, String)]): StructType = {
    StructType(for (elem <- maps) yield {
      makeField(elem._1, elem._2)
    })
  }
  def makeField(field: String, data_type: String): StructField = {
    if (data_type.equals("BIGINT") || data_type.contains("LONG") || data_type.contains("INT") || data_type.contains("BYTE") || data_type.contains("TINYINT")) {
      return StructField(field, LongType, true)
    }
    else if (data_type.contains("DOUBLE") || data_type.contains("FLOAT")) {
      return StructField(field, DoubleType, true)
    }
    else if (data_type.contains("TIMESTAMP"))
      return StructField(field, TimestampType, true)
    else
      return StructField(field, StringType, true)
  }
  // Parse each raw log line into a Row that matches the schema.
  def mkRow(str: RDD[String], maps: Array[(String, String)]): RDD[Row] = {
    str.map(s => {
      // Load the line's "key=value" tokens into a HashMap for O(1) field lookups.
      val map: util.HashMap[String, String] = new util.HashMap()
      s.split(" ").map(_.split("=")).filter(kv => kv.length == 2).foreach(kv => map.put(kv(0), kv(1)))
      // Convert every expected field to its target type, in schema order.
      val seq = for (i <- 0 until maps.length) yield {
        convert(maps(i)._1, maps(i)._2, map.get(maps(i)._1))
      }
      Row.fromSeq(seq)
    })
  }
  def convert(field: String, types: String, data: String): Any = {
    var result: Any = null
    if (StringUtils.isNotBlank(data)) {
      if (types.equals("BIGINT") || types.contains("LONG") || types.contains("INT") || types.contains("BYTE") || types.contains("TINYINT"))
        if (data.contains(".")) {
          result = data.substring(0, data.indexOf('.')).toLong
        }
        else
          result = data.toLong
      else if (types.contains("FLOAT") || types.contains("DOUBLE"))
        result = data.toDouble
      else if (types.contains("TIMESTAMP")) {
        if (data.contains(":") || data.contains("-")) {
          val loc = new Locale("en")
          val dfm = new SimpleDateFormat("yyyyMMdd-HH:mm:ss", loc)
          val dt = dfm.parse(data)
          result = new Timestamp(dt.getTime)
        }
        else
          result = new Timestamp(0)
      }
      else
        result = data
    }
    result
  }
}