Problem
An HBase table holds about 30,000 rows. Spark reads the HBase data, partitions it by the value of one field into roughly 1,000 partitions, and writes it into a Hive partitioned table. The write takes a long time, about 25 minutes:
spark.sql("insert into legend.test_log_hive partition(name_par) select rowKey,name,age,mobile,addr,name as name_par from result")
From the execution logs, the time is clearly spent in the phase that loads the data into Hive.
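For reference, here is a minimal, hedged sketch of the job described above. The in-memory DataFrame is only a stand-in for the rows read from HBase, and the dynamic-partition settings are illustrative assumptions; the table name legend.test_log_hive, the view name result, and the column list come from the SQL statement shown above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hbase-to-hive-partitioned")
  .enableHiveSupport()
  // dynamic-partition insert (partition(name_par) is filled from the select) needs nonstrict mode
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()
import spark.implicits._

// stand-in for the ~30,000 rows read from the HBase table
val result = Seq(("rk1", "alice", 30, "13800000000", "addr1"))
  .toDF("rowKey", "name", "age", "mobile", "addr")
result.createOrReplaceTempView("result")

spark.sql("insert into legend.test_log_hive partition(name_par) " +
  "select rowKey,name,age,mobile,addr,name as name_par from result")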
Analysis of the Spark load execution path
spark.sql("insert into table") 对应spark类InsertIntoHiveTable中的run方法
case class InsertIntoHiveTable(
    table: CatalogTable,
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean,
    outputColumnNames: Seq[String]) extends SaveAsHiveFile {

  /**
   * Inserts all the rows in the table into Hive. Row objects are properly serialized with the
   * `org.apache.hadoop.hive.serde2.SerDe` and the
   * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
   */
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
    val externalCatalog = sparkSession.sharedState.externalCatalog
    val hadoopConf = sparkSession.sessionState.newHadoopConf()

    val hiveQlTable = HiveClientImpl.toHiveTable(table)
    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
    // instances within the closure, since Serializer is not serializable while TableDesc is.
    val tableDesc = new TableDesc(
      hiveQlTable.getInputFormatClass,
      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
      // HiveSequenceFileOutputFormat.
      hiveQlTable.getOutputFormatClass,
      hiveQlTable.getMetadata
    )
    val tableLocation = hiveQlTable.getDataLocation
    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)

    try {
      // the key call: write out the data and load it into the Hive table/partitions
      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
    } finally {
      // Attempt to delete the staging directory and the inclusive files. If failed, the files are
      // expected to be dropped at the normal termination of VM since deleteOnExit is used.
      deleteExternalTmpPath(hadoopConf)
    }

    // un-cache this table.
    sparkSession.catalog.uncacheTable(table.identifier.quotedString)
    sparkSession.sessionState.catalog.refreshTable(table.identifier)

    CommandUtils.updateTableStats(sparkSession, table)

    // It would be nice to just return the childRdd unchanged so insert operations could be chained,
    // however for now we return an empty list to simplify compatibility checks with hive, which
    // does not return anything for insert operations.
    // TODO: implement hive compatibility as rules.
    Seq.empty[Row]
  }
The insert overwrite execution consists of three steps: select, write, and load. The first two are not the problem; the cost is in the final load step. Taking loadPartition (in HiveExternalCatalog) as an example, the execution path is:
  override def loadPartition(
      db: String,
      table: String,
      loadPath: String,
      partition: TablePartitionSpec,
      isOverwrite: Boolean,
      inheritTableSpecs: Boolean,
      isSrcLocal: Boolean): Unit = withClient {
    requireTableExists(db, table)

    val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
    getTable(db, table).partitionColumnNames.foreach { colName =>
      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
      // and Hive will validate the column names in partition spec to make sure they are partition
      // columns. Here we Lowercase the column names before passing the partition spec to Hive
      // client, to satisfy Hive.
      orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
    }

    client.loadPartition(
      loadPath,
      db,
      table,
      orderedPartitionSpec,
      isOverwrite,
      inheritTableSpecs,
      isSrcLocal)
  }
This in turn calls org.apache.spark.sql.hive.client.HiveClientImpl.loadPartition:
  def loadPartition(
      loadPath: String,
      dbName: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String],
      replace: Boolean,
      inheritTableSpecs: Boolean,
      isSrcLocal: Boolean): Unit = withHiveState {
    val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
    shim.loadPartition(
      client,
      new Path(loadPath), // TODO: Use URI
      s"$dbName.$tableName",
      partSpec,
      replace,
      inheritTableSpecs,
      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
      isSrcLocal = isSrcLocal)
  }
Note the shim.loadPartition call: the shim is selected by Hive client version, and the default corresponds to version 1.2.1.
  private val shim = version match {
    case hive.v12 => new Shim_v0_12()
    case hive.v13 => new Shim_v0_13()
    case hive.v14 => new Shim_v0_14()
    case hive.v1_0 => new Shim_v1_0()
    case hive.v1_1 => new Shim_v1_1()
    case hive.v1_2 => new Shim_v1_2()
    case hive.v2_0 => new Shim_v2_0()
    case hive.v2_1 => new Shim_v2_1()
    case hive.v2_2 => new Shim_v2_2()
    case hive.v2_3 => new Shim_v2_3()
  }
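The shim version follows the Hive client version Spark is configured with. As a hedged sketch (not a verified fix), Spark can be pointed at a newer Hive metastore client via the standard spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars settings, which would select a newer shim such as Shim_v2_3; whether this actually changes the loadPartition behavior depends on that Hive version's implementation and on having compatible jars for the cluster. The version string 2.3.3 below is only an example.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("use-newer-hive-client")
  .enableHiveSupport()
  // talk to the metastore with a 2.3.x client instead of the built-in 1.2.1
  .config("spark.sql.hive.metastore.version", "2.3.3")
  // "maven" downloads the matching Hive jars; a classpath of pre-staged jars can be given instead
  .config("spark.sql.hive.metastore.jars", "maven")
  .getOrCreate()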
So by default it resolves to Shim_v1_2.loadPartition:
  override def loadPartition(
      hive: Hive,
      loadPath: Path,
      tableName: String,
      partSpec: JMap[String, String],
      replace: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean,
      isSrcLocal: Boolean): Unit = {
    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
      holdDDLTime, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
      isSrcLocal: JBoolean, isAcid)
  }

  private lazy val loadPartitionMethod =
    findMethod(
      classOf[Hive],
      "loadPartition",
      classOf[Path],
      classOf[String],
      classOf[JMap[String, String]],
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE,
      JBoolean.TYPE)
This reflectively invokes the Hive class org.apache.hadoop.hive.ql.metadata.Hive.loadPartition (Hive version 1.2.1).
When Hive executes loadPartition and the partition directory already exists, it calls replaceFiles, which calls trashFilesUnderDir, and trashFilesUnderDir moves the existing files to the trash one by one; if the directory does not exist, Hive simply creates it and copies the files over. The trashFilesUnderDir method deletes the files in a sequential for loop, which is slow when there are many files, and with roughly 1,000 partitions being loaded this per-file work adds up.
  public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf)
      throws FileNotFoundException, IOException {
    FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
    boolean result = true;
    // files are moved to trash one at a time in a sequential loop
    for (FileStatus status : statuses) {
      result &= moveToTrash(fs, status.getPath(), conf);
    }
    return result;
  }
Compare this with the Hive 3.1.1 source:
  protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
      boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
      boolean isNeedRecycle, boolean isManaged) throws HiveException {
    try {
      FileSystem destFs = destf.getFileSystem(conf);
      // check if srcf contains nested sub-directories
      FileStatus[] srcs;
      FileSystem srcFs;
      try {
        srcFs = srcf.getFileSystem(conf);
        srcs = srcFs.globStatus(srcf);
      } catch (IOException e) {
        throw new HiveException("Getting globStatus " + srcf.toString(), e);
      }
      if (srcs == null) {
        LOG.info("No sources specified to move: " + srcf);
        return;
      }

      if (oldPath != null) {
        deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isNeedRecycle);
      }

      // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
      // destf
      boolean destfExist = FileUtils.mkdir(destFs, destf, conf);
      if (!destfExist) {
        throw new IOException("Directory " + destf.toString()
            + " does not exist and could not be created.");
      }
      // ... (rest of the method omitted)

replaceFiles then calls deleteOldPathForReplace:
  private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
      PathFilter pathFilter, boolean isNeedRecycle) throws HiveException {
    Utilities.FILE_OP_LOGGER.debug("Deleting old paths for replace in " + destPath
        + " and old path " + oldPath);
    boolean isOldPathUnderDestf = false;
    try {
      FileSystem oldFs = oldPath.getFileSystem(conf);
      FileSystem destFs = destPath.getFileSystem(conf);
      // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
      // existing content might result in incorrect (extra) data.
      // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
      // not the destf or its subdir?
      isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
      if (isOldPathUnderDestf) {
        cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge, isNeedRecycle);
      }
    } catch (IOException e) {
      if (isOldPathUnderDestf) {
        // if oldPath is a subdir of destf but it could not be cleaned
        throw new HiveException("Directory " + oldPath.toString()
            + " could not be cleaned up.", e);
      } else {
        // swallow the exception since it won't affect the final result
        LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
      }
    }
  }
deleteOldPathForReplace then calls cleanUpOneDirectoryForReplace:
  private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs,
      PathFilter pathFilter, HiveConf conf, boolean purge, boolean isNeedRecycle)
      throws IOException, HiveException {
    if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
      recycleDirToCmPath(path, purge);
    }
    FileStatus[] statuses = fs.listStatus(path, pathFilter);
    if (statuses == null || statuses.length == 0) {
      return;
    }
    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
      String s = "Deleting files under " + path + " for replace: ";
      for (FileStatus file : statuses) {
        s += file.getPath().getName() + ", ";
      }
      Utilities.FILE_OP_LOGGER.trace(s);
    }

    if (!trashFiles(fs, statuses, conf, purge)) {
      throw new HiveException("Old path " + path + " has not been cleaned up.");
    }
  }
Finally, it calls trashFiles:
  /**
   * Trashes or deletes all files under a directory. Leaves the directory as is.
   * @param fs FileSystem to use
   * @param statuses fileStatuses of files to be deleted
   * @param conf hive configuration
   * @return true if deletion successful
   * @throws IOException
   */
  public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses,
      final Configuration conf, final boolean purge)
      throws IOException {
    boolean result = true;

    if (statuses == null || statuses.length == 0) {
      return false;
    }
    final List<Future<Boolean>> futures = new LinkedList<>();
    final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
        Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;
    final SessionState parentSession = SessionState.get();
    for (final FileStatus status : statuses) {
      if (null == pool) {
        result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
      } else {
        futures.add(pool.submit(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            SessionState.setCurrentSessionState(parentSession);
            return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
          }
        }));
      }
    }
    if (null != pool) {
      pool.shutdown();
      for (Future<Boolean> future : futures) {
        try {
          result &= future.get();
        } catch (InterruptedException | ExecutionException e) {
          LOG.error("Failed to delete: ", e);
          pool.shutdownNow();
          throw new IOException(e);
        }
      }
    }
    return result;
  }
As the code shows, this trash method processes the files with a thread pool. Likewise, the copyFiles path taken when the target files do not yet exist also uses a thread pool to copy the files, which speeds up the file copy. A sketch of the same pattern follows.
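To make the difference concrete, here is a minimal, hedged sketch in Scala of the pattern Hive 3.1.1 uses (this is not Hive's own code): the per-file trash operations are fanned out over a fixed-size thread pool instead of running in a one-by-one loop. trashFilesInParallel is a hypothetical helper; only the Hadoop FileSystem, Path, and Trash APIs are assumed.

import java.util.concurrent.{Callable, Executors}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, Trash}

// hypothetical helper: move every file under `dir` to the trash using a thread pool
def trashFilesInParallel(fs: FileSystem, dir: Path, conf: Configuration,
    threads: Int = 25): Boolean = {
  val statuses = fs.listStatus(dir)
  if (statuses == null || statuses.isEmpty) {
    true
  } else {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      // submit one trash task per file
      val futures = statuses.toSeq.map { status =>
        pool.submit(new Callable[Boolean] {
          override def call(): Boolean = Trash.moveToAppropriateTrash(fs, status.getPath, conf)
        })
      }
      // block until every move has completed and require all of them to succeed
      futures.map(_.get()).forall(identity)
    } finally {
      pool.shutdown()
    }
  }
}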
Call chain
spark.sql("insert into ...") -> InsertIntoHiveTable.run -> processInsert -> HiveExternalCatalog.loadPartition -> HiveClientImpl.loadPartition -> Shim_v1_2.loadPartition -> (reflection) Hive.loadPartition (Hive 1.2.1) -> replaceFiles -> trashFilesUnderDir / copyFiles