This article is based on the hive-1.2.2 source code.
The HiveServer2 module lives under the service directory.
Entry file
service/src/java/org/apache/hive/service/server/HiveServer2.java
Flow analysis
1. Start from the main function
public static void main(String[] args) {
  ...
  ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
  ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);
  ...
  oprocResponse.getServerOptionsExecutor().execute();
  ...
}
As shown, the main logic is oprocResponse.getServerOptionsExecutor().execute(), and the ServerOptionsExecutor is created inside oproc.parse(args):
ServerOptionsProcessorResponse parse(String[] argv) {
  ...
  // Default executor, when no option is specified
  return new ServerOptionsProcessorResponse(new StartOptionExecutor());
}
If no option is specified, the default executor implementation is StartOptionExecutor:
static class StartOptionExecutor implements ServerOptionsExecutor {
  @Override
  public void execute() {
    try {
      startHiveServer2();
    } catch (Throwable t) {
      LOG.fatal("Error starting HiveServer2", t);
      System.exit(-1);
    }
  }
}
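The dispatch described above (parse picks an executor, main only calls execute on it) can be sketched in isolation. This is a minimal illustration, not Hive's code: the class names OptionDispatchSketch, StartExecutor and HelpExecutor, the "--help" flag handling and the LOG list are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

public class OptionDispatchSketch {
    /** Plays the role of ServerOptionsExecutor: one action per command-line option. */
    interface Executor {
        void execute();
    }

    /** Records which executors ran; hypothetical, only here so the behaviour is observable. */
    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical stand-in for StartOptionExecutor. */
    static class StartExecutor implements Executor {
        @Override
        public void execute() {
            LOG.add("start");
        }
    }

    /** Hypothetical executor for an illustrative "--help" flag. */
    static class HelpExecutor implements Executor {
        @Override
        public void execute() {
            LOG.add("help");
        }
    }

    /** Picks an executor from the arguments, falling back to the start executor. */
    static Executor parse(String[] args) {
        for (String arg : args) {
            if ("--help".equals(arg)) {
                return new HelpExecutor();
            }
        }
        // Default executor, when no option is specified
        return new StartExecutor();
    }

    public static void main(String[] args) {
        // The caller never inspects the options itself; it only runs what parse chose.
        parse(args).execute();
        System.out.println(LOG);
    }
}
```

Keeping the choice inside parse means that adding a new option only requires a new executor class, while main stays unchanged.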
StartOptionExecutor calls the startHiveServer2 method:
private static void startHiveServer2() throws Throwable {
  ...
  server = new HiveServer2();
  server.init(hiveConf);
  server.start();
  ...
}
startHiveServer2 calls the init and start methods of HiveServer2:
public synchronized void init(HiveConf hiveConf) {
  cliService = new CLIService(this);
  addService(cliService);
  if (isHTTPTransportMode(hiveConf)) {
    thriftCLIService = new ThriftHttpCLIService(cliService);
  } else {
    thriftCLIService = new ThriftBinaryCLIService(cliService);
  }
  addService(thriftCLIService);
  ...
}
...
public synchronized void start() {
  super.start();
}
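HiveServer2's init method registers two services: CLIService and a Thrift front end, which is ThriftHttpCLIService in HTTP transport mode and ThriftBinaryCLIService otherwise.
CLIService handles the requests, while the Thrift service provides Thrift RPC support.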
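The choice between the two Thrift front ends can be illustrated with a small sketch. It assumes the decision reduces to comparing the hive.server2.transport.mode setting with "http"; the body of isHTTPTransportMode is not shown in this article, so the check below is an assumption, and the Map-based configuration and the class name TransportModeSketch are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class TransportModeSketch {
    /** Assumed check: the mode is HTTP only when the setting equals "http" (case-insensitive). */
    static boolean isHttpTransportMode(Map<String, String> conf) {
        String mode = conf.get("hive.server2.transport.mode");
        return mode != null && mode.equalsIgnoreCase("http");
    }

    /** Returns the name of the Thrift front end that init would register for this configuration. */
    static String chooseThriftService(Map<String, String> conf) {
        return isHttpTransportMode(conf) ? "ThriftHttpCLIService" : "ThriftBinaryCLIService";
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(chooseThriftService(conf)); // unset: binary front end
        conf.put("hive.server2.transport.mode", "http");
        System.out.println(chooseThriftService(conf)); // http: HTTP front end
    }
}
```

Either way, exactly one Thrift service is registered next to CLIService, and both variants wrap the same CLIService instance.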
The start method calls the start implementation of the parent class CompositeService, which calls start on each registered Service in turn:
public synchronized void start() {
  ...
  for (int n = serviceList.size(); i < n; i++) {
    Service service = serviceList.get(i);
    service.start();
  }
  ...
}
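The composite pattern behind addService and start can be sketched as a tiny standalone program. This is a simplified illustration under stated assumptions, not Hive's CompositeService: the NamedService class, the events list, and the choice to record the parent after its children are hypothetical, and the children are registered before init rather than inside it.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeServiceSketch {
    interface Service {
        void init();
        void start();
    }

    /** Leaf service that only records its lifecycle events. */
    static class NamedService implements Service {
        protected final String name;
        protected final List<String> events;

        NamedService(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override
        public void init() {
            events.add("init:" + name);
        }

        @Override
        public void start() {
            events.add("start:" + name);
        }
    }

    /** Service that owns children and forwards each lifecycle call in registration order. */
    static class CompositeService extends NamedService {
        private final List<Service> serviceList = new ArrayList<>();

        CompositeService(String name, List<String> events) {
            super(name, events);
        }

        void addService(Service service) {
            serviceList.add(service);
        }

        @Override
        public void init() {
            for (Service service : serviceList) {
                service.init();
            }
            super.init(); // record the parent after its children
        }

        @Override
        public void start() {
            for (Service service : serviceList) {
                service.start();
            }
            super.start(); // record the parent after its children
        }
    }

    /** Builds a tree shaped like the one in this article and runs its lifecycle. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompositeService cli = new CompositeService("CLIService", events);
        cli.addService(new NamedService("SessionManager", events));
        CompositeService server = new CompositeService("HiveServer2", events);
        server.addService(cli);
        server.addService(new NamedService("ThriftCLIService", events));
        server.init();
        server.start();
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

Because CLIService is itself a composite, a single start call on the server reaches SessionManager without the server knowing it exists.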
So our focus is on CLIService.
This class has an init method, shown below:
public synchronized void init(HiveConf hiveConf) {
  this.hiveConf = hiveConf;
  sessionManager = new SessionManager(hiveServer2);
  addService(sessionManager);
  // If the hadoop cluster is secure, do a kerberos login for the service from the keytab
  if (UserGroupInformation.isSecurityEnabled()) {
    try {
      HiveAuthFactory.loginFromKeytab(hiveConf);
      this.serviceUGI = Utils.getUGI();
    } catch (IOException e) {
      throw new ServiceException("Unable to login to kerberos with given principal/keytab", e);
    } catch (LoginException e) {
      throw new ServiceException("Unable to login to kerberos with given principal/keytab", e);
    }
    // Also try creating a UGI object for the SPNego principal
    String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL);
    String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB);
    if (principal.isEmpty() || keyTabFile.isEmpty()) {
      LOG.info("SPNego httpUGI not created, spNegoPrincipal: " + principal +
          ", ketabFile: " + keyTabFile);
    } else {
      try {
        this.httpUGI = HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf);
        LOG.info("SPNego httpUGI successfully created.");
      } catch (IOException e) {
        LOG.warn("SPNego httpUGI creation failed: ", e);
      }
    }
  }
  // creates connection to HMS and thus *must* occur after kerberos login above
  try {
    applyAuthorizationConfigPolicy(hiveConf);
  } catch (Exception e) {
    throw new RuntimeException("Error applying authorization policy on hive configuration: "
        + e.getMessage(), e);
  }
  setupBlockedUdfs();
  super.init(hiveConf);
}
It also has a start method, shown below:
public synchronized void start() {
  super.start();
  // Initialize and test a connection to the metastore
  IMetaStoreClient metastoreClient = null;
  try {
    metastoreClient = new HiveMetaStoreClient(hiveConf);
    metastoreClient.getDatabases("default");
  } catch (Exception e) {
    throw new ServiceException("Unable to connect to MetaStore!", e);
  } finally {
    if (metastoreClient != null) {
      metastoreClient.close();
    }
  }
}
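The start method above is a fail-fast startup probe: open a client, issue one cheap call, and always close the client. The same idea can be sketched with try-with-resources, which is a swapped-in technique rather than what the Hive code does. The Client interface, FakeClient, and the use of IllegalStateException in place of ServiceException are hypothetical and exist only to keep the sketch self-contained.

```java
import java.util.concurrent.Callable;

public class StartupProbeSketch {
    /** Hypothetical minimal client; the real code uses IMetaStoreClient. */
    interface Client extends AutoCloseable {
        void ping() throws Exception;

        @Override
        void close();
    }

    /** Fake client that either answers or fails, and remembers whether it was closed. */
    static class FakeClient implements Client {
        private final boolean healthy;
        boolean closed = false;

        FakeClient(boolean healthy) {
            this.healthy = healthy;
        }

        @Override
        public void ping() throws Exception {
            if (!healthy) {
                throw new Exception("connection refused");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Creates a client, probes it once, and closes it whether or not the probe succeeded. */
    static void probe(Callable<Client> factory) {
        try (Client client = factory.call()) {
            client.ping();
        } catch (Exception e) {
            // Stand-in for ServiceException: abort startup instead of running without the dependency.
            throw new IllegalStateException("Unable to connect to MetaStore!", e);
        }
    }

    public static void main(String[] args) {
        FakeClient ok = new FakeClient(true);
        probe(() -> ok);
        System.out.println("closed after probe: " + ok.closed);
    }
}
```

Creating the client inside the try header plays the role of the null check in the finally block: a client that was never constructed is never closed.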