问题描述
在提交 flinkStreamSQL 任务时,抛出了异常 java.lang.OutOfMemoryError: Metaspace(已设置 -XX:MaxMetaspaceSize=400M,不让元空间动态增长)。很明显是元空间(Metaspace)溢出,猜测是加载的类过多。通过获取堆转储快照发现,堆中存在大量 DtClassLoader 对象,这才意识到我们没有对 DtClassLoader 做缓存处理,导致每次提交任务都会创建新的 DtClassLoader 对象,并由该对象重新加载一遍我们的插件类。
使用ClassLoaderManager统一管理
- 定义函数式接口,接口方法接收指定的ClassLoader,由其去加载插件。
/**
 * A supplier-style callback that produces its result with the help of a class loader
 * handed in by the caller.
 *
 * @param <T> the type of the produced result
 */
@FunctionalInterface
public interface ClassLoaderSupplier<T> {

    /**
     * Produces a result using the supplied class loader.
     *
     * @param cl the class loader made available to the implementation
     * @return the produced result
     * @throws Exception if the implementation fails to produce a result
     */
    T get(ClassLoader cl) throws Exception;
}
- 使用Map集合缓存类加载器对象,key为插件jar所在的路径(pluginJarPath)。
// Cache of plugin class loaders; entries are looked up by the plugin jar path string
// that callers pass to newInstance, so each path maps to a single DtClassLoader.
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
/**
 * Runs {@code supplier} against the class loader associated with {@code pluginJarPath}.
 *
 * <p>The class loader is obtained from {@code retrieveClassLoad} and handed to
 * {@code ClassLoaderSupplierCallBack.callbackAndReset}, whose result is returned as-is.
 *
 * @param pluginJarPath path identifying the plugin jars; used to look up the class loader
 * @param supplier      callback that produces the result from the class loader
 * @param <R>           type of the result produced by the callback
 * @return whatever the callback returns
 * @throws Exception if the callback (or the class loader lookup) fails
 */
public static <R> R newInstance(String pluginJarPath, ClassLoaderSupplier<R> supplier) throws Exception {
    return ClassLoaderSupplierCallBack.callbackAndReset(supplier, retrieveClassLoad(pluginJarPath));
}
/**
 * Returns the {@code DtClassLoader} cached for {@code pluginJarPath}, creating and caching
 * one on first use (get-or-create).
 *
 * <p>A new loader is built from the URLs returned by {@code PluginUtil.getPluginJarUrls}
 * and the calling thread's context class loader, which is passed as the second constructor
 * argument. If creation fails, nothing is cached for the path and the failure is rethrown
 * with the original cause attached.
 *
 * @param pluginJarPath path identifying the plugin jars; also the cache key
 * @return the cached or newly created class loader for the path
 * @throws RuntimeException if the class loader cannot be created; the underlying
 *                          failure is available via {@link Throwable#getCause()}
 */
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
    return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
        try {
            URL[] urls = PluginUtil.getPluginJarUrls(pluginJarPath);
            ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
            DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
            LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
            return classLoader;
        } catch (Throwable e) {
            // Pass the throwable as the trailing argument so the logger prints its stack
            // trace; the placeholder is filled with the path that failed.
            LOG.error("retrieve ClassLoad happens error, pluginJarPath:{}", pluginJarPath, e);
            // Keep the cause so callers can see why the loader could not be created.
            throw new RuntimeException("retrieve ClassLoad happens error, pluginJarPath: " + pluginJarPath, e);
        }
    });
}
- 将集合中缓存的类加载器设置为线程上下文类加载器,回调函数式接口方法加载插件,最后(无论成功还是抛出异常)恢复为调用前的上下文类加载器。
/**
 * Helper for invoking a {@link ClassLoaderSupplier} while a chosen class loader is
 * installed as the calling thread's context class loader.
 */
public class ClassLoaderSupplierCallBack {

    /**
     * Installs {@code toSetClassLoader} as the current thread's context class loader,
     * invokes {@code supplier} with that same loader, and afterwards puts back the
     * context class loader that was in place before the call.
     *
     * @param supplier         callback to invoke
     * @param toSetClassLoader loader to install for the duration of the callback and to
     *                         pass to it
     * @param <R>              type of the callback's result
     * @return the value returned by the callback
     * @throws Exception whatever the callback throws
     */
    public static <R> R callbackAndReset(ClassLoaderSupplier<R> supplier, ClassLoader toSetClassLoader) throws Exception {
        final Thread current = Thread.currentThread();
        final ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(toSetClassLoader);
        try {
            return supplier.get(toSetClassLoader);
        } finally {
            // Runs on both normal return and exception, so the thread never keeps the
            // temporary loader installed.
            current.setContextClassLoader(previous);
        }
    }
}
- 插件调用方法调整
转换之前,设置DtClassLoader为当前类加载器。
// Capture the calling thread's current context class loader.
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
// Build a brand-new DtClassLoader with an empty URL array, passing the captured loader as
// the second constructor argument (presumably the parent loader; confirm against DtClassLoader).
// A new instance is created every time these statements run.
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
// Install the new DtClassLoader as this thread's context class loader.
Thread.currentThread().setContextClassLoader(parentClassloader);
/**
 * Loads and instantiates the {@code AsyncReqRow} implementation for a side-table type,
 * using the calling thread's context class loader.
 *
 * <p>The context class loader is cast to {@code DtClassLoader}, so this fails with a
 * {@link ClassCastException} when a different loader type is installed on the thread.
 * The plugin jar directory resolved from {@code sideType} and {@code sqlRootDir} is
 * handed to {@code PluginUtil.addPluginJar} together with that loader before the class
 * is loaded.
 *
 * @param sideType         side-table type; used to build the plugin path and class name
 * @param sqlRootDir       root directory passed to {@code PluginUtil.getJarFileDirPath}
 * @param rowTypeInfo      first constructor argument of the loaded class
 * @param joinInfo         second constructor argument of the loaded class
 * @param outFieldInfoList third constructor argument of the loaded class
 * @param sideTableInfo    fourth constructor argument of the loaded class
 * @return a new instance of the resolved {@code AsyncReqRow} subclass
 * @throws Exception if path resolution, class loading, or reflective construction fails
 */
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
                                        JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
    final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    final String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(PATH_FORMAT, sideType), sqlRootDir);

    // The cast happens only after the jar path has been resolved, as in the original order.
    final DtClassLoader pluginLoader = (DtClassLoader) contextLoader;
    PluginUtil.addPluginJar(pluginJarPath, pluginLoader);

    final String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
    final Class<? extends AsyncReqRow> sideClass = pluginLoader.loadClass(className).asSubclass(AsyncReqRow.class);
    return sideClass
            .getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
            .newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}
转换后
/**
 * Loads and instantiates the {@code AsyncReqRow} implementation for a side-table type
 * through {@code ClassLoaderManager.newInstance}.
 *
 * <p>The plugin jar directory resolved from {@code sideType} and {@code sqlRootDir} is
 * passed to the manager, and the class is loaded from whichever class loader the manager
 * hands to the callback.
 *
 * @param sideType         side-table type; used to build the plugin path and class name
 * @param sqlRootDir       root directory passed to {@code PluginUtil.getJarFileDirPath}
 * @param rowTypeInfo      first constructor argument of the loaded class
 * @param joinInfo         second constructor argument of the loaded class
 * @param outFieldInfoList third constructor argument of the loaded class
 * @param sideTableInfo    fourth constructor argument of the loaded class
 * @return a new instance of the resolved {@code AsyncReqRow} subclass
 * @throws Exception if path resolution, class loading, or reflective construction fails
 */
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
                                        JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
    final String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(PATH_FORMAT, sideType), sqlRootDir);
    final String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);

    return ClassLoaderManager.newInstance(pluginJarPath, loader -> {
        Class<? extends AsyncReqRow> sideClass = loader.loadClass(className).asSubclass(AsyncReqRow.class);
        return sideClass
                .getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
                .newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
    });
}
网友评论