美文网首页
Hive Metastore Thrift客户端

Hive Metastore Thrift客户端

作者: 此间少年仍犹在 | 来源:发表于2019-02-25 18:45 被阅读0次

     上篇文章介绍了Metastore Thrift服务端,接下来浅析一下Metastore Thrift客户端。

    1、IMetaStoreClient接口

     IMetaStoreClient接口定义了Metastore的thrift api,该接口中定义了操作元数据的各种方法,如下图所示。


    IMetaStoreClient
    2、HiveMetaStoreClient

    Hive中IMetaStoreClient的实现类是HiveMetaStoreClient
    Hive.getMSC() ➔ createMetaStoreClient()
      ↳ RetryingMetaStoreClient.getProxy()//动态代理类创建代理对象
      ↳RetryingMetaStoreClient.RetryingMetaStoreClient()
      ↳MetaStoreUtils.newInstance()//反射实例化对象
      ↳SessionHiveMetaStoreClient.SessionHiveMetaStoreClient()
      ↳HiveMetaStoreClient.HiveMetaStoreClient()
      ↳HiveMetaStoreClient.open()

    3、建立Thrift连接
      private void open() throws MetaException {
        isConnected = false;
        TTransportException tte = null;
        boolean useSSL = conf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
        boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
        boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
        boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
        int clientSocketTimeout = (int) conf.getTimeVar(
            ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
    
        for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
          for (URI store : metastoreUris) {
            LOG.info("Trying to connect to metastore with URI " + store);
    
            try {
              if (useSasl) {
                // Wrap thrift connection with SASL for secure connection.
                try {
                  HadoopThriftAuthBridge.Client authBridge =
                    ShimLoader.getHadoopThriftAuthBridge().createClient();
    
                  // check if we should use delegation tokens to authenticate
                  // the call below gets hold of the tokens if they are set up by hadoop
                  // this should happen on the map/reduce tasks if the client added the
                  // tokens into hadoop's credential store in the front end during job
                  // submission.
                  String tokenSig = conf.get("hive.metastore.token.signature");
                  // tokenSig could be null
                  tokenStrForm = Utils.getTokenStrForm(tokenSig);
                  transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout);
    
                  if(tokenStrForm != null) {
                    // authenticate using delegation tokens via the "DIGEST" mechanism
                    transport = authBridge.createClientTransport(null, store.getHost(),
                        "DIGEST", tokenStrForm, transport,
                            MetaStoreUtils.getMetaStoreSaslProperties(conf));
                  } else {
                    String principalConfig =
                        conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL);
                    transport = authBridge.createClientTransport(
                        principalConfig, store.getHost(), "KERBEROS", null,
                        transport, MetaStoreUtils.getMetaStoreSaslProperties(conf));
                  }
                } catch (IOException ioe) {
                  LOG.error("Couldn't create client transport", ioe);
                  throw new MetaException(ioe.toString());
                }
              } else {
                if (useSSL) {
                  ......
              }
    
              final TProtocol protocol;
              if (useCompactProtocol) {
                protocol = new TCompactProtocol(transport);
              } else {
                protocol = new TBinaryProtocol(transport);
              }
              client = new ThriftHiveMetastore.Client(protocol);
              try {
                if (!transport.isOpen()) {
                  transport.open();
                  LOG.info("Opened a connection to metastore, current connections: " + connCount.incrementAndGet());
                }
                isConnected = true;
              } catch (TTransportException e) {
                tte = e;
                if (LOG.isDebugEnabled()) {
                  LOG.warn("Failed to connect to the MetaStore Server...", e);
                } else {
                  // Don't print full exception trace if DEBUG is not on.
                  LOG.warn("Failed to connect to the MetaStore Server...");
                }
              }
    
              if (isConnected && !useSasl && conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)){
                ......
              }
            } catch (MetaException e) {
              LOG.error("Unable to connect to metastore with URI " + store
                        + " in attempt " + attempt, e);
            }
            if (isConnected) {
              break;
            }
          }
          // Wait before launching the next round of connection retries.
          if (!isConnected && retryDelaySeconds > 0) {
            try {
              LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
              Thread.sleep(retryDelaySeconds * 1000);
            } catch (InterruptedException ignore) {}
          }
        }
    
        if (!isConnected) {
          throw new MetaException("Could not connect to meta store using any of the URIs provided." +
            " Most recent failure: " + StringUtils.stringifyException(tte));
        }
    
        snapshotActiveConf();
    
        LOG.info("Connected to metastore.");
      }
    

    相关文章

      网友评论

          本文标题:Hive Metastore Thrift客户端

          本文链接:https://www.haomeiwen.com/subject/ixpgyqtx.html