
Why Spark SQL writes to Hive slowly: a root-cause analysis (original post)

Author: 无色的叶 | Published 2019-04-29 16:22

    Symptoms

    An HBase table holds 30,000 rows. Spark reads the HBase data, partitions it by the value of one field (about 1,000 distinct values, hence about 1,000 partitions), and writes it into a Hive partitioned table. The write takes a long time, roughly 25 minutes.

     spark.sql("insert into legend.test_log_hive partition(name_par) select rowKey,name,age,mobile,addr,name as name_par from result")
    

    The execution logs show that the time is spent in the stage that loads the data into Hive.
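
    For context, here is a minimal, hypothetical sketch of the kind of job described above. The synthetic DataFrame stands in for the HBase scan, the object name and dynamic-partition settings are assumptions not shown in the original post, and the Hive table legend.test_log_hive is assumed to already exist with a name_par partition column:

    import org.apache.spark.sql.SparkSession

    object SlowInsertRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("hbase-to-hive-repro")
          .enableHiveSupport()
          .getOrCreate()
        import spark.implicits._

        // Standard Hive settings usually required for a dynamic-partition insert
        // such as `partition(name_par)` with no literal value (assumption).
        spark.sql("set hive.exec.dynamic.partition=true")
        spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

        // Synthetic stand-in for the HBase read: ~30,000 rows with ~1,000 distinct
        // values of `name`, i.e. ~1,000 dynamic partitions.
        val result = (1 to 30000).map { i =>
          (s"row$i", s"name${i % 1000}", 20 + i % 50, s"1380000$i", s"addr$i")
        }.toDF("rowKey", "name", "age", "mobile", "addr")
        result.createOrReplaceTempView("result")

        // The statement from the post: one Hive partition per distinct `name`.
        spark.sql(
          """insert into legend.test_log_hive partition(name_par)
            |select rowKey, name, age, mobile, addr, name as name_par from result""".stripMargin)
      }
    }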

    How the Spark load step executes

    spark.sql("insert into ...") is executed by the run method of Spark's InsertIntoHiveTable class:

    case class InsertIntoHiveTable(
        table: CatalogTable,
        partition: Map[String, Option[String]],
        query: LogicalPlan,
        overwrite: Boolean,
        ifPartitionNotExists: Boolean,
        outputColumnNames: Seq[String]) extends SaveAsHiveFile {
    
      /**
       * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
       * `org.apache.hadoop.hive.serde2.SerDe` and the
       * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
       */
      override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
        val externalCatalog = sparkSession.sharedState.externalCatalog
        val hadoopConf = sparkSession.sessionState.newHadoopConf()
    
        val hiveQlTable = HiveClientImpl.toHiveTable(table)
        // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
        // instances within the closure, since Serializer is not serializable while TableDesc is.
        val tableDesc = new TableDesc(
          hiveQlTable.getInputFormatClass,
          // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
          // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
          // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
          // HiveSequenceFileOutputFormat.
          hiveQlTable.getOutputFormatClass,
          hiveQlTable.getMetadata
        )
        val tableLocation = hiveQlTable.getDataLocation
        val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
    
        try {
      // Main step: processInsert writes the query output and loads it into the Hive table.
          processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
        } finally {
          // Attempt to delete the staging directory and the inclusive files. If failed, the files are
          // expected to be dropped at the normal termination of VM since deleteOnExit is used.
          deleteExternalTmpPath(hadoopConf)
        }
    
        // un-cache this table.
        sparkSession.catalog.uncacheTable(table.identifier.quotedString)
        sparkSession.sessionState.catalog.refreshTable(table.identifier)
    
        CommandUtils.updateTableStats(sparkSession, table)
    
        // It would be nice to just return the childRdd unchanged so insert operations could be chained,
        // however for now we return an empty list to simplify compatibility checks with hive, which
        // does not return anything for insert operations.
        // TODO: implement hive compatibility as rules.
        Seq.empty[Row]
      }
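
    As a quick cross-check (in spark-shell, where `spark` is predefined, and assuming your Spark build supports EXPLAIN on INSERT statements), you can confirm that the statement is planned as this command:

    // The physical plan should contain an InsertIntoHiveTable node,
    // confirming the entry point shown above.
    spark.sql(
      """explain extended
        |insert into legend.test_log_hive partition(name_par)
        |select rowKey, name, age, mobile, addr, name as name_par from result""".stripMargin)
      .collect()
      .foreach(row => println(row.getString(0)))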
    
    

    The insert (into/overwrite) execution breaks down into three steps: select, write, and load. The first two are not the problem; the time is spent in the final load step. Taking loadPartition as an example, here is the execution path:

    override def loadPartition(
          db: String,
          table: String,
          loadPath: String,
          partition: TablePartitionSpec,
          isOverwrite: Boolean,
          inheritTableSpecs: Boolean,
          isSrcLocal: Boolean): Unit = withClient {
        requireTableExists(db, table)
    
        val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
        getTable(db, table).partitionColumnNames.foreach { colName =>
          // Hive metastore is not case preserving and keeps partition columns with lower cased names,
          // and Hive will validate the column names in partition spec to make sure they are partition
          // columns. Here we Lowercase the column names before passing the partition spec to Hive
          // client, to satisfy Hive.
          orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
        }
    
        client.loadPartition(
          loadPath,
          db,
          table,
          orderedPartitionSpec,
          isOverwrite,
          inheritTableSpecs,
          isSrcLocal)
      }
    

    This in turn calls org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition:

    def loadPartition(
          loadPath: String,
          dbName: String,
          tableName: String,
          partSpec: java.util.LinkedHashMap[String, String],
          replace: Boolean,
          inheritTableSpecs: Boolean,
          isSrcLocal: Boolean): Unit = withHiveState {
        val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
        shim.loadPartition(
          client,
          new Path(loadPath), // TODO: Use URI
          s"$dbName.$tableName",
          partSpec,
          replace,
          inheritTableSpecs,
          isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
          isSrcLocal = isSrcLocal)
      }
    

    Note the shim.loadPartition call; by default the shim corresponds to Hive version 1.2.1:

    private val shim = version match {
        case hive.v12 => new Shim_v0_12()
        case hive.v13 => new Shim_v0_13()
        case hive.v14 => new Shim_v0_14()
        case hive.v1_0 => new Shim_v1_0()
        case hive.v1_1 => new Shim_v1_1()
        case hive.v1_2 => new Shim_v1_2()
        case hive.v2_0 => new Shim_v2_0()
        case hive.v2_1 => new Shim_v2_1()
        case hive.v2_2 => new Shim_v2_2()
        case hive.v2_3 => new Shim_v2_3()
      }
    

    so here the call resolves to new Shim_v1_2().loadPartition:

    override def loadPartition(
          hive: Hive,
          loadPath: Path,
          tableName: String,
          partSpec: JMap[String, String],
          replace: Boolean,
          inheritTableSpecs: Boolean,
          isSkewedStoreAsSubdir: Boolean,
          isSrcLocal: Boolean): Unit = {
        loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
          holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
          isSrcLocal: JBoolean, isAcid)
      }
    
    
    private lazy val loadPartitionMethod =
        findMethod(
          classOf[Hive],
          "loadPartition",
          classOf[Path],
          classOf[String],
          classOf[JMap[String, String]],
          JBoolean.TYPE,
          JBoolean.TYPE,
          JBoolean.TYPE,
          JBoolean.TYPE,
          JBoolean.TYPE,
          JBoolean.TYPE)
    

    Through reflection this ends up invoking Hive's org.apache.hadoop.hive.ql.metadata.Hive.loadPartition (Hive version 1.2.1). When Hive loads a partition whose directory already exists, it calls replaceFiles, which calls trashFilesUnderDir, which moves the existing files to the trash one by one; if the directory does not exist, Hive simply creates it and copies the files in. Because trashFilesUnderDir deletes files in a plain for loop, it gets slow once there are many files to handle, and with roughly 1,000 partitions being loaded this adds up quickly.

    public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf)
            throws FileNotFoundException, IOException {
        FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
        boolean result = true;
        // Hive 1.2.1 moves the files to the trash strictly one after another.
        for (FileStatus status : statuses) {
            result &= moveToTrash(fs, status.getPath(), conf);
        }
        return result;
    }
    

    Compare this with the Hive 3.1.1 source:

    protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
              boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
              boolean isNeedRecycle, boolean isManaged) throws HiveException {
        try {
    
          FileSystem destFs = destf.getFileSystem(conf);
          // check if srcf contains nested sub-directories
          FileStatus[] srcs;
          FileSystem srcFs;
          try {
            srcFs = srcf.getFileSystem(conf);
            srcs = srcFs.globStatus(srcf);
          } catch (IOException e) {
            throw new HiveException("Getting globStatus " + srcf.toString(), e);
          }
          if (srcs == null) {
            LOG.info("No sources specified to move: " + srcf);
            return;
          }
    
          if (oldPath != null) {
            deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isNeedRecycle);
          }
    
          // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
          // destf
          boolean destfExist = FileUtils.mkdir(destFs, destf, conf);
          if(!destfExist) {
            throw new IOException("Directory " + destf.toString()
                + " does not exist and could not be created.");
          }
    
    

    replaceFiles then calls the deleteOldPathForReplace method:

     private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
          PathFilter pathFilter, boolean isNeedRecycle) throws HiveException {
        Utilities.FILE_OP_LOGGER.debug("Deleting old paths for replace in " + destPath
            + " and old path " + oldPath);
        boolean isOldPathUnderDestf = false;
        try {
          FileSystem oldFs = oldPath.getFileSystem(conf);
          FileSystem destFs = destPath.getFileSystem(conf);
          // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
          // existing content might result in incorrect (extra) data.
          // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
          // not the destf or its subdir?
          isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
          if (isOldPathUnderDestf) {
            cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge, isNeedRecycle);
          }
        } catch (IOException e) {
          if (isOldPathUnderDestf) {
            // if oldPath is a subdir of destf but it could not be cleaned
            throw new HiveException("Directory " + oldPath.toString()
                + " could not be cleaned up.", e);
          } else {
            //swallow the exception since it won't affect the final result
            LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
          }
        }
    

    which in turn calls cleanUpOneDirectoryForReplace:

    private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs,
          PathFilter pathFilter, HiveConf conf, boolean purge, boolean isNeedRecycle) throws IOException, HiveException {
        if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
          recycleDirToCmPath(path, purge);
        }
        FileStatus[] statuses = fs.listStatus(path, pathFilter);
        if (statuses == null || statuses.length == 0) {
          return;
        }
        if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
          String s = "Deleting files under " + path + " for replace: ";
          for (FileStatus file : statuses) {
            s += file.getPath().getName() + ", ";
          }
          Utilities.FILE_OP_LOGGER.trace(s);
        }
    
        if (!trashFiles(fs, statuses, conf, purge)) {
          throw new HiveException("Old path " + path + " has not been cleaned up.");
        }
      }
    

    and finally trashFiles:

    /**
       * Trashes or deletes all files under a directory. Leaves the directory as is.
       * @param fs FileSystem to use
       * @param statuses fileStatuses of files to be deleted
       * @param conf hive configuration
       * @return true if deletion successful
       * @throws IOException
       */
      public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses,
          final Configuration conf, final boolean purge)
          throws IOException {
        boolean result = true;
    
        if (statuses == null || statuses.length == 0) {
          return false;
        }
        final List<Future<Boolean>> futures = new LinkedList<>();
        final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
            Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;
        final SessionState parentSession = SessionState.get();
        for (final FileStatus status : statuses) {
          if (null == pool) {
            result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
          } else {
            futures.add(pool.submit(new Callable<Boolean>() {
              @Override
              public Boolean call() throws Exception {
                SessionState.setCurrentSessionState(parentSession);
                return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
              }
            }));
          }
        }
        if (null != pool) {
          pool.shutdown();
          for (Future<Boolean> future : futures) {
            try {
              result &= future.get();
            } catch (InterruptedException | ExecutionException e) {
              LOG.error("Failed to delete: ",e);
              pool.shutdownNow();
              throw new IOException(e);
            }
          }
        }
        return result;
      }
    

    As you can see, this version of the trash logic processes the files with a thread pool (sized by HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT). The same holds for the copyFiles path taken when the target files do not yet exist: it also uses a thread pool to copy the files, which makes the copy much faster than the serial loop in Hive 1.2.1.
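
    Following from this analysis (and not part of the original post), two mitigations are commonly tried. Both are sketched below under explicit assumptions: the Hive client version, jar path, and temp-view names are placeholders that must match your own cluster, and whether the parallel load/copy path is actually taken depends on the Hive version Spark ends up using.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    // Hypothetical mitigation sketch.
    // 1) Point Spark at a newer Hive client (if the cluster runs Hive 2.x/3.x), so a newer
    //    Shim_v* from the match above, and hence a newer Hive.loadPartition, is used.
    // 2) Reduce the number of files the load step has to trash/copy by writing one file
    //    per dynamic partition. These configs must be set before the session is created.
    val spark = SparkSession.builder()
      .appName("hbase-to-hive")
      .enableHiveSupport()
      .config("spark.sql.hive.metastore.version", "2.3.3")          // placeholder version
      .config("spark.sql.hive.metastore.jars", "/opt/hive/lib/*")   // placeholder classpath
      .getOrCreate()

    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    // Hash-partitioning on `name` puts all rows of a given name into one task,
    // so each Hive partition directory receives a single data file.
    spark.table("result")
      .repartition(col("name"))
      .createOrReplaceTempView("result_compacted")

    spark.sql(
      """insert into legend.test_log_hive partition(name_par)
        |select rowKey, name, age, mobile, addr, name as name_par from result_compacted""".stripMargin)

    Of the two, the repartition is the safer change; switching spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars changes which shim is selected and therefore which Hive loadPartition implementation runs, so it requires the matching Hive jars to be available on the cluster.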

    Call chain


    [Figure: spark sql load执行链路.png (the Spark SQL load call chain)]


    Source: https://www.haomeiwen.com/subject/qxwynqtx.html