
Spark Performance Optimization Notes

Author: hellozepp | Published 2018-03-26 20:47

I. Background

1). Look for optimization opportunities in the existing Spark log-cleansing job and reduce its resource consumption.

2). Become familiar with the Spark development, testing, and tuning workflow.

II. Approach

1. RDD tuning

1). Change: keep all other parameters fixed and replace the map operator with mapPartitions (see the sketch after this list).

2). Rationale: process data one partition at a time instead of pulling and handling records one by one.

3). Result: testing showed a performance regression (when system resources were scarce, runtime could grow severalfold).
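
A minimal sketch of the change that was tested, reusing the line1, loadLog, convert, broad_fields, and broad_types names from the appendix snippet below; the per-partition setup shown here is an assumption about how the operator was swapped in, not the exact production code:

// Before: the broadcast values are dereferenced and the helpers invoked once per record
val perRecord = line1.map { s => loadLog(s, broad_fields.value) }
  .map(p => convert(p, broad_fields.value, broad_types.value))

// Tested variant: mapPartitions hands over one iterator per partition, so the
// broadcast lookups happen once per partition instead of once per record
val perPartition = line1.mapPartitions { iter =>
  val fields = broad_fields.value
  val types = broad_types.value
  iter.map(s => convert(loadLog(s, fields), fields, types))
}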

2. Partition optimization

Analysis showed that 6 files totaling 5.8 GB were read into only 6 partitions and therefore processed by only 6 tasks, so parallelism was low and processing was slow. Repartitioning to 50 partitions (sketched below) raised the concurrency noticeably.
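
A minimal sketch of the repartitioning step, assuming the same sc and inputpath used in the appendix (the appendix code later settles on 100 partitions; 50 is the value tried in this experiment):

// Only 6 tasks run in parallel when the 5.8 GB input lands in 6 partitions
val line = sc.textFile(inputpath)
println(line.getNumPartitions)            // 6 in the case analysed above
// repartition forces a full shuffle, but every downstream stage then runs with 50 concurrent tasks
val repartitioned = line.repartition(50)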

3. Algorithm optimization

Cut down on repeated passes over the RDD and lower the time complexity of the code: replace Array traversals with HashMap lookups and reduce the number of map transformations applied to the RDD (a sketch follows; the full version is the mkRow function in the optimized code).
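
A minimal sketch of the lookup change; the two helper functions are illustrative names, and the HashMap variant mirrors what mkRow does in the optimized code in the appendix:

import java.util

// Before: every field is resolved by scanning the Array of parsed "key=value"
// pairs, i.e. O(fields * tokens) work per log line
def lookupByScan(pairs: Array[(String, String)], field: String): String =
  pairs.find(_._1 == field).map(_._2).orNull

// After: build one HashMap per log line, then each field lookup is O(1)
def buildLookup(tokens: Array[String]): util.HashMap[String, String] = {
  val map = new util.HashMap[String, String]()
  tokens.map(_.split("=")).filter(_.length == 2).foreach(kv => map.put(kv(0), kv(1)))
  map
}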

III. Results

Overall performance improved by 29.4%.


Appendix (code):


line1.map { s => loadLog(s, broad_fields.value) }
  .map(p => convert(p, broad_fields.value, broad_types.value))

// Helper functions

def makeField(field: String, data_type: String): StructField = {
  if (data_type.equals("BIGINT") || data_type.contains("LONG") || data_type.contains("INT") || data_type.contains("BYTE") || data_type.contains("TINYINT")) {
    return StructField(field, LongType, true)
  }
  else if (data_type.contains("DOUBLE") || data_type.contains("FLOAT")) {
    return StructField(field, DoubleType, true)
  }
  else if (data_type.contains("TIMESTAMP"))
    return StructField(field, TimestampType, true)
  else
    return StructField(field, StringType, true)
}

def convert(field: String, types: String, data: String): Any = {
  var result: Any = null
  if (StringUtils.isNotBlank(data)) {
    if (types.equals("BIGINT") || types.contains("LONG") || types.contains("INT") || types.contains("BYTE") || types.contains("TINYINT"))
      if (data.contains(".")) {
        result = data.substring(0, data.indexOf('.')).toLong
      }
      else
        result = data.toLong
    else if (types.contains("FLOAT") || types.contains("DOUBLE"))
      result = data.toDouble
    else if (types.contains("TIMESTAMP")) {
      if (data.contains(":") || data.contains("-")) {
        val loc = new Locale("en")
        val dfm = new SimpleDateFormat("yyyyMMdd-HH:mm:ss", loc)
        val dt = dfm.parse(data)
        result = new Timestamp(dt.getTime)
      }
      else
        result = new Timestamp(0)
    }
    else
      result = data
  }
  result
}

Optimized version:


import java.io.IOException
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util
import java.util.Locale
import org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

object FlumeDataToHive3 {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("the number of parameters should be at least 2: <inputpath> <outputpath>")
      System.exit(1)
    }
    try {
      val inputpath = args(0)
      val outputstr = args(1)
      // Log field names and their target column types, aligned by position
      val fields = Array("logid", "logtime", "logtype", "logip", "businessline", "application", "apptype", "driver_id", "coupling_order_num", "log_type", "current_grab_num", "must_grab", "isVieorderLimit", "coupling_rate", "driver_rank", "angle", "collection_driver", "dirver_str", "new_driver_order", "new_driver", "a", "no_actual_distance", "is_new_user", "order_id", "coupling_order", "city_id")
      val types = Array("STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "STRING", "TINYINT", "TINYINT", "BOOLEAN", "DOUBLE", "INT", "DOUBLE", "TINYINT", "DOUBLE", "BOOLEAN", "BOOLEAN", "DOUBLE", "DOUBLE", "TINYINT", "BIGINT", "BIGINT", "BIGINT")
      var num = 0
      var pairs: Array[(String, String)] = new Array[(String, String)](fields.length)
      while (num < fields.length) {
        pairs(num) = (fields(num), types(num))
        num += 1
      }
      val conf = new SparkConf()
      val sc = new SparkContext(conf)
      val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
      val broadmap = sc.broadcast(pairs)
      // Repartition to raise parallelism: the raw input arrives in only a few partitions
      val line = sc.textFile(inputpath).repartition(100)
      val arrays = broadmap.value
      val schema = mkSchema(arrays)
      if (!line.isEmpty()) {
        val line1 = line.filter(s => StringUtils.isNotBlank(s) && s.contains("logid"))
        if (!line1.isEmpty()) {
          val splits = mkRow(line1, arrays)
          val df = hiveContext.createDataFrame(splits, schema)
          df.write.parquet(outputstr)
          df.show()
        }
      }
      sc.stop()
    }
    catch {
      case ex: InterruptedException => {
        println("InterruptedException")
      }
      case ex: IOException => {
        println("IOException")
      }
    }
  }

  def mkSchema(maps: Array[(String, String)]): StructType = {
    StructType(for (elem <- maps) yield {
      makeField(elem._1, elem._2)
    })
  }

  def makeField(field: String, data_type: String): StructField = {
    if (data_type.equals("BIGINT") || data_type.contains("LONG") || data_type.contains("INT") || data_type.contains("BYTE") || data_type.contains("TINYINT")) {
      return StructField(field, LongType, true)
    }
    else if (data_type.contains("DOUBLE") || data_type.contains("FLOAT")) {
      return StructField(field, DoubleType, true)
    }
    else if (data_type.contains("TIMESTAMP"))
      return StructField(field, TimestampType, true)
    else
      return StructField(field, StringType, true)
  }

  def mkRow(str: RDD[String], maps: Array[(String, String)]): RDD[Row] = {
    str.map(s => {
      // Parse each "key=value" token of the log line into a HashMap for O(1) field lookup
      val map: util.HashMap[String, String] = new util.HashMap()
      s.split(" ").map(_.split("=")).filter(kv => kv.length == 2).foreach(x => map.put(x(0), x(1)))
      // Build the row in schema order, converting every value to its declared type
      val seq = for (i <- 0 until maps.length) yield {
        convert(maps(i)._1, maps(i)._2, map.get(maps(i)._1))
      }
      Row.fromSeq(seq)
    })
  }

  def convert(field: String, types: String, data: String): Any = {
    var result: Any = null
    if (StringUtils.isNotBlank(data)) {
      if (types.equals("BIGINT") || types.contains("LONG") || types.contains("INT") || types.contains("BYTE") || types.contains("TINYINT"))
        if (data.contains(".")) {
          result = data.substring(0, data.indexOf('.')).toLong
        }
        else
          result = data.toLong
      else if (types.contains("FLOAT") || types.contains("DOUBLE"))
        result = data.toDouble
      else if (types.contains("TIMESTAMP")) {
        if (data.contains(":") || data.contains("-")) {
          val loc = new Locale("en")
          val dfm = new SimpleDateFormat("yyyyMMdd-HH:mm:ss", loc)
          val dt = dfm.parse(data)
          result = new Timestamp(dt.getTime)
        }
        else
          result = new Timestamp(0)
      }
      else
        result = data
    }
    result
  }

}

