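I've recently been reading 《SQL内核剖析》. It describes how org.apache.spark.sql.hive.client.IsolatedClientLoader uses a custom class loader, built on the JVM's class-loading mechanism, so that Spark program classes (SharedClass in the source) can call the implementation methods in org.apache.spark.sql.hive.client.HiveClientImpl (a BarrierClass in the source) through the interface org.apache.spark.sql.hive.client.HiveClient (itself a SharedClass), and in this way reach the underlying Hive dependencies.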
-
How Spark program classes are isolated from Hive classes
-
Before the analysis, a few points in the source need to be clear.
1. The two class-loader constructor parameters of IsolatedClientLoader
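```scala
IsolatedClientLoader(
    ...
    // evaluates to null, which stands for the Bootstrap ClassLoader; on Java 8 it loads the classes under <JAVA_HOME>\lib
    val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
    // the thread context class loader: normally the Application ClassLoader (or a loader delegating to it) that loads the application's classes
    val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader
    ...
)
```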
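To check what these two defaults evaluate to, here is a small standalone Java sketch (the class name `LoaderChain` is made up for illustration) that walks the parent chain of the system class loader. On Java 8 the chain is AppClassLoader, then ExtClassLoader, then the bootstrap loader; on Java 9+ it is AppClassLoader, then PlatformClassLoader, then bootstrap. In both cases `getParent().getParent()` returns `null`, which is how the JDK represents the bootstrap loader.

```java
import java.util.ArrayList;
import java.util.List;

public class LoaderChain {
    // Same expression as the default value of Spark's rootClassLoader parameter.
    static ClassLoader grandparentOfSystemLoader() {
        return ClassLoader.getSystemClassLoader().getParent().getParent();
    }

    // Walks the parent chain from `start`; a null parent stands for the bootstrap loader.
    static List<String> chain(ClassLoader start) {
        List<String> names = new ArrayList<>();
        for (ClassLoader cl = start; cl != null; cl = cl.getParent()) {
            names.add(cl.getClass().getName());
        }
        names.add("bootstrap (null)");
        return names;
    }

    public static void main(String[] args) {
        System.out.println("system loader chain: " + chain(ClassLoader.getSystemClassLoader()));
        System.out.println("rootClassLoader is null: " + (grandparentOfSystemLoader() == null));
        // Core JDK classes report a null loader because the bootstrap loader defined them.
        System.out.println("String loaded by: " + String.class.getClassLoader());
    }
}
```

Passing `null` as the parent of a `URLClassLoader` therefore means "delegate only to the bootstrap loader", which is what the isolated loader relies on.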
2. IsolatedClientLoader sorts the classes it loads into three categories
## BarrierClass

```scala
protected def isBarrierClass(name: String): Boolean =
  name.startsWith(classOf[HiveClientImpl].getName) ||
  name.startsWith(classOf[Shim].getName) ||
  barrierPrefixes.exists(name.startsWith)
```
## SharedClass

```scala
protected def isSharedClass(name: String): Boolean = {
  val isHadoopClass =
    name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")

  name.contains("slf4j") ||
  name.contains("log4j") ||
  name.startsWith("org.apache.spark.") ||
  (sharesHadoopClasses && isHadoopClass) ||
  name.startsWith("scala.") ||
  (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
  name.startsWith("java.lang.") ||
  name.startsWith("java.net") ||
  sharedPrefixes.exists(name.startsWith)
}
```
## HiveClass

Everything else: the classes that belong to the Hive dependencies.
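The three-way split can be mirrored in plain Java to see which bucket a given class name falls into. This is a sketch, not Spark code: the predicate bodies are transcribed from the Scala above, `sharesHadoopClasses` is assumed to be `true`, `barrierPrefixes` and `sharedPrefixes` are assumed to be empty, and the two name constants are my assumption of what `classOf[HiveClientImpl].getName` and `classOf[Shim].getName` return.

```java
import java.util.Collections;
import java.util.List;

public class ClassCategories {
    enum Category { BARRIER, SHARED, HIVE }

    // Assumed values of classOf[HiveClientImpl].getName and classOf[Shim].getName.
    static final String HIVE_CLIENT_IMPL = "org.apache.spark.sql.hive.client.HiveClientImpl";
    static final String SHIM = "org.apache.spark.sql.hive.client.Shim";

    static boolean isBarrierClass(String name, List<String> barrierPrefixes) {
        return name.startsWith(HIVE_CLIENT_IMPL)
                || name.startsWith(SHIM)
                || barrierPrefixes.stream().anyMatch(name::startsWith);
    }

    static boolean isSharedClass(String name, boolean sharesHadoopClasses, List<String> sharedPrefixes) {
        boolean isHadoopClass =
                name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.");
        return name.contains("slf4j")
                || name.contains("log4j")
                || name.startsWith("org.apache.spark.")
                || (sharesHadoopClasses && isHadoopClass)
                || name.startsWith("scala.")
                || (name.startsWith("com.google") && !name.startsWith("com.google.cloud"))
                || name.startsWith("java.lang.")
                || name.startsWith("java.net")
                || sharedPrefixes.stream().anyMatch(name::startsWith);
    }

    // Barrier is tested first, as in doLoadClass, so HiveClientImpl is never treated as shared
    // even though its name also starts with "org.apache.spark.".
    static Category classify(String name) {
        List<String> none = Collections.emptyList();
        if (isBarrierClass(name, none)) return Category.BARRIER;
        if (isSharedClass(name, true, none)) return Category.SHARED;
        return Category.HIVE;
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.spark.sql.hive.client.HiveClientImpl",
            "org.apache.spark.sql.hive.client.HiveClient",
            "org.apache.hadoop.hive.conf.HiveConf",
            "org.apache.hadoop.conf.Configuration",
            "scala.Option"
        };
        for (String n : names) {
            System.out.println(classify(n) + "\t" + n);
        }
    }
}
```

Running `main` prints BARRIER, SHARED, HIVE, SHARED, SHARED for the five names, in that order. The second line is the interesting one: HiveClient is shared only because its name does not start with the HiveClientImpl prefix.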
- Isolation through a custom class loader
```scala
private[hive] val classLoader: MutableURLClassLoader = {
  val isolatedClassLoader =
    if (isolationOn) { // isolation only happens when this switch is on
      // An anonymous subclass of URLClassLoader. Its parent loader is rootClassLoader,
      // i.e. the Bootstrap ClassLoader mentioned above.
      new URLClassLoader(allJars, rootClassLoader) {
        // override loadClass from URLClassLoader
        override def loadClass(name: String, resolve: Boolean): Class[_] = {
          val loaded = findLoadedClass(name)
          if (loaded == null) doLoadClass(name, resolve) else loaded
        }
        def doLoadClass(name: String, resolve: Boolean): Class[_] = {
          val classFileName = name.replaceAll("\\.", "/") + ".class"
          if (isBarrierClass(name)) {
            ......
            // BarrierClass: defined by this loader itself (HiveClientImpl is a BarrierClass, see above)
            defineClass(name, bytes, 0, bytes.length)
            ......
          } else if (!isSharedClass(name)) {
            // Neither BarrierClass nor SharedClass, i.e. a Hive class: call URLClassLoader.loadClass.
            // It asks the parent (rootClassLoader) first, but rootClassLoader only sees the classes
            // under <JAVA_HOME>\lib, so Hive classes cannot be delegated to it. The lookup falls
            // back to this loader's own findClass, which searches allJars.
            super.loadClass(name, resolve)
          } else {
            // For shared classes, we delegate to baseClassLoader, but fall back in case the
            // class is not found.
            logDebug(s"shared class: $name")
            try {
              // SharedClass: handled by baseClassLoader; org.apache.spark.sql.hive.client.HiveClient lands here
              baseClassLoader.loadClass(name)
            } catch {
              case _: ClassNotFoundException =>
                super.loadClass(name, resolve)
            }
          }
        }
      }
    } else {
      // Isolation off: use baseClassLoader (the Application ClassLoader) directly,
      // so SharedClass classes can reach Hive classes without any barrier.
      baseClassLoader
    }
  new NonClosableMutableURLClassLoader(isolatedClassLoader)
}
```
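The same routing can be sketched as a standalone Java class. Everything here (`IsolatingClassLoader`, the predicate parameters, the nested `Payload` class) is hypothetical: it reproduces the three branches of `doLoadClass` under the assumption that barrier-class bytes can be read through the base loader, and it adds a per-name lock, which the Scala excerpt does not show, so that two threads cannot define the same barrier class twice.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Predicate;

// Hypothetical Java sketch of the three-way routing in IsolatedClientLoader's anonymous loader.
public class IsolatingClassLoader extends URLClassLoader {
    private final ClassLoader baseClassLoader;
    private final Predicate<String> isBarrier;
    private final Predicate<String> isShared;

    public IsolatingClassLoader(URL[] jars, ClassLoader rootClassLoader, ClassLoader baseClassLoader,
                                Predicate<String> isBarrier, Predicate<String> isShared) {
        super(jars, rootClassLoader); // parent = root; null means the bootstrap loader
        this.baseClassLoader = baseClassLoader;
        this.isBarrier = isBarrier;
        this.isShared = isShared;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) { // avoid defining one barrier class twice
            Class<?> loaded = findLoadedClass(name);
            return loaded != null ? loaded : doLoadClass(name, resolve);
        }
    }

    private Class<?> doLoadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (isBarrier.test(name)) {
            // Barrier: take the bytes the base loader can see and define a private copy here.
            byte[] bytes = readClassBytes(baseClassLoader, name);
            return defineClass(name, bytes, 0, bytes.length);
        } else if (!isShared.test(name)) {
            // Hive class: default parent-first lookup; root only knows JDK classes,
            // so in practice this ends in our own findClass over `jars`.
            return super.loadClass(name, resolve);
        } else {
            // Shared: reuse the base loader's copy, fall back to our own jars if it lacks the class.
            try {
                return baseClassLoader.loadClass(name);
            } catch (ClassNotFoundException e) {
                return super.loadClass(name, resolve);
            }
        }
    }

    static byte[] readClassBytes(ClassLoader source, String name) throws ClassNotFoundException {
        String classFileName = name.replace('.', '/') + ".class";
        try (InputStream in = source.getResourceAsStream(classFileName)) {
            if (in == null) throw new ClassNotFoundException(name);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) out.write(buf, 0, n);
            return out.toByteArray();
        } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }

    // Hypothetical class used only to exercise the barrier branch.
    public static class Payload {}

    public static void main(String[] args) throws Exception {
        ClassLoader base = IsolatingClassLoader.class.getClassLoader();
        try (IsolatingClassLoader loader = new IsolatingClassLoader(new URL[0], null, base,
                cn -> cn.equals(Payload.class.getName()),
                cn -> cn.startsWith("java."))) {
            Class<?> copy = loader.loadClass(Payload.class.getName());
            System.out.println("barrier copy is a new class: " + (copy != Payload.class));
            System.out.println("shared String is the same class: "
                    + (loader.loadClass("java.lang.String") == String.class));
            try {
                loader.loadClass("org.apache.hadoop.hive.conf.HiveConf");
            } catch (ClassNotFoundException e) {
                System.out.println("hive class not visible without jars: " + e.getMessage());
            }
        }
    }
}
```

With an empty jar list the demo exercises all three branches: the barrier class comes back as a new `Class` object, `java.lang.String` is the very same class the application uses, and a Hive class name cannot be found at all, because neither the bootstrap parent nor the (empty) own URLs contain it.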
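That is roughly the key source logic. So why does it actually isolate anything? The figure below gives the final explanation.
Figure: class-loader ownership diagram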
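Under the parent-delegation model, a load request is first delegated upward from child to parent, and if the parent cannot load the class, loading falls back to the child. Hive classes outside the Bootstrap ClassLoader's reach therefore end up loaded by the MutableURLClassLoader side (strictly, by the anonymous URLClassLoader it wraps), and BarrierClass classes are defined there directly without asking any parent. In the same way, SharedClass classes end up loaded by the Application ClassLoader.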
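The parent-delegation flow described above (ask upward first, fall back downward) is what `java.lang.ClassLoader.loadClass` does by default. A small tracing sketch makes the order visible; `DelegationTrace`, `TracingLoader` and the missing class name `com.example.NotOnAnyPath` are made up for illustration, and neither loader can define anything of its own.

```java
import java.util.ArrayList;
import java.util.List;

public class DelegationTrace {
    // Logs each request, then runs the JDK's default parent-first algorithm unchanged.
    static class TracingLoader extends ClassLoader {
        private final String label;
        private final List<String> log;

        TracingLoader(String label, ClassLoader parent, List<String> log) {
            super(parent);
            this.label = label;
            this.log = log;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            log.add(label + ".loadClass");
            // Default: findLoadedClass, then parent (or bootstrap if parent is null), then findClass.
            return super.loadClass(name, resolve);
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            log.add(label + ".findClass");
            throw new ClassNotFoundException(name); // this sketch has nothing of its own to define
        }
    }

    static List<String> trace(String className) {
        List<String> log = new ArrayList<>();
        ClassLoader parent = new TracingLoader("parent", null, log); // null parent = bootstrap
        ClassLoader child = new TracingLoader("child", parent, log);
        try {
            child.loadClass(className);
            log.add("found");
        } catch (ClassNotFoundException e) {
            log.add("not found");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace("java.lang.String"));
        System.out.println(trace("com.example.NotOnAnyPath")); // hypothetical missing class
    }
}
```

It prints `[child.loadClass, parent.loadClass, found]` for `java.lang.String` and `[child.loadClass, parent.loadClass, parent.findClass, child.findClass, not found]` for the missing class. Spark's loader keeps this default path only for Hive classes (the `super.loadClass` branch); barrier and shared classes are routed before it is reached.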
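From the code logic, this produces two independent loading spaces, the MutableURLClassLoader and the Application ClassLoader, and the classes that only one of them can load are isolated from each other: classes loaded by the Application ClassLoader cannot see the classes that only the MutableURLClassLoader side defines.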
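The two spaces cannot see each other's classes because the JVM identifies a class by its name together with its defining loader. The sketch below (hypothetical names `TwoLoaders` and `Greeter`; it assumes the compiled `.class` file can be read through the application loader, as the demo later in this post also does) defines the same bytes twice and shows that the two copies are different types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class TwoLoaders {
    // Hypothetical class that will exist twice: once per defining loader.
    public static class Greeter {
        public String hi() { return "hi"; }
    }

    static byte[] readClassBytes(Class<?> c) throws IOException {
        String resource = c.getName().replace('.', '/') + ".class";
        try (InputStream in = c.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) throw new IOException("class file not found: " + resource);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) out.write(buf, 0, n);
            return out.toByteArray();
        }
    }

    // Defines a second copy of `original` from the same bytes, in a loader whose parent is bootstrap.
    static Class<?> defineIsolatedCopy(Class<?> original) throws IOException, ClassNotFoundException {
        byte[] bytes = readClassBytes(original);
        ClassLoader isolated = new ClassLoader(null) {
            @Override
            protected Class<?> findClass(String name) throws ClassNotFoundException {
                if (!name.equals(original.getName())) throw new ClassNotFoundException(name);
                return defineClass(name, bytes, 0, bytes.length);
            }
        };
        return isolated.loadClass(original.getName());
    }

    public static void main(String[] args) throws Exception {
        Class<?> copy = defineIsolatedCopy(Greeter.class);
        Object instance = copy.getDeclaredConstructor().newInstance();
        System.out.println("same name:  " + copy.getName().equals(Greeter.class.getName()));
        System.out.println("same class: " + (copy == Greeter.class));
        System.out.println("instanceof: " + Greeter.class.isInstance(instance));
        try {
            ((Greeter) instance).hi();
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
```

Running it reports that the names match, the `Class` objects differ, `isInstance` is false, and the cast fails with a `ClassCastException`; this is the same failure the HiProxy experiment in the demo runs into.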
-
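How Spark program classes reach Hive classes indirectly under isolation
To call Hive classes indirectly, Spark relies on the interface/implementation relationship between HiveClient and HiveClientImpl: the application holds a HiveClient reference, that reference points to a HiveClientImpl instance, and HiveClientImpl can see the Hive classes. This works because HiveClient is a SharedClass: both loaders use the same HiveClient type, so a HiveClientImpl defined by the isolated loader can still be cast to it.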
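The factory step can be sketched in Java as well: load the implementation by name through the isolated loader, instantiate it reflectively, and return it typed as the shared interface. `InterfaceBridge`, `Client`, `ClientImpl` and `Dependency` are hypothetical stand-ins for HiveClient, HiveClientImpl and a Hive class, and the sketch assumes the class files can be read through the application loader. Spark does something along these lines when it builds its HiveClientImpl, but this is not its code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class InterfaceBridge {
    // "Shared": exactly one copy, owned by the application loader (plays HiveClient).
    public interface Client { String sayHi(String name); }

    // "Hive-like" dependency that only the isolated loader should own (plays a Hive class).
    public static class Dependency {
        public String hi(String name) { return "Hi " + name; }
    }

    // "Barrier" implementation, re-defined inside the isolated loader (plays HiveClientImpl).
    public static class ClientImpl implements Client {
        private final Dependency dependency = new Dependency();
        @Override
        public String sayHi(String name) { return dependency.hi(name); }
    }

    static ClassLoader isolatedLoader(ClassLoader base) {
        return new ClassLoader(null) { // parent = bootstrap, like rootClassLoader
            @Override
            protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
                synchronized (getClassLoadingLock(name)) {
                    Class<?> c = findLoadedClass(name);
                    if (c != null) return c;
                    if (name.equals(Client.class.getName())) return base.loadClass(name); // shared
                    if (name.equals(ClientImpl.class.getName()) || name.equals(Dependency.class.getName())) {
                        byte[] bytes = readClassBytes(base, name); // barrier / hive: private copy
                        return defineClass(name, bytes, 0, bytes.length);
                    }
                    return super.loadClass(name, resolve); // JDK classes via bootstrap
                }
            }
        };
    }

    static byte[] readClassBytes(ClassLoader source, String name) throws ClassNotFoundException {
        try (InputStream in = source.getResourceAsStream(name.replace('.', '/') + ".class")) {
            if (in == null) throw new ClassNotFoundException(name);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) out.write(buf, 0, n);
            return out.toByteArray();
        } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }

    // Factory step: load the implementation by name, instantiate it reflectively,
    // and hand it back typed as the shared interface.
    static Client createClient(ClassLoader isolated) throws ReflectiveOperationException {
        Class<?> implClass = isolated.loadClass(ClientImpl.class.getName());
        return (Client) implClass.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        ClassLoader app = InterfaceBridge.class.getClassLoader();
        Client client = createClient(isolatedLoader(app));
        System.out.println(client.sayHi("wahaha"));
        System.out.println("impl loaded by app loader? " + (client.getClass().getClassLoader() == app));
    }
}
```

`main` prints `Hi wahaha` and reports that the implementation was not loaded by the application loader. The cast to `Client` succeeds only because `Client` is routed to the base loader; if it were re-defined in the isolated loader too, the cast would fail just like the HiProxy case.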
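I also verified this Spark logic with a demo of my own.
The demo structure is very simple:
Figure: demo structure
- A.class: the program's entry class
- IClient: an interface
- ClientImpl: the implementation of that interface
- ShareClazz: stands in for a program class
- HiProxy: stands in for a dependency class
The verification design is recorded below.
- Verifying isolation
ShareClazz is loaded by the ApplicationClassLoader and holds a HiProxy field. HiProxy is loaded and instantiated once through the ApplicationClassLoader and once through the custom loader, and the instance is assigned to the HiProxy field in ShareClazz.
The code: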
A.class
```java
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

import org.apache.commons.io.IOUtils;

public class A {
    public static void main(String[] args) {
        ClassLoader baseClassLoader = Thread.currentThread().getContextClassLoader();
        URL url = null;
        try {
            // placeholder: directory of the compiled .class files (doLoadClass reads bytes via baseClassLoader)
            url = new URL("file:/path/to/classes/");
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
        // custom loader; parent = null, i.e. the bootstrap loader, like Spark's rootClassLoader
        ClassLoader customLoader = new URLClassLoader(new URL[]{url}, null) {
            @Override
            protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
                Class<?> clazz = findLoadedClass(name);
                return clazz != null ? clazz : doLoadClass(name, resolve);
            }

            protected Class<?> doLoadClass(String name, boolean resolve) throws ClassNotFoundException {
                if (name.contains("ClientImpl") || name.contains("HiProxy")) {
                    // "barrier" classes: read the bytes and define them in this loader
                    String classFileName = name.replaceAll("\\.", "/") + ".class";
                    try (InputStream in = baseClassLoader.getResourceAsStream(classFileName)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = IOUtils.toByteArray(in);
                        return defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                // everything else is "shared": delegate to the application class loader
                return baseClassLoader.loadClass(name);
            }
        };
        ShareClazz shareClazz = new ShareClazz();
        System.out.println("shareClazz loaded by: " + shareClazz.getClass().getClassLoader());
        try {
            // Case 1: HiProxy loaded by the custom loader
            Class<?> hiProxyClazz = customLoader.loadClass("HiProxy");
            System.out.println("hiProxy loaded by: " + hiProxyClazz.getClassLoader());
            // throws ClassCastException: this HiProxy and the one ShareClazz sees come from different loaders
            shareClazz.hiProxy = (HiProxy) hiProxyClazz.getDeclaredConstructor().newInstance();
            // Case 2: HiProxy loaded by the ApplicationClassLoader
            // shareClazz.hiProxy = new HiProxy();
            shareClazz.callHiProxy();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
ShareClazz
```java
public class ShareClazz {
    IClient iClient;
    HiProxy hiProxy;

    public void callSayHi() {
        System.out.println(iClient.sayHi("wahaha"));
    }

    public void callHiProxy() {
        System.out.println("hiProxy loaded by: " + hiProxy.getClass().getClassLoader());
        System.out.println(hiProxy.sayHi("I am HiProxy..."));
    }
}
```
HiProxy
```java
public class HiProxy {
    public String sayHi(String name) {
        return "Hi" + name;
    }
}
```
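Case 1 result
Figure: HiProxy loaded by the custom loader
The output clearly shows that ShareClazz is loaded by AppClassLoader and HiProxy by the custom loader. In this case the HiProxy instance cannot be assigned to the HiProxy field in ShareClazz: the cast throws a ClassCastException, because the two HiProxy classes have different defining loaders.
Case 2 result
Figure: HiProxy loaded by AppClassLoader
The code runs successfully.
- Verifying indirect access to isolated classes through an interface
ShareClazz is loaded by AppClassLoader and holds an IClient reference, to which a ClientImpl instance is assigned. ClientImpl is loaded by the custom loader, and internally it creates a HiProxy and calls its method.
The code: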
IClient
```java
public interface IClient {
    String sayHi(String name);
}
```
ClientImpl
```java
public class ClientImpl implements IClient {
    HiProxy hiProxy = new HiProxy();

    @Override
    public String sayHi(String name) {
        System.out.println("hiProxy loaded by: " + hiProxy.getClass().getClassLoader());
        return hiProxy.sayHi(name);
    }
}
```
A.class
```java
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

import org.apache.commons.io.IOUtils;

public class A {
    public static void main(String[] args) {
        ClassLoader baseClassLoader = Thread.currentThread().getContextClassLoader();
        URL url = null;
        try {
            // placeholder: directory of the compiled .class files (doLoadClass reads bytes via baseClassLoader)
            url = new URL("file:/path/to/classes/");
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
        // custom loader; parent = null, i.e. the bootstrap loader
        ClassLoader customLoader = new URLClassLoader(new URL[]{url}, null) {
            @Override
            protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
                Class<?> clazz = findLoadedClass(name);
                return clazz != null ? clazz : doLoadClass(name, resolve);
            }

            protected Class<?> doLoadClass(String name, boolean resolve) throws ClassNotFoundException {
                if (name.contains("ClientImpl") || name.contains("HiProxy")) {
                    // "barrier" classes: read the bytes and define them in this loader
                    String classFileName = name.replaceAll("\\.", "/") + ".class";
                    try (InputStream in = baseClassLoader.getResourceAsStream(classFileName)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = IOUtils.toByteArray(in);
                        return defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                // everything else (including IClient) is "shared": delegate to the application loader
                return baseClassLoader.loadClass(name);
            }
        };
        ShareClazz shareClazz = new ShareClazz();
        System.out.println("shareClazz loaded by: " + shareClazz.getClass().getClassLoader());
        try {
            // experiment: ClientImpl (and, through it, HiProxy) come from the custom loader,
            // while IClient is shared via the application class loader
            Class<?> clientImplClazz = customLoader.loadClass("ClientImpl");
            shareClazz.iClient = (IClient) clientImplClazz.getDeclaredConstructor().newInstance();
            System.out.println("clientImpl loaded by: " + clientImplClazz.getClassLoader());
            shareClazz.callSayHi();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
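Result
Figure: ShareClazz indirectly reaching HiProxy, which lives in a different loader
The output shows that ShareClazz is loaded by AppClassLoader and HiProxy by the custom loader, and the indirect access through the interface runs normally.
Done!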