Implementing High Availability for Spark HiveThriftServer2


Author: 烂泥_119c | Published 2020-01-05 23:30

    The HA problem with Spark HiveThriftServer2

    Spark's HiveThriftServer2 extends HiveServer2, but it does not inherit HiveServer2's HA mechanism. Here we add high availability to HiveThriftServer2 by patching the source. The idea is the same as HiveServer2's dynamic service discovery: each running instance registers its connection address on ZooKeeper, and clients connect through ZooKeeper exactly as they would to an HA HiveServer2.

    Classes involved and source changes

    • The entry point of the Spark Thrift server is the HiveThriftServer2 object. Let's see what its main method does:
      def main(args: Array[String]) {
    // Parse command-line options
        Utils.initDaemon(log)
        val optionsProcessor = new HiveServer2.ServerOptionsProcessor("HiveThriftServer2")
        optionsProcessor.parse(args)
    
        logInfo("Starting SparkContext")
    // Initialize the Spark SQL environment
        SparkSQLEnv.init()
    
        ShutdownHookManager.addShutdownHook { () =>
          SparkSQLEnv.stop()
          uiTab.foreach(_.detach())
        }
    
        val executionHive = HiveUtils.newClientForExecution(
          SparkSQLEnv.sqlContext.sparkContext.conf,
          SparkSQLEnv.sqlContext.sessionState.newHadoopConf())
    
        try {
      /**
        * Instantiate HiveThriftServer2
        */
          val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
          server.init(executionHive.conf)
          server.start()
          logInfo("HiveThriftServer2 started")
          listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
          SparkSQLEnv.sparkContext.addSparkListener(listener)
          uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
            Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
          } else {
            None
          }
      // If the application was killed before HiveThriftServer2 started successfully, the
      // SparkSubmit process cannot exit, so check whether the SparkContext was stopped.
          if (SparkSQLEnv.sparkContext.stopped.get()) {
            logError("SparkContext has stopped even if HiveServer2 has started, so exit")
            System.exit(-1)
          }
        } catch {
          case e: Exception =>
            logError("Error starting HiveThriftServer2", e)
            System.exit(-1)
        }
      }
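    The cleanup registered through ShutdownHookManager above is, underneath, an ordinary JVM shutdown hook. A minimal self-contained sketch using plain java.lang.Runtime (the printed messages are illustrative, not Spark's):

```java
public class ShutdownHookSketch {
    public static void main(String[] args) {
        // Runs when the JVM exits normally or receives SIGINT/SIGTERM; this is the
        // mechanism by which SparkSQLEnv.stop() and uiTab.detach() get called above.
        Runtime.getRuntime().addShutdownHook(
            new Thread(() -> System.out.println("stopping SQL env, detaching UI tab")));
        System.out.println("server running");
    }
}
```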
    
    • So main creates a HiveThriftServer2 instance; let's look inside that class:
    private[hive] class HiveThriftServer2(sqlContext: SQLContext)
      extends HiveServer2
      with ReflectedCompositeService {
      // state is tracked internally so that the server only attempts to shut down if it successfully
      // started, and then once only.
      private val started = new AtomicBoolean(false)
    
  // added: keep the HiveConf from init for later use in start()/stop()
  var hiveConf: HiveConf = _
  /**
    * Initialize the Thrift server
    */
      override def init(hiveConf: HiveConf) {
        this.hiveConf = hiveConf
        val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
        setSuperField(this, "cliService", sparkSqlCliService)
        addService(sparkSqlCliService)
    
        val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
          new ThriftHttpCLIService(sparkSqlCliService)
        } else {
          new ThriftBinaryCLIService(sparkSqlCliService)
        }
    
        setSuperField(this, "thriftCLIService", thriftCliService)
        addService(thriftCliService)
        initCompositeService(hiveConf)
      }
    
      private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
        val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
        transportMode.toLowerCase(Locale.ROOT).equals("http")
      }
    
      override def start(): Unit = {
        super.start()
        started.set(true)
    /**
      * added: reuse HiveServer2's dynamic service discovery switch
      */
        if (this.hiveConf.getBoolVar(
          ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
      /*
       * addServerInstanceToZooKeeper registers this server instance on ZooKeeper
       */
          invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
            classOf[HiveConf] -> this.hiveConf)
        }
    
      }
    
      override def stop(): Unit = {
    /**
      * added: on stop, remove this instance's registration from ZooKeeper
      */
        if (started.getAndSet(false)) {
          if (this.hiveConf.getBoolVar(
            ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
            invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
          }
          super.stop()
        }
      }
     }
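    The setSuperField and invoke helpers used above are thin wrappers over java.lang.reflect (they come from Spark's hive-thriftserver ReflectionUtils). As a rough, self-contained illustration of the mechanism, with hypothetical class and member names:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class ReflectionSketch {
    // A stand-in for HiveServer2: private state plus a private method.
    static class Base {
        private String cliService = "default";
        private String register(String uri) { return "registered " + uri; }
    }

    public static void main(String[] args) throws Exception {
        Base server = new Base();

        // setSuperField equivalent: overwrite a private field on the instance.
        Field f = Base.class.getDeclaredField("cliService");
        f.setAccessible(true);
        f.set(server, "sparkSqlCliService");

        // invoke equivalent: call a private method with typed arguments.
        Method m = Base.class.getDeclaredMethod("register", String.class);
        m.setAccessible(true);
        System.out.println(m.invoke(server, "cdh1:10002"));
        System.out.println(f.get(server));
    }
}
```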
    
    • The class itself is simple: an init method plus start() and stop(). The lines marked "added" are my changes, and they are small:
    • A new hiveConf member records the configuration passed to init so it can be reused later
    • start(): registers this server's connection info on ZooKeeper
    • stop(): removes the registration from ZooKeeper
    • Registration and de-registration live in HiveServer2 and are invoked here through reflection; let's look at the changes made to HiveServer2 itself:
  /*#################################     New methods      ############################################*/
    
  // Return this Thrift server's address as host:port
      private String getServerInstanceURI() throws Exception {
        if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
          throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
        }
    
        return getHiveHost() + ":"
            + thriftCLIService.getPortNumber();
      }
    
      private String getHiveHost() {
        HiveConf hiveConf = thriftCLIService.getHiveConf();
        String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
        if (hiveHost != null && !hiveHost.isEmpty()) {
          return hiveHost;
        } else {
          return thriftCLIService.getServerIPAddress().getHostName();
        }
      }
      
  /**
   * Tracks whether this instance has been de-registered from ZooKeeper
   */
      private boolean deregisteredWithZooKeeper = false;
      private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
        this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
      }
    
  /**
   * ZooKeeper watcher: when this instance's znode is deleted, close the local
   * registration handle; once the last client session ends, shut the server down.
   */
      private PersistentEphemeralNode znode;
      private class DeRegisterWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
          if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
            if (znode != null) {
              try {
                znode.close();
                LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
                + "The server will be shut down after the last client session completes.");
              } catch (IOException e) {
                LOG.error("Failed to close the persistent ephemeral znode", e);
              } finally {
                HiveServer2.this.setDeregisteredWithZooKeeper(true);
            // If no client sessions remain open, shut this HiveServer2 down
                if (cliService.getSessionManager().getOpenSessionCount() == 0) {
                  LOG.warn("This instance of HiveServer2 has been removed from the list of server "
                      + "instances available for dynamic service discovery. "
                      + "The last client session has ended - will shutdown now.");
                  HiveServer2.this.stop();
                }
              }
            }
          }
        }
      }
    
      private CuratorFramework zooKeeperClient;
      private String znodePath;
  /**
   * Register this server instance on ZooKeeper.
   * @param hiveConf
   * @throws Exception
   */
      private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
    // ZooKeeper ensemble address, read from hiveConf
        String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
    // Root namespace: hive.server2.zookeeper.namespace from hive-site.xml
        String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
    // This Thrift server's advertised address (host:port)
        String instanceURI = getServerInstanceURI();
        
    // ZooKeeper session timeout
        int sessionTimeout =
            (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
                TimeUnit.MILLISECONDS);
    // Base sleep time between ZooKeeper connection retries
        int baseSleepTime =
            (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
                TimeUnit.MILLISECONDS);
    // Maximum number of ZooKeeper connection retries
        int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
    // Build the Curator client
        zooKeeperClient =
            CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
    // Start the client
        zooKeeperClient.start();
    // Create the persistent root namespace node (the parent under which instances register)
        try {
          zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
              .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
          LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
        } catch (KeeperException e) {
          if (e.code() != KeeperException.Code.NODEEXISTS) {
            LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
            throw e;
          }
        }
    // Register this instance under rootNamespace as an ephemeral sequential znode, e.g.
    // serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
        try {
          String pathPrefix =
              ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
                  + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
                  + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
          byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
          znode =
              new PersistentEphemeralNode(zooKeeperClient,
                  PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
          znode.start();
          // We'll wait for 120s for node creation
          long znodeCreationTimeout = 120;
          if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
            throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
          }
          setDeregisteredWithZooKeeper(false);
          znodePath = znode.getActualPath();
      // Set a watch on the znode so its deletion is noticed immediately
          if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
            // No node exists, throw exception
            throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
          }
          LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
        } catch (Exception e) {
          LOG.fatal("Unable to create a znode for this server instance", e);
          if (znode != null) {
            znode.close();
          }
          throw (e);
        }
      }
    
  // Remove the znode; called when this server instance shuts down
      private void removeServerInstanceFromZooKeeper() throws Exception {
        setDeregisteredWithZooKeeper(true);
    
        if (znode != null) {
          znode.close();
        }
        zooKeeperClient.close();
        LOG.info("Server instance removed from ZooKeeper.");
      }
    
    • Two methods are added to HiveServer2, the ones HiveThriftServer2 calls via reflection: on startup, register the connection info at the configured ZooKeeper path; on shutdown, delete it again.
    • A ZooKeeper Watcher is added so the instance notices when it has been de-registered
    • Note that the data registered on ZooKeeper must follow the form serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005, so that HiveServer2's existing client-side discovery can be reused unchanged
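    As a rough sketch of why that exact format matters, here is how a discovery client could recover host:port from such a node name (illustrative only; the real parsing lives in Hive's JDBC driver):

```java
public class ZnodeParseSketch {
    // Extract "host:port" from a node name like
    // "serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005".
    static String serverUri(String znodeName) {
        for (String part : znodeName.split(";")) {
            if (part.startsWith("serverUri=")) {
                return part.substring("serverUri=".length());
            }
        }
        throw new IllegalArgumentException("no serverUri in: " + znodeName);
    }

    public static void main(String[] args) {
        String node = "serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005";
        System.out.println(serverUri(node)); // prints cdh1:10001
    }
}
```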

    Building

    mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package 
    
    • Replace the hive-thriftserver jar in Spark's lib directory with the newly built one

    Configuration

    • The HA settings are the same as HiveServer2's; add the following to hive-site.xml:
    <property>
        <name>hive.server2.support.dynamic.service.discovery</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.zookeeper.namespace</name>
        <value>hiveserver2_zk</value>
    </property>
    
    <property>
        <name>hive.zookeeper.quorum</name>
        <value>cdh1:2181,cdh2:2181,cdh3:2181</value>
    </property>
    <property>
        <name>hive.zookeeper.client.port</name>
        <value>2181</value>
    </property>
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>cdh1</value>
    </property>
    
    • One more thing: we start the process with Spark's sbin/start-thriftserver.sh, which delegates the actual launch to spark-daemon.sh. That script refuses to start a second instance of the same application on one machine, so to run two HiveThriftServer instances on the same host, comment out its duplicate-instance check:
    # Comment out the following check #######################################
    # if [ -f "$pid" ]; then
    #   TARGET_ID="$(cat "$pid")"
    #   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
    #     echo "$command running as process $TARGET_ID.  Stop it first."
    #     exit 1
    #   fi
    # fi
    ########################################################
    

    Startup

    • Start command:
    sbin/start-thriftserver.sh \
    --master yarn \
    --conf spark.driver.memory=1G \
    --executor-memory 1G \
    --num-executors 1 \
    --hiveconf hive.server2.thrift.port=10002
    
    • Connect with beeline:
    !connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk
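    For clients that build the connection string in code, the same URL can be assembled from the quorum and namespace. A small sketch (the helper name is made up; serviceDiscoveryMode and zooKeeperNamespace are the standard Hive JDBC parameters):

```java
public class Hive2UrlSketch {
    // Build a discovery-mode JDBC URL from the ZooKeeper quorum and namespace.
    static String discoveryUrl(String quorum, String db, String namespace) {
        return "jdbc:hive2://" + quorum + "/" + db
            + ";serviceDiscoveryMode=zooKeeper"
            + ";zooKeeperNamespace=" + namespace;
    }

    public static void main(String[] args) {
        System.out.println(
            discoveryUrl("cdh1:2181,cdh2:2181,cdh3:2181", "default", "hiveserver2_zk"));
    }
}
```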
    

    Original article: https://www.haomeiwen.com/subject/niqhactx.html