
Spark Performance Tuning Notes

Author: hellozepp | Published 2018-03-26 20:47

    I. Background

    1) Identify optimization opportunities in the existing Spark log-cleaning job and reduce its resource consumption.

    2) Become familiar with the Spark development, testing, and tuning workflow.

    II. Approach

    1. RDD tuning

    1) Change: keeping all other parameters fixed, replace the map operator with mapPartitions.

    2) Rationale: process data one partition at a time instead of pulling records one by one.

    3) Result: testing showed a performance regression (with scarce system resources, runtime could grow several-fold). A sketch of the tested change follows this list.
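
    For reference, a minimal sketch of this experiment, reusing the line1, loadLog, and broad_fields names from the code in the appendix. Only the map version appears there, so the mapPartitions body below is an assumed reconstruction, not the author's exact code:

    // Before: map invokes loadLog once per record
    val parsed = line1.map(s => loadLog(s, broad_fields.value))

    // Tested variant: mapPartitions hands each task an iterator over a whole partition.
    // It mainly pays off when there is per-partition setup cost (e.g. opening a connection);
    // loadLog has none here, so the rewrite brought no gain and, in the author's tests,
    // ran slower when resources were tight.
    val parsedByPartition = line1.mapPartitions { iter =>
      iter.map(s => loadLog(s, broad_fields.value))
    }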

    2. Partition tuning

    Analysis showed that 5.8 GB of data spread across 6 files was handled by only 6 tasks, so parallelism was low and processing was slow. Repartitioning into 50 partitions noticeably increased concurrency, as sketched below.
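
    A minimal sketch of the change; inputpath is the input-directory argument from the full listing below (which ended up using 100 partitions rather than the 50 tried in this experiment):

    // 6 input files -> 6 input partitions -> only 6 tasks for 5.8 GB of data
    val line = sc.textFile(inputpath)

    // Shuffle the data into 50 partitions so more tasks can run in parallel
    val repartitioned = line.repartition(50)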

    3. Algorithm optimization

    Reduce repeated passes over the RDD and lower the code's time complexity: look each field up in a HashMap instead of scanning an Array, cutting the number of map transformations applied to the RDD.
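
    The optimized mkRow below shows this: each record's key=value pairs are loaded into a HashMap once, and every schema field is then resolved with a constant-time get instead of rescanning the parsed tokens. A simplified sketch of the lookup follows; the array-scan version is an assumption about the original code (not shown in full), and the sample line is made up:

    // Hypothetical sample log line
    val s = "logid=123 driver_id=42 city_id=7"
    val tokens: Array[Array[String]] = s.split(" ").map(_.split("=")).filter(_.length == 2)

    // Before (assumed): scan the tokens for every field, O(fields x tokens) per record
    val slowValue = tokens.find(t => t(0) == "driver_id").map(t => t(1)).orNull

    // After: build a HashMap once per record, then each field is an O(1) lookup
    val map = new java.util.HashMap[String, String]()
    tokens.foreach(x => map.put(x(0), x(1)))
    val fastValue = map.get("driver_id")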

    III. Results

    Overall performance improved by 29.4%.


    Appendix (code before optimization):

    
    // Original cleaning pipeline: two chained map passes over the RDD
    line1.map { s => loadLog(s, broad_fields.value) }
      .map(p => convert(p, broad_fields.value, broad_types.value))

    // Helper functions
    def makeField(field: String, data_type: String): StructField = {
      if (data_type.equals("BIGINT") || data_type.contains("LONG") || data_type.contains("INT") || data_type.contains("BYTE") || data_type.contains("TINYINT")) {
        StructField(field, LongType, true)
      } else if (data_type.contains("DOUBLE") || data_type.contains("FLOAT")) {
        StructField(field, DoubleType, true)
      } else if (data_type.contains("TIMESTAMP")) {
        StructField(field, TimestampType, true)
      } else {
        StructField(field, StringType, true)
      }
    }

    def convert(field: String, types: String, data: String): Any = {
      var result: Any = null
      if (StringUtils.isNotBlank(data)) {
        if (types.equals("BIGINT") || types.contains("LONG") || types.contains("INT") || types.contains("BYTE") || types.contains("TINYINT")) {
          if (data.contains(".")) {
            // Truncate a decimal such as "3.0" before converting to Long
            result = data.substring(0, data.indexOf('.')).toLong
          } else {
            result = data.toLong
          }
        } else if (types.contains("FLOAT") || types.contains("DOUBLE")) {
          result = data.toDouble
        } else if (types.contains("TIMESTAMP")) {
          if (data.contains(":") || data.contains("-")) {
            val loc = new Locale("en")
            val dfm = new SimpleDateFormat("yyyyMMdd-HH:mm:ss", loc)
            val dt = dfm.parse(data)
            result = new Timestamp(dt.getTime)
          } else {
            result = new Timestamp(0)
          }
        } else {
          result = data
        }
      }
      result
    }
    
    

    Optimized version:

    
    import java.io.IOException
    import java.sql.Timestamp
    import java.text.SimpleDateFormat
    import java.util
    import java.util.Locale

    import org.apache.commons.lang.StringUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.{SparkConf, SparkContext}

    object FlumeDataToHive3 {

      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("the number of parameters should be more than 1")
          System.exit(1)
        }
        try {
          val inputpath = args(0)
          val outputstr = args(1)
          val fields = Array("logid", "logtime", "logtype", "logip", "businessline", "application", "apptype", "driver_id", "coupling_order_num", "log_type", "current_grab_num", "must_grab", "isVieorderLimit", "coupling_rate", "driver_rank", "angle", "collection_driver", "dirver_str", "new_driver_order", "new_driver", "a", "no_actual_distance", "is_new_user", "order_id", "coupling_order", "city_id")
          val types = Array("STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "TINYINT", "TINYINT", "BOOLEAN", "DOUBLE", "INT", "DOUBLE", "TINYINT", "DOUBLE", "BOOLEAN", "BOOLEAN", "DOUBLE", "DOUBLE", "TINYINT", "BIGINT", "BIGINT", "BIGINT")

          // Pair each field name with its declared type
          var num = 0
          var pairs: Array[(String, String)] = new Array[(String, String)](fields.length)
          while (num < fields.length) {
            pairs(num) = (fields(num), types(num))
            num += 1
          }

          val conf = new SparkConf()
          val sc = new SparkContext(conf)
          val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

          // Broadcast the (field, type) pairs so each executor holds one shared copy
          val broadmap = sc.broadcast(pairs)

          // Raise parallelism: 6 input files would otherwise produce only 6 tasks
          val line = sc.textFile(inputpath).repartition(100)

          val arrays = broadmap.value
          val schema = mkSchema(arrays)
          if (!line.isEmpty()) {
            val line1 = line.filter(s => StringUtils.isNotBlank(s) && s.contains("logid"))
            if (!line1.isEmpty()) {
              val splits = mkRow(line1, arrays)
              val df = hiveContext.createDataFrame(splits, schema)
              df.write.parquet(outputstr)
              df.show()
            }
          }
          sc.stop()
        }
        catch {
          case ex: InterruptedException =>
            println("InterruptedException")
          case ex: IOException =>
            println("IOException")
        }
      }

      // Build the DataFrame schema from the (field, type) pairs
      def mkSchema(maps: Array[(String, String)]): StructType = {
        StructType(for (elem <- maps) yield {
          makeField(elem._1, elem._2)
        })
      }

      // Map a declared column type onto a nullable Spark SQL type
      def makeField(field: String, data_type: String): StructField = {
        if (data_type.equals("BIGINT") || data_type.contains("LONG") || data_type.contains("INT") || data_type.contains("BYTE") || data_type.contains("TINYINT")) {
          StructField(field, LongType, true)
        } else if (data_type.contains("DOUBLE") || data_type.contains("FLOAT")) {
          StructField(field, DoubleType, true)
        } else if (data_type.contains("TIMESTAMP")) {
          StructField(field, TimestampType, true)
        } else {
          StructField(field, StringType, true)
        }
      }

      // Parse one log line into a Row in a single pass, using a HashMap for field lookup
      def mkRow(str: RDD[String], maps: Array[(String, String)]): RDD[Row] = {
        str.map(s => {
          val map: util.HashMap[String, String] = new util.HashMap()
          // Each record is a space-separated list of "key=value" pairs
          s.split(" ").map(_.split("=")).filter(s => s.length == 2).foreach(x => map.put(x(0), x(1)))
          val seq = for (i <- 0 until maps.length) yield {
            convert(maps(i)._1, maps(i)._2, map.get(maps(i)._1))
          }
          Row.fromSeq(seq)
        })
      }

      // Convert a raw string value to the Scala value matching the declared column type
      def convert(field: String, types: String, data: String): Any = {
        var result: Any = null
        if (StringUtils.isNotBlank(data)) {
          if (types.equals("BIGINT") || types.contains("LONG") || types.contains("INT") || types.contains("BYTE") || types.contains("TINYINT")) {
            if (data.contains(".")) {
              result = data.substring(0, data.indexOf('.')).toLong
            } else {
              result = data.toLong
            }
          } else if (types.contains("FLOAT") || types.contains("DOUBLE")) {
            result = data.toDouble
          } else if (types.contains("TIMESTAMP")) {
            if (data.contains(":") || data.contains("-")) {
              val loc = new Locale("en")
              val dfm = new SimpleDateFormat("yyyyMMdd-HH:mm:ss", loc)
              val dt = dfm.parse(data)
              result = new Timestamp(dt.getTime)
            } else {
              result = new Timestamp(0)
            }
          } else {
            result = data
          }
        }
        result
      }
    }
    
    
