Implementing High Availability for Spark HiveThriftServer2

Author: 烂泥_119c | Published: 2020-01-05 23:30

The High-Availability Problem in Spark HiveThriftServer

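Spark's HiveThriftServer2 extends HiveServer2 but does not inherit HiveServer2's HA mechanism. Here we add high availability to HiveThriftServer2 by modifying the source code. The basic idea is to register the connection addresses of multiple server instances in ZooKeeper (zk); clients then connect exactly as they would to an HA HiveServer2.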
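
To make the idea concrete, here is a minimal sketch of the register / deregister / pick cycle. It uses an in-memory map as a stand-in for the ZooKeeper namespace, so `ToyServiceRegistry` and its methods are hypothetical names for illustration only and are not part of Spark, Hive or ZooKeeper. Picking a random live instance is an assumption about client behavior, not a description of the Hive JDBC driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the ZooKeeper namespace (illustration only).
class ToyServiceRegistry {
    // instance id -> "host:port" of a running Thrift server
    private final Map<String, String> instances = new ConcurrentHashMap<>();

    // A server registers its address once it has started.
    void register(String instanceId, String hostPort) {
        instances.put(instanceId, hostPort);
    }

    // A server removes its address when it stops. With real ZooKeeper ephemeral
    // nodes, the entry also disappears when the server's session ends.
    void deregister(String instanceId) {
        instances.remove(instanceId);
    }

    // A client picks any instance that is still registered.
    String pick(Random random) {
        List<String> live = new ArrayList<>(instances.values());
        if (live.isEmpty()) {
            throw new IllegalStateException("No server instance is registered");
        }
        return live.get(random.nextInt(live.size()));
    }

    public static void main(String[] args) {
        ToyServiceRegistry registry = new ToyServiceRegistry();
        registry.register("a", "cdh1:10001");
        registry.register("b", "cdh2:10001");
        registry.deregister("a"); // simulate the first server going away
        System.out.println(registry.pick(new Random())); // only cdh2:10001 is left
    }
}
```

The point of the sketch is that clients never hard-code a server address: they only know the registry, so a stopped server simply stops being offered.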

Classes Involved and Source Changes

  • The entry class of Spark's HiveThriftServer is HiveThriftServer2. This object has a main method as its entry point, so let's see what main does:
  def main(args: Array[String]) {
    // Initialize the daemon and parse the command-line arguments
    Utils.initDaemon(log)
    val optionsProcessor = new HiveServer2.ServerOptionsProcessor("HiveThriftServer2")
    optionsProcessor.parse(args)

    logInfo("Starting SparkContext")
    // Initialize the Spark SQL environment
    SparkSQLEnv.init()

    ShutdownHookManager.addShutdownHook { () =>
      SparkSQLEnv.stop()
      uiTab.foreach(_.detach())
    }

    val executionHive = HiveUtils.newClientForExecution(
      SparkSQLEnv.sqlContext.sparkContext.conf,
      SparkSQLEnv.sqlContext.sessionState.newHadoopConf())

    try {
      /**
        * Instantiate HiveThriftServer2
        */
      val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
      server.init(executionHive.conf)
      server.start()
      logInfo("HiveThriftServer2 started")
      listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
      SparkSQLEnv.sparkContext.addSparkListener(listener)
      uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
        Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
      } else {
        None
      }
      // If application was killed before HiveThriftServer2 start successfully then SparkSubmit
      // process can not exit, so check whether if SparkContext was stopped.
      if (SparkSQLEnv.sparkContext.stopped.get()) {
        logError("SparkContext has stopped even if HiveServer2 has started, so exit")
        System.exit(-1)
      }
    } catch {
      case e: Exception =>
        logError("Error starting HiveThriftServer2", e)
        System.exit(-1)
    }
  }
  • main creates a new HiveThriftServer2 object, so let's look inside that class:
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
  extends HiveServer2
  with ReflectedCompositeService {
  // state is tracked internally so that the server only attempts to shut down if it successfully
  // started, and then once only.
  private val started = new AtomicBoolean(false)

  // todo: newly added
  var hiveConf:HiveConf = _
  /**
    * Initialize HiveThriftServer2
    */
  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
    setSuperField(this, "cliService", sparkSqlCliService)
    addService(sparkSqlCliService)

    val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
      new ThriftHttpCLIService(sparkSqlCliService)
    } else {
      new ThriftBinaryCLIService(sparkSqlCliService)
    }

    setSuperField(this, "thriftCLIService", thriftCliService)
    addService(thriftCliService)
    initCompositeService(hiveConf)
  }

  private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
    val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
    transportMode.toLowerCase(Locale.ROOT).equals("http")
  }

  override def start(): Unit = {
    super.start()
    started.set(true)
    /**
      * todo: reuse HiveServer2's high-availability configuration here
      */
    if (this.hiveConf.getBoolVar(
      ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
      /*
      * addServerInstanceToZooKeeper registers this HiveServer2 instance in ZooKeeper
      * */
      invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
        classOf[HiveConf] -> this.hiveConf)
    }

  }

  override def stop(): Unit = {
    /**
      * todo: on stop, delete the registration from ZooKeeper
      */
    if (started.getAndSet(false)) {
      if (this.hiveConf.getBoolVar(
        ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
        invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
      }
      super.stop()
    }
  }
 }
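
As an aside on the `invoke(...)` calls in `start()` and `stop()` above: they reach private methods of the parent class through reflection. The following is a minimal Java sketch of that technique, assuming the helper boils down to `getDeclaredMethod` plus `setAccessible(true)`. `BaseServer`, `registerInstance` and `invokePrivate` are hypothetical names used only for this illustration; they are not Spark or Hive APIs.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a parent class that keeps its hook private.
class BaseServer {
    private String registerInstance(String uri) {
        return "registered " + uri;
    }
}

class ReflectionSketch {
    // Looks up a declared (possibly private) method on `owner` and calls it on `target`.
    static Object invokePrivate(Class<?> owner, Object target, String name,
                                Class<?>[] argTypes, Object... args) {
        try {
            Method method = owner.getDeclaredMethod(name, argTypes);
            method.setAccessible(true); // bypass the private modifier
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Reflective call to " + name + " failed", e);
        }
    }

    public static void main(String[] args) {
        Object result = invokePrivate(BaseServer.class, new BaseServer(),
                "registerInstance", new Class<?>[] {String.class}, "cdh1:10001");
        System.out.println(result);
    }
}
```

Passing the declaring class explicitly matters: `getDeclaredMethod` only sees methods declared on that exact class, which is why the Scala code passes `classOf[HiveServer2]` rather than the subclass.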
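  • HiveThriftServer2 itself is simple: an init method, a start() method and a stop() method. The parts marked todo are the code I added, and the changes are small:
  • A new hiveConf member variable records the configuration passed to init so that it can be used later.
  • start method: registers the service information in ZooKeeper.
  • stop method: deletes the registration from ZooKeeper.
  • These changes rely on reflection to call registration and removal methods that we add to HiveServer2. Next, let's look at how the HiveServer2 code is modified: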
  /*#################################     New methods      ############################################*/

  // Get the Thrift server's address as host:port
  private String getServerInstanceURI() throws Exception {
    if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
      throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
    }

    return getHiveHost() + ":"
        + thriftCLIService.getPortNumber();
  }

  private String getHiveHost() {
    HiveConf hiveConf = thriftCLIService.getHiveConf();
    String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
    if (hiveHost != null && !hiveHost.isEmpty()) {
      return hiveHost;
    } else {
      return thriftCLIService.getServerIPAddress().getHostName();
    }
  }
  
  /**
   * Tracks whether this HiveServer2 instance has been de-registered from ZooKeeper
   * (and would therefore need to be registered again)
   * */
  private boolean deregisteredWithZooKeeper = false;
  private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
    this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
  }

  /**
   * ZooKeeper watcher: fires when this instance's znode is deleted, then closes
   * the znode held by this HiveServer2 instance
   */
  private PersistentEphemeralNode znode;
  private class DeRegisterWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
      if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
        if (znode != null) {
          try {
            znode.close();
            LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
                + "The server will be shut down after the last client session completes.");
          } catch (IOException e) {
            LOG.error("Failed to close the persistent ephemeral znode", e);
          } finally {
            HiveServer2.this.setDeregisteredWithZooKeeper(true);
            // If no client sessions remain open, shut this HiveServer2 instance down
            if (cliService.getSessionManager().getOpenSessionCount() == 0) {
              LOG.warn("This instance of HiveServer2 has been removed from the list of server "
                  + "instances available for dynamic service discovery. "
                  + "The last client session has ended - will shutdown now.");
              HiveServer2.this.stop();
            }
          }
        }
      }
    }
  }

  private CuratorFramework zooKeeperClient;
  private String znodePath;
  /**
   * Register this server instance in ZooKeeper
   * @param hiveConf
   * @throws Exception
   */
  private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
    // Read the ZooKeeper quorum address from hiveConf
    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
    // Read hive.server2.zookeeper.namespace from hive-site.xml
    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
    // Get this Thrift server's address (the configured bind host, if any, plus the port)
    String instanceURI = getServerInstanceURI();
    
    // Session timeout for Hive's ZooKeeper connection
    int sessionTimeout =
        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
            TimeUnit.MILLISECONDS);
    // Base sleep time between retries when connecting to ZooKeeper
    int baseSleepTime =
        (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
            TimeUnit.MILLISECONDS);
    // Maximum number of retries when connecting to ZooKeeper
    int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
    // Build the ZooKeeper client
    zooKeeperClient =
        CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
            .sessionTimeoutMs(sessionTimeout)
            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
    // Start the ZooKeeper client
    zooKeeperClient.start();
    //TODO Create the parent znode for rootNamespace on ZooKeeper (the "directory" that holds the registrations)
    try {
      zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
          .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
      LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
    } catch (KeeperException e) {
      if (e.code() != KeeperException.Code.NODEEXISTS) {
        LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
        throw e;
      }
    }
    //TODO Register this HiveServer2 instance under rootNamespace, with a znode name such as:
    // serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
    try {
      String pathPrefix =
          ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
              + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
              + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
      byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
      znode =
          new PersistentEphemeralNode(zooKeeperClient,
              PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
      znode.start();
      // We'll wait for 120s for node creation
      long znodeCreationTimeout = 120;
      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
        throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
      }
      setDeregisteredWithZooKeeper(false);
      znodePath = znode.getActualPath();
      // TODO Set a zk watch so that we notice immediately if the znode disappears
      if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
        // No node exists, throw exception
        throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
      }
      LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
    } catch (Exception e) {
      LOG.fatal("Unable to create a znode for this server instance", e);
      if (znode != null) {
        znode.close();
      }
      throw (e);
    }
  }

  // Remove the znode, signalling that this server instance is shutting down
  private void removeServerInstanceFromZooKeeper() throws Exception {
    setDeregisteredWithZooKeeper(true);

    if (znode != null) {
      znode.close();
    }
    zooKeeperClient.close();
    LOG.info("Server instance removed from ZooKeeper.");
  }
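  • The main change to HiveServer2 is the two methods that HiveThriftServer2 calls through reflection: addServerInstanceToZooKeeper registers the connection information under the configured zk path at startup, and removeServerInstanceFromZooKeeper deletes it at shutdown.
  • A ZooKeeper watcher (DeRegisterWatcher) is also added.
  • Note that the znode registered in zk must take the form serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005, so that HiveServer2's existing HA mechanism can be reused.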
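
The znode naming convention can be sketched in a few lines of plain Java. The class below is a hypothetical helper written for this article, not Hive's own client code: `prefix` mirrors the `pathPrefix` string built in `addServerInstanceToZooKeeper` (without the namespace path), and `parse` shows one way a reader of the namespace could split such a name back into fields. The sequence suffix is appended by ZooKeeper itself for sequential nodes; here it is simply concatenated by hand.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating the znode name format (not part of Hive).
class ZnodeNameSketch {
    // Builds the name prefix; ZooKeeper appends the zero-padded sequence number.
    static String prefix(String hostPort, String version) {
        return "serverUri=" + hostPort + ";version=" + version + ";sequence=";
    }

    // Splits "key1=value1;key2=value2;..." into an ordered map of fields.
    static Map<String, String> parse(String znodeName) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String part : znodeName.split(";")) {
            int eq = part.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Malformed field: " + part);
            }
            fields.put(part.substring(0, eq), part.substring(eq + 1));
        }
        return fields;
    }

    public static void main(String[] args) {
        String name = prefix("cdh1:10001", "1.2.1.spark2") + "0000000005";
        System.out.println(name);
        System.out.println(parse(name).get("serverUri"));
    }
}
```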

Build

mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package 
  • Replace the corresponding jar in Spark's lib directory with the resulting hive-thriftServer.jar.

Configuration

  • The HA configuration is the same as for HiveServer2 HA. Add the following to hive-site.xml:
<property>
    <name>hive.server2.support.dynamic.service.discovery</name>
    <value>true</value>
</property>
<property>
    <name>hive.server2.zookeeper.namespace</name>
    <value>hiveserver2_zk</value>
</property>

<property>
    <name>hive.zookeeper.quorum</name>
    <value>cdh1:2181,cdh2:2181,cdh3:2181</value>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
</property>
<property>
    <name>hive.server2.thrift.bind.host</name>
    <value>cdh1</value>
</property>
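  • In addition, we start the process with Spark's sbin/start-thriftserver.sh script, which calls spark-daemon.sh to actually launch the job. spark-daemon.sh checks for a duplicate application, so starting the same application more than once on one machine fails. To run two HiveThriftServer instances on the same machine, spark-daemon.sh needs a small modification: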
# Comment out the following block ###########################################
# if [ -f "$pid" ]; then
#   TARGET_ID="$(cat "$pid")"
#   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
#     echo "$command running as process $TARGET_ID.  Stop it first."
#     exit 1
#   fi
# fi
########################################################

Startup

  • Start command
sbin/start-thriftserver.sh \
--master yarn \
--conf spark.driver.memory=1G \
--executor-memory 1G \
--num-executors 1 \
--hiveconf hive.server2.thrift.port=10002
  • beeline connection command
!connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk
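
The same discovery URL can be used from application code over JDBC. The sketch below only assembles the URL from its parts and wraps the standard `DriverManager.getConnection` call. `DiscoveryJdbcSketch` and its method names are hypothetical, and actually connecting assumes that the Hive JDBC driver is on the classpath and that the ZooKeeper quorum and at least one registered server are reachable, none of which the sketch verifies.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper for building the ZooKeeper service-discovery JDBC URL.
class DiscoveryJdbcSketch {
    // Assembles the same URL shape as the beeline command above.
    static String discoveryUrl(String zkQuorum, String database, String namespace) {
        return "jdbc:hive2://" + zkQuorum + "/" + database
                + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=" + namespace;
    }

    // Opens a connection; requires the Hive JDBC driver and a reachable cluster.
    static Connection connect(String url, String user, String password) throws SQLException {
        return DriverManager.getConnection(url, user, password);
    }

    public static void main(String[] args) {
        // Only prints the URL, so this runs without a cluster.
        System.out.println(
                discoveryUrl("cdh1:2181,cdh2:2181,cdh3:2181", "default", "hiveserver2_zk"));
    }
}
```

Note that the host list in the URL is the ZooKeeper quorum, not the Thrift servers; the namespace must match hive.server2.zookeeper.namespace in hive-site.xml.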


Article link: https://www.haomeiwen.com/subject/niqhactx.html