Hive Source Code Overview: Submitting an HQL Job to Yarn

Author: 嘻嘻是小猪 | Published 2021-03-25 17:50

Based on Hive 2.3.4, using SQL submitted through the CLI as the example:

  • Starting the Driver
    Follow the call chain:
    org.apache.hadoop.hive.cli.CliDriver#main-->org.apache.hadoop.hive.cli.CliDriver#run-->org.apache.hadoop.hive.cli.CliDriver#executeDriver-->org.apache.hadoop.hive.cli.CliDriver#processLine-->org.apache.hadoop.hive.cli.CliDriver#processCmd
// the Driver object is created inside this method
public int processCmd(String cmd) {
......  
 // branch on the different kinds of commands
 if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
      ss.close();
      System.exit(0);
    } else if (tokens[0].equalsIgnoreCase("source")) {
      ......
    } else if (cmd_trimmed.startsWith("!")) {
      ......
    }  else { // local mode
      try {
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf); // a Driver instance is obtained here
        ret = processLocalCmd(cmd, proc, ss); // continue down the call chain
      } catch (SQLException e) {
        console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
    }
......
}
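
As an aside: how does CommandProcessorFactory.get end up handing back a Driver? It dispatches on the first token. Built-in commands such as set, reset, dfs or add get their own dedicated CommandProcessor, and everything else, i.e. real SQL, falls through to a Driver. A condensed sketch of the idea (abridged from Hive 2.3's CommandProcessorFactory; caching and error handling stripped):

public static CommandProcessor get(String[] cmd, HiveConf conf) throws SQLException {
  // built-in commands (SET/RESET/DFS/ADD/DELETE/...) get dedicated processors
  CommandProcessor result = getForHiveCommand(cmd, conf);
  if (result != null) {
    return result;
  }
  // anything else -- actual SQL -- is handled by a Driver
  return isBlank(cmd[0]) ? null : new Driver();
}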
  • Parsing & triggering execution
    Following the call chain above:
    org.apache.hadoop.hive.cli.CliDriver#processLocalCmd-->org.apache.hadoop.hive.ql.Driver#run(java.lang.String, boolean)-->org.apache.hadoop.hive.ql.Driver#runInternal
    Inside this method, the SQL is parsed and its execution is triggered:
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled){
  ......
      if (!alreadyCompiled) {
        // compile internal will automatically reset the perf logger
        ret = compileInternal(command, true); // entry point for parsing the SQL
        // then we continue to use this perf logger
        perfLogger = SessionState.getPerfLogger();
        if (ret != 0) {
          return createProcessorResponse(ret);
        }
      } else {
       ......
      }
......

    if (requiresLock()) {
        // a checkpoint to see if the thread is interrupted or not before an expensive operation
        if (isInterrupted()) {
          ret = handleInterruption("at acquiring the lock.");
        } else {
          ret = acquireLocksAndOpenTxn(startTxnImplicitly);
        }
        if (ret != 0) {
          return rollback(createProcessorResponse(ret));
        }
      }
      ret = execute(true); // entry point for execution
      if (ret != 0) {
        //if needRequireLock is false, the release here will do nothing because there is no lock
        return rollback(createProcessorResponse(ret));
      }
......
    
}
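
Before moving on, a quick peek inside: compileInternal eventually lands in Driver#compile, where the SQL text becomes an AST, the AST goes through semantic analysis, and out comes a QueryPlan whose tasks are what execute(true) will run. A heavily abridged sketch (hooks, locking and error handling stripped; details may differ slightly across versions):

// inside org.apache.hadoop.hive.ql.Driver#compile, heavily abridged
ASTNode tree = ParseUtils.parse(command, ctx); // SQL text --> AST
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
sem.analyze(tree, ctx); // AST --> operator tree --> task tree
sem.validate();
......
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN),
    queryId, queryState.getHiveOperation(), schema); // the tasks run below live in this plan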
  • Submitting the tasks
    org.apache.hadoop.hive.ql.Driver#execute(boolean)
    Look at this method first. It is quite clear: it keeps taking tasks off the queue and launching them:
 public int execute(boolean deferClose) throws CommandNeedRetryException {
    ......
       // Loop while you either have tasks running, or tasks queued up
      while (driverCxt.isRunning()) {
        // Launch upto maxthreads tasks
        Task<? extends Serializable> task;
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); // the launch happens here
          if (!runner.isRunning()) {
            break;
          }
        }
        ......
      }
  ......
}
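
Where does driverCxt get its runnable tasks? Earlier in the same execute method, the compiled plan's root tasks seed the queue, and as each task finishes, its now-runnable children are added in turn (abridged):

// earlier in org.apache.hadoop.hive.ql.Driver#execute, abridged
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
  ......
  driverCxt.addToRunnable(tsk);
}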

One more note: a task here is what Hive calls a Stage, and it corresponds to one job in the resulting MapReduce program, as the figures below show:

[Figure: MapredTask.png]
[Figure: launch.png]
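
You can see this correspondence with a plain EXPLAIN. For a query that needs one shuffle, the plan contains exactly one MapReduce stage (output abridged; it assumes a table src with a string column key, as in the standard Hive examples):

hive> EXPLAIN SELECT key, count(*) FROM src GROUP BY key;
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      ......
  Stage: Stage-0
    Fetch Operator
      ......

Stage-1 is the MapRedTask launched above and becomes one MR job; Stage-0 is just a client-side fetch.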

Moving on to the org.apache.hadoop.hive.ql.Driver#launchTask method:

    // Launch Task
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
      // Launch it in the parallel mode, as a separate thread only for MR tasks
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in parallel");
      }
      tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
      tskRun.start();
    } else {
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in serial mode");
      }
      tskRun.runSequential();
    }

Two conditions govern this branch:
Condition 1: parallel execution is enabled, i.e. "hive.exec.parallel=true".
Condition 2: tsk.isMapRedTask() returns true; the exact criterion differs across org.apache.hadoop.hive.ql.exec.Task subclasses (see the sketch below).
A task that meets both conditions gets its own org.apache.hadoop.hive.ql.exec.TaskRunner thread to run on.
Otherwise, it has to run in the current thread.
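
On condition 2: the base Task says no, while the MR task classes override it, which is why only real MapReduce work gets its own thread. A minimal sketch of the two relevant definitions (abridged from Hive 2.3):

// org.apache.hadoop.hive.ql.exec.Task (base class): not an MR task by default
public boolean isMapRedTask() {
  return false;
}

// org.apache.hadoop.hive.ql.exec.mr.ExecDriver (which MapRedTask extends)
@Override
public boolean isMapRedTask() {
  return true;
}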

Now step into TaskRunner:

public class TaskRunner extends Thread {
  protected Task<? extends Serializable> tsk;
  @Override
  public void run() {
    runner = Thread.currentThread();
    try {
      OperationLog.setCurrentOperationLog(operationLog);
      SessionState.start(ss);
      runSequential(); // still ends up calling runSequential()
    } finally {
    }
  }
  
// Launches a task, and sets its exit value in the result variable.
  public void runSequential() {
    int exitVal = -101;
    try {
      exitVal = tsk.executeTask();
    } catch (Throwable t) {
    }
    result.setExitVal(exitVal);
    if (tsk.getException() != null) {
      result.setTaskError(tsk.getException());
    }
  }
}

A few things stand out:
1. It is a thread class.
2. Its run method ultimately calls runSequential() as well.
3. Inside runSequential(), the real execution work is the task's own executeTask(); TaskRunner is merely a container.
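
On point 3: executeTask itself is only a thin wrapper that does some session bookkeeping and then delegates to the subclass's execute (heavily abridged):

// org.apache.hadoop.hive.ql.exec.Task#executeTask, heavily abridged
public int executeTask() {
  ......
  int retval = execute(driverContext); // polymorphic: each Task subclass supplies its own execute
  ......
  return retval;
}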

So let's keep following the chain: org.apache.hadoop.hive.ql.exec.Task#executeTask-->org.apache.hadoop.hive.ql.exec.Task#execute
This execute method has a different implementation for each Task subclass; here we take org.apache.hadoop.hive.ql.exec.mr.MapRedTask as the example:

 @Override
  public int execute(DriverContext driverContext) {
    ......
    runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);

      if (!runningViaChild) {
        // since we are running the mapred task in the same jvm, we should update the job conf
        // in ExecDriver as well to have proper local properties.
        if (this.isLocalMode()) {
          // save the original job tracker
          ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job));
          // change it to local
          ShimLoader.getHadoopShims().setJobLauncherRpcAddress(job, "local");
        }
        // we are not running this mapred task via child jvm
        // so directly invoke ExecDriver
        int ret = super.execute(driverContext); // invoke the parent class's execute

        // restore the previous properties for framework name, RM address etc.
        if (this.isLocalMode()) {
          // restore the local job tracker back to original
          ctx.restoreOriginalTracker();
        }
    ......
  }

The method first checks the hive.exec.submitviachild setting: should map/reduce jobs be submitted from a separate JVM (a child process)? By default this is false, i.e. jobs are submitted from the same JVM that runs the HQL compiler.
In that default case, the method calls straight through to its parent class's execute, i.e. org.apache.hadoop.hive.ql.exec.mr.ExecDriver#execute.

On to ExecDriver#execute.
First, note ExecDriver's constructor; evidently this is the doorway to the MapReduce job:

public ExecDriver() {
    super();
    console = new LogHelper(LOG);
    job = new JobConf(ExecDriver.class); // the JobConf uses ExecDriver to locate the jar to ship with the job
    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
  }

Now the method itself:

 @Override
  public int execute(DriverContext driverContext) {
  ......
    job.setOutputFormat(HiveOutputFormatImpl.class);
    job.setMapperClass(ExecMapper.class); // the Mapper class is set here
    job.setMapOutputKeyClass(HiveKey.class);
    job.setMapOutputValueClass(BytesWritable.class);
    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
    job.setReducerClass(ExecReducer.class); // the Reducer class is set here
    ......
     rj = jc.submitJob(job);
  ......
  }

Here we finally meet the familiar code for submitting a MapReduce job to Yarn. Execution ultimately reaches org.apache.hadoop.mapred.JobClient#submitJob(org.apache.hadoop.mapred.JobConf), and the job is on its way!
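
To make that last hop concrete, here is a minimal standalone job driven through the same classic mapred API that ExecDriver uses (a hypothetical demo, not Hive code; the paths are placeholders). With mapreduce.framework.name=yarn in the client configuration, this submitJob call hands the job to Yarn exactly as Hive's does:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.IdentityMapper;

// Hypothetical demo class -- not part of Hive.
public class SubmitDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(SubmitDemo.class); // locates the jar, like new JobConf(ExecDriver.class)
    job.setJobName("submit-demo");
    job.setMapperClass(IdentityMapper.class);    // Hive plugs in ExecMapper here
    job.setNumReduceTasks(0);                    // map-only demo; Hive would set ExecReducer
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path("/tmp/demo/in"));   // placeholder input path
    FileOutputFormat.setOutputPath(job, new Path("/tmp/demo/out")); // placeholder output path
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job); // asynchronous submission, the same call ExecDriver makes
    System.out.println("submitted: " + rj.getID());
  }
}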


And that's a wrap!
