背景
- 笔者的大数据监控系统中有一项hdfs路径下异常格式文件检测的功能。简单的说就是每天需要定期的采集hdfs下的路径。
- 在某天添加了hive staging路径后,发现程序OOM了。当时从代码出发怀疑是(1)从DB查询过大没有分页导致直接load到内存导致OOM,(2)线程池中blockqueue是没有设置大小的,可能任务都提交到blockqueue中导致内存溢出。
- 基于上述两个可能点对代码进行了修改并发布,但是在第二天还是又OOM了!!
- 于是只能对堆内存进行分析来找出真正的泄漏点。在这里仅分享最关键的一处泄漏点。
- 注: 可能有朋友问为何不通过解析fsimage来获取hdfs详情,其实fsimage的解析每天都有在做。笔者每天起docker容器拉取fsimage并解析后导入到hive分区表中,再进行相关的加工后导入到mysql中,此过程虽已标准化但是比较麻烦。我也参考了hadoop回放fsimage代码,通过java对fsimage进行解析,但是所需的JVM堆内存需要很大(如:fsimage 20G则至少需要40G JVM内存)。这也是没有用java解析的原因,如果有你有更好的办法麻烦跟我联系,谢谢。
排查过程
- 首先看下项目JVM参数。我使用的是G1回收期(确实给力),然后有记录相关的GC日志,这个可以帮助我觉得到底要设置多大Xmx和Xms,然后在程序OOM的时候会自动给我dump下堆内存(虽然dump过程中对程序有影响,但是好像没其他更好办法了!!)
${JAVA_EXEC} -server -XX:+UseG1GC -Xmx8G -Xms8G -Xss256k -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/xxx -XX:MaxGCPauseMillis=300 -Xloggc:/var/log/xxx/xxx_gc.log -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Dservice.name=${EXEC_COMMEN} -Dfastjson.parser.safeMode=true -cp "${FULL_PATH_SERVER_JAR}:${LIB_PATH}/*:${CONFIG_VERSION_PATH}/" ${MAIN_CLASS} >> ${LOG_FILE} 2>&1 &
-
在dump程序之前,我先用jmap -histo:live <PID>执行了一次强制的full GC,之后通过arthas的heapdump 命令dump下来,也可以用jmap的dump命令。
-
关于OOM分析工具,visualvm和eclipse memory analyze我都有用,总体上觉得MemoryAnalyze会好用一些。
-
如下图所示是程序中加载类的清单,可以看到hdfs的configuration占据了绝大多数
image.png -
同时看到重点包加载也都是跟hadoop的filesystem相关,因此可以判断这个可能是主要的泄漏点。
image.png
-
其它的还可以看到很多kerberos鉴权类也没有释放
image.png
代码排查和修改
- 在代码的工具类中有个获取filesystem的方法。代码通过cluster对象来携带集群的详情,然后构建FileSystem的过程(很多线程都会调用此方法),每个进程进来都先初始化一个Configuration对象,然后进行kerberos鉴权!相关代码如下:
public FileSystem getFileSystemInstance(Cluster cluster){
FileSystem fs = null;
HadoopClusterParam param = JSONObject.parseObject(cluster.getParam(), HadoopClusterParam.class);
Configuration conf = new Configuration();
System.setProperty("java.security.krb5.conf", param.getKrb5Conf());
conf.set("dfs.namenode.kerberos.principal", param.getHdfsKerberosPrincipal());
conf.set("dfs.namenode.kerberos.principal.pattern", "*");
conf.set("hadoop.security.authentication", "kerberos");
conf.set("fs.trash.interval", "1");
conf.set("fs.defaultFS", String.format("hdfs://%s", param.getHaName()));
conf.set(String.format("dfs.client.failover.proxy.provider.%s", param.getHaName()), "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set(String.format("dfs.ha.namenodes.%s", param.getHaName()), param.getHaNamenodes());
String[] nns = param.getHaNamenodes().split(",");
String[] nnHosts = param.getNamenodeAddress().split(",");
conf.set(String.format("dfs.namenode.rpc-address.%s.%s", param.getHaName(), nns[0]), String.format("%s:8020", nnHosts[0]));
conf.set(String.format("dfs.namenode.rpc-address.%s.%s", param.getHaName(), nns[1]), String.format("%s:8020", nnHosts[1]));
conf.set("dfs.nameservices", param.getHaNames());
try {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(param.getHdfsKerberosPrincipal(), param.getHdfsKerberosKeytab());
fs = FileSystem.get(conf);
} catch (Exception e) {
LOGGER.error("Build FileSystem found execption,caused by:", e);
}
return fs;
}
- 上面代码有两个致命缺点(1)Configuration其实是可以复用的而不需要每次重新new,(2)等于登陆一次kerberos鉴权即可而不需要每个都用UserGroupInformation去鉴权。
- 基于上面的致命缺点进行修改后的代码如下:
private Map<String,Configuration> confMap = new HashMap<>();
private Configuration generateFileSystemConf(Cluster cluster) throws IOException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
String userName = currentUser.getUserName();
//防止其它操作更新掉当前线程中的kerberos认证用户
if(!HDFS_USER.equals(userName)){
LOGGER.info("The login user has changed,current user:{},change to {}",userName,HDFS_USER);
confMap.remove(cluster.getClusterName());
}
if(confMap.getOrDefault(cluster.getClusterName(),null)==null){
Configuration conf = new Configuration();
HadoopClusterParam param = JSONObject.parseObject(cluster.getParam(), HadoopClusterParam.class);
System.setProperty("java.security.krb5.conf", param.getKrb5Conf());
conf.set("dfs.namenode.kerberos.principal", param.getHdfsKerberosPrincipal());
conf.set("dfs.namenode.kerberos.principal.pattern", "*");
conf.set("hadoop.security.authentication", "kerberos");
conf.set("fs.trash.interval", "1");
conf.set("fs.defaultFS", String.format("hdfs://%s", param.getHaName()));
conf.set(String.format("dfs.client.failover.proxy.provider.%s", param.getHaName()), "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
conf.set(String.format("dfs.ha.namenodes.%s", param.getHaName()), param.getHaNamenodes());
String[] nns = param.getHaNamenodes().split(",");
String[] nnHosts = param.getNamenodeAddress().split(",");
conf.set(String.format("dfs.namenode.rpc-address.%s.%s", param.getHaName(), nns[0]), String.format("%s:8020", nnHosts[0]));
conf.set(String.format("dfs.namenode.rpc-address.%s.%s", param.getHaName(), nns[1]), String.format("%s:8020", nnHosts[1]));
conf.set("dfs.nameservices", param.getHaNames());
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(param.getHdfsKerberosPrincipal(), param.getHdfsKerberosKeytab());
confMap.put(cluster.getClusterName(),conf);
return conf;
}
return confMap.get(cluster.getClusterName());
}
public FileSystem getFileSystemInstance(Cluster cluster){
FileSystem fs = null;
try {
Configuration conf = this.generateFileSystemConf(cluster);
fs = FileSystem.get(conf);
} catch (Exception e) {
LOGGER.error("Build FileSystem found execption,caused by:", e);
}
return fs;
}
-
修改后重新发布程序,运行了几天之后再也没有OOM的现象。从下图可见每次minor gc都能正常的回收,old_gen也维持在一个较低的范围内。
image.png
后记
- 为什么Configuration和UserGroupInformation无法回收的问题,我猜测可能跟FileSystem的设计有关。
- Hadoop把对于文件系统的调用封装成了一个FileSystem类,同时FileSystem对于文件系统类的实例做了缓存,如果是来自同一个文件系统,它会返回同一个实例。代码如下:
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
}
- 为什么要设置cache?我猜是因为每个FileSystem的实例都会建立一个到HDFS Namenode的连接,在大量并发的读时缓存确实能减低namenode的压力。那么可能产生什么情况?你从FileSystem里面拿到的文件系统的实例可能别人也拿到了,而且可能正在用,所以这里也是不推荐你close掉fs的原因
- 由于FileSystem是缓存在hadoop中的,则对Configuration和UserGroupInformation的引用关系是存在的,也即GC ROOT是存在的,因此当前不会被回收。慢慢堆积也就OOM了。
网友评论