问题现象
生产环境hiveserver2使用过程中占用大量内存,甚至将内存上限增大到32G左右后hiveserver2仍会达到内存上限。使用G1GC,单次full GC耗时很长,且绝大部分内存无法被回收(只能回收几百M内存),然后发生OOM退出。怀疑存在内存泄漏问题。本文围绕hiveserver2内存泄漏问题展开分析。
生产环境Hive版本为3.1.0。
Heap dump 分析
在生产服务器获取到hiveserver2 OOM时候的heap dump之后,使用MAT工具分析。获取它的leak suspect报告。具体分析步骤参见Java Heap Dump 分析步骤。
我们使用浏览器打开leak suspect报告。发现里面有一个内存泄漏怀疑点。打开详情,内容如下图所示:
Leak suspect报告上图中可以很明显的看出operationManager
中有一个叫做queryIdOperation
的ConcurrentHashMap
,占用了大量的内存。这个报告给出了问题分析的方向。下面的分析围绕着queryIdOpereation
展开。
原因分析
我们查看源代码,发现OperationManager
类中的queryIdOperation
为私有变量。因此queryIdOperation
只可能在OperationManager
中操作。继续寻找操作queryIdOperation
的方法,发现有如下三个:
- addOperation
- removeOperation
- getOperationByQueryId
其中前两个方法分别为向集合中添加和移除元素。我们接下来分析这两个方法。
OperationManager
的addOperation
方法代码如下:
private void addOperation(Operation operation) {
LOG.info("Adding operation: " + operation.getHandle());
// 通过getQueryId方法从operation中获取queryId,然后将queryId作为key,存放入queryIdOperation
queryIdOperation.put(getQueryId(operation), operation);
handleToOperation.put(operation.getHandle(), operation);
if (operation instanceof SQLOperation) {
synchronized (webuiLock) {
liveQueryInfos.put(operation.getHandle().getHandleIdentifier().toString(),
((SQLOperation) operation).getQueryInfo());
}
}
}
getQueryId
方法内容如下:
private String getQueryId(Operation operation) {
// 获取parent session的HiveConf对象
// queryId在这个HiveConf对象当中存放
return operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
}
这个方法是重点,此处先埋个伏笔,后面还会再次分析这个方法。
removeOperation
方法逻辑如下:
private Operation removeOperation(OperationHandle opHandle) {
Operation operation = handleToOperation.remove(opHandle);
// 通过上面的逻辑,获取queryId
String queryId = getQueryId(operation);
// 从queryIdOperation集合中remove掉
queryIdOperation.remove(queryId);
LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
if (operation instanceof SQLOperation) {
removeSafeQueryInfo(opHandle);
}
return operation;
}
接下来我们需要顺着向上层找,分别追踪addOperation
和removeOperation
方法的调用链。
addOperation
方法在OperationManager
的newExecuteStatementOperation
方法中调用,内容如下:
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
throws HiveSQLException {
// 创建一个ExecuteStatementOperation
ExecuteStatementOperation executeStatementOperation =
ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement,
confOverlay, runAsync, queryTimeout);
// 调用addOperation
addOperation(executeStatementOperation);
return executeStatementOperation;
}
追溯这个方法调用,我们来到HiveSessionImpl
的executeStatementInternal
方法,内容如下:
private OperationHandle executeStatementInternal(String statement,
Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
acquire(true, true);
ExecuteStatementOperation operation = null;
OperationHandle opHandle = null;
try {
// 此处调用了newExecuteStatementOperation
operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
confOverlay, runAsync, queryTimeout);
opHandle = operation.getHandle();
addOpHandle(opHandle);
operation.run();
return opHandle;
} catch (HiveSQLException e) {
// Refering to SQLOperation.java, there is no chance that a HiveSQLException throws and the
// async background operation submits to thread pool successfully at the same time. So, Cleanup
// opHandle directly when got HiveSQLException
if (opHandle != null) {
removeOpHandle(opHandle);
getOperationManager().closeOperation(opHandle);
}
throw e;
} finally {
if (operation == null || operation.getBackgroundHandle() == null) {
release(true, true); // Not async, or wasn't submitted for some reason (failure, etc.)
} else {
releaseBeforeOpLock(true); // Release, but keep the lock (if present).
}
}
}
再继续向上追踪,发现上面的方法在HiveSessionImpl
的executeStatement
和executeStatementAsync
方法中调用(忽略了重载方法)。这两个方法分别为阻塞方式执行SQL statement和异步执行SQL statement。如果继续向上追踪调用,我们能够找到CLIService
类。ThriftCLIService类又再次包装了CLIService
类,它拥有ExecuteStatement
方法。这个方法是thrift RPC调用的endpoint,通过TExecuteStatementReq
类传递调用参数。继续追踪调用端的话我们陆续跟踪到HiveStatement
的runAsyncOnServer
方法->execute
方法(具体逻辑不再分析,只分析调用链)。HiveStatement
是java.sql.Statement
的子类,因此再往上分析就是JDBC使用的范畴了。调用链分析到这里为止。我们得到的结论是Hive JDBC调用statement
的execute
方法,会在执行SQL前创建一个operation。一条SQL的执行对应着一个operation。
接下来我们转到RemoveOperation
的分析。它的调用位于HiveSessionImpl
的close
方法。close方法会关闭所有的operation。代码如下所示,其他不相关的逻辑此处不分析。
@Override
public void close() throws HiveSQLException {
try {
acquire(true, false);
// Iterate through the opHandles and close their operations
List<OperationHandle> ops = null;
synchronized (opHandleSet) {
ops = new ArrayList<>(opHandleSet);
opHandleSet.clear();
}
// 遍历各个operationHandle,一个operationHandle对应着一个operation
// 然后关闭他们
for (OperationHandle opHandle : ops) {
operationManager.closeOperation(opHandle);
}
// Cleanup session log directory.
cleanupSessionLogDir();
HiveHistory hiveHist = sessionState.getHiveHistory();
if (null != hiveHist) {
hiveHist.closeStream();
}
try {
sessionState.resetThreadName();
sessionState.close();
} finally {
sessionState = null;
}
} catch (IOException ioe) {
throw new HiveSQLException("Failure to close", ioe);
} finally {
if (sessionState != null) {
try {
sessionState.resetThreadName();
sessionState.close();
} catch (Throwable t) {
LOG.warn("Error closing session", t);
}
sessionState = null;
}
if (sessionHive != null) {
try {
Hive.closeCurrent();
} catch (Throwable t) {
LOG.warn("Error closing sessionHive", t);
}
sessionHive = null;
}
release(true, false);
}
}
除此之外还有一处调用位于closeOperation
方法,内容如下。
@Override
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
acquire(true, false);
try {
operationManager.closeOperation(opHandle);
synchronized (opHandleSet) {
opHandleSet.remove(opHandle);
}
} finally {
release(true, false);
}
}
继续追踪调用,我们发现它的调用端也在CLIService
的closeOperation
方法。继续跟踪到ThriftCLIService
的CloseOperation
方法,它也是thrift RPC endpoint。通过TCloseOperationReq
传递RPC调用参数。追踪到RPC调用端,我们跟踪到HiveStatement
的closeStatementIfNeeded
方法。在往上追踪,调用链为closeClientOperation
-> close
方法。其中close
方法重写了java.sql.Statement
的同名方法。到这里我们得到结论,close Hive的statement的时候,会调用removeOperation
,从而将operation从queryIdOperation
中移除。
按照JDBC标准使用方式,statement使用完毕之后是必须要close的。也就是说正常情况下addOperation
和removeOperation
必然是成对出现。我们先假设用户使用问题,没有及时close掉statement。
接着继续分析还有哪些时机会调用removeOperation
方法。我们找到HiveSessionImpl
的close
方法 -> SessionManager
的closeSession
方法。除了正常关闭session外,SessionManager
中还有一个startTimeoutChecker
。这个方法周期运行,当session超时的时候会自动关闭session。从而关闭所有的statement。这些措施确保了removeOperation
是一定会被调用到的。就算是用户使用问题,没有close掉statement,这些operation也是可以被清理掉的。
造成OOM的原因是某些operation
始终不能够被remove掉。查看日志我们的确发现部分query id的确没有被remove掉(removeOperation
中LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
这一行代码会打印日志,存在一些query id没有这一行日志)。问题可能在于OperationManager
的getQueryId
方法。无法通过operation获取到它对应的query id。
我们回到OperationManager
的getQueryId
方法。发现query id并没有存储在operation中,而是存储在HiveConf
中。
private String getQueryId(Operation operation) {
return operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
}
一路跟踪operation的parentSession是什么时候赋值进去的。最终找到了HiveSessionImpl
的executeStatementInternal
方法。下面只贴出关键的一行,其他无关部分省略。
operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
confOverlay, runAsync, queryTimeout);
getSession
方法返回的是this
。说明这些operation共用同一个Hive Session(同一个JDBC连接下所有操作公用session)。自然HiveConf
也是公用的。到这里为止分析的重点来到了这个HiveConf
保存的内容上。
Hive的query id存储在HiveConf中,key为ConfVars.HIVEQUERYID
。猜测这个key一定有某个地方被set。跟踪HiveConf
的set这个key的调用,我们发现QueryState
的build
方法。
QueryState
的build
方法中分配新的queryId。方法内容如下:
public QueryState build() {
HiveConf queryConf;
if (isolated) {
// isolate query conf
if (hiveConf == null) {
queryConf = new HiveConf();
} else {
queryConf = new HiveConf(hiveConf);
}
} else {
queryConf = hiveConf;
}
// Set the specific parameters if needed
if (confOverlay != null && !confOverlay.isEmpty()) {
// apply overlay query specific settings, if any
for (Map.Entry<String, String> confEntry : confOverlay.entrySet()) {
try {
queryConf.verifyAndSet(confEntry.getKey(), confEntry.getValue());
} catch (IllegalArgumentException e) {
throw new RuntimeException("Error applying statement specific settings", e);
}
}
}
// Generate the new queryId if needed
// 如果需要生成新的query id
if (generateNewQueryId) {
// 分配新的query id
String queryId = QueryPlan.makeQueryId();
queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
// FIXME: druid storage handler relies on query.id to maintain some staging directories
// expose queryid to session level
// 将query id存放到hive session中
if (hiveConf != null) {
hiveConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
}
}
QueryState queryState = new QueryState(queryConf);
if (lineageState != null) {
queryState.setLineageState(lineageState);
}
return queryState;
}
下面我们要确认下这个build
方法是否在执行SQL查询的过程中调用。跟踪调用我们发现Operation
类的构造函数。内容如下:
protected Operation(HiveSession parentSession,
Map<String, String> confOverlay, OperationType opType) {
this.parentSession = parentSession;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
beginTime = System.currentTimeMillis();
lastAccessTime = beginTime;
operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
scheduledExecutorService = Executors.newScheduledThreadPool(1);
currentStateScope = updateOperationStateMetrics(null, MetricsConstant.OPERATION_PREFIX,
MetricsConstant.COMPLETED_OPERATION_PREFIX, state);
// 这里创建出了queryState
// 这个queryState被operation持有
queryState = new QueryState.Builder()
.withConfOverlay(confOverlay)
// 指定需要生成query id
.withGenerateNewQueryId(true)
.withHiveConf(parentSession.getHiveConf())
.build();
}
跟踪这个构造函数,不难发现ExecuteStatementOperation
是Operation
的子类。创建ExecuteStatementOperation
的时候调用了这个方法。
public ExecuteStatementOperation(HiveSession parentSession, String statement,
Map<String, String> confOverlay, boolean runInBackground) {
super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT);
this.statement = statement;
}
ExecuteStatementOperation
是HiveCommandOperation
的父类。HiveCommandOperation
的构造函数中自然需要调用上面的方法。
ExecuteStatementOperation
还有一个方法newExecuteStatementOperation
。这个方法我们上面已经分析过了,它最后创建了一个HiveCommandOperation
对象并返回。经过这段分析我们验证了Hive每次执行SQL statement的时候都会设置一个新的query id。那么问题来了,如果上一个query id还被来得及被remove就设置了新的query id,上一个query id就再也没有机会被remove,造成OOM的问题。同一个session只会保存最后一个query id。到此问题的根源已经找到。
问题解决
跟踪社区我们发现在Hive项目的branch-3.1
分支中有一个HIVE-26530
patch。这个patch合并的时间明显晚于Hive 3.1.0发布的时间,是一个hotfix。它对应的正是OperationManager
的getQueryId
方法的修改。这个patch将OperationManager
的getQueryId
方法从:
private String getQueryId(Operation operation) {
return operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
}
修改为:
private String getQueryId(Operation operation) {
return operation.getQueryId();
}
Operation
类增加如下代码:
public String getQueryId() {
return queryState.getQueryId();
}
该patch做出的改动将query id保存在每个operation
专有的queryState
中,从而杜绝了query id被覆盖的情况。将本地Hive3.1.0代码合入这个patch后重新编译。替换集群中的hive-service-xxx.jar
为新编译输出的jar后重启集群,问题解决。目前使用Hive 3.x版本的环境都存在此隐患,建议紧急修复此问题。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。
网友评论