美文网首页我爱编程
Dr.Elephant源码分析--MapReduce

Dr.Elephant源码分析--MapReduce

作者: 炮灰向前冲啦 | 来源:发表于2018-06-21 19:39 被阅读0次

    Job日志

    mapred-site.xml配置任务执行日志路径(hdfs)

    <property>
        <name>yarn.app.mapreduce.am.staging-dir</name>
        <value>/data/yarn/stage</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>/data/yarn/intermediate_done</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.done-dir</name>
        <value>/data/yarn/done</value>
    </property>
    
    1. 作业启动时,hadoop会将作业信息放在${yarn.app.mapreduce.am.staging-dir}/${user}/.staging/${job_id}目录
    staging
    2. 作业完成后,作业数据会被移到${mapreduce.jobhistory.intermediate-done-dir}/${user}目录
    intermediate_done
    3. intermediate-done-dir只是临时中转站,hadoop会定时将此目录的数据移到done地址: ${mapreduce.jobhistory.done-dir}/${year}/${month}/${day}/${serialPart}
    done

    .jhist保存job的执行信息,对应JobInfo类;conf是job的配置信息。只针对MapReduce类型任务

    org.apache.hadoop.mapreduce.v2.hs.JobHistory,默认每3分钟move intermediate to done

      /**
       * Brings the job-history service up: starts the history-file manager and the
       * storage (when the storage is itself a {@code Service}), then schedules the
       * periodic intermediate-to-done move task and the history cleaner before
       * delegating to the superclass.
       *
       * @throws Exception propagated from any of the components started here
       */
      protected void serviceStart() throws Exception {
        hsManager.start();
        if (storage instanceof Service) {
          Service storageService = (Service) storage;
          storageService.start();
        }

        // Two worker threads, named "Log Scanner/Cleaner #<n>".
        scheduledExecutor = new ScheduledThreadPoolExecutor(
            2, new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d").build());

        // Per the notes that accompany this listing, moveThreadInterval is read from
        // JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS ("mapreduce.jobhistory.move.interval-ms")
        // and defaults to 3 * 60 * 1000 ms. The same value serves as both the initial
        // delay and the period of the move task.
        Runnable moveTask = new MoveIntermediateToDoneRunnable();
        scheduledExecutor.scheduleAtFixedRate(
            moveTask, moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);

        // Start historyCleaner
        scheduleHistoryCleaner();
        super.serviceStart();
      }
    

    Global

    Global是入口,继承play框架的GlobalSettings类,并重写onStart()、onStop()方法。也就是调用DrElephant对象的start()、kill()方法

    /**
     * Application entry point. Extends the Play framework's GlobalSettings and ties
     * the DrElephant worker thread to the application lifecycle: it is created and
     * started in onStart() and killed in onStop().
     */
    public class Global extends GlobalSettings {
    
      // The worker thread; remains null if its construction failed in onStart().
      DrElephant _drElephant;
    
      /**
       * Runs on application start: applies the Kerberos workaround, then creates
       * and starts the DrElephant thread. An IOException from the constructor is
       * logged and not rethrown.
       */
      public void onStart(Application app) {
        Logger.info("Starting Application...");
    
        fixJavaKerberos();
    
        try {
          _drElephant = new DrElephant();
          // DrElephant is a Thread subclass, so start() runs it on its own thread.
          _drElephant.start();
        } catch (IOException e) {
          Logger.error("Application start failed...", e);
        }
      }
    
      /**
       * Runs on application stop: asks the DrElephant thread to shut down, if one
       * was created.
       */
      public void onStop(Application app) {
        Logger.info("Stopping application...");
        if (_drElephant != null) {
          _drElephant.kill();
        }
      }
    
      ... // fixJavaKerberos
    }
    

    DrElephant

    DrElephant类继承Thread线程类,重写run()方法。主要用来加载app-conf目录下的配置文件,并调用ElephantRunner

    /**
     * Worker thread that boots Dr. Elephant: it loads the configuration through
     * {@code ElephantContext}, then runs the {@link ElephantRunner} loop on this
     * thread and, when auto tuning is enabled, the {@code AutoTuner} on a second
     * thread.
     */
    public class DrElephant extends Thread {
      public static final String AUTO_TUNING_ENABLED = "autotuning.enabled";
      private static final Logger logger = Logger.getLogger(DrElephant.class);
    
      private final ElephantRunner _elephant;
      // Both are null when auto tuning is disabled.
      private final AutoTuner _autoTuner;
      private final Thread _autoTunerThread;
    
      // Primitive flag: the value always comes from getBoolean(...) with a default,
      // so a nullable boxed Boolean is unnecessary.
      private final boolean autoTuningEnabled;
    
      /**
       * Loads the HDFS context and the auto-tuning configuration, and prepares
       * (but does not start) the runner and the optional auto-tuner thread.
       *
       * @throws IOException declared by the original constructor; presumably
       *         raised while loading the HDFS context or configuration
       */
      public DrElephant() throws IOException {
        // Per the article's notes, this resolves the HDFS default block size
        // (FileSystem.get(new Configuration()).getDefaultBlockSize(new Path("/")),
        // backed by "fs.local.block.size") — confirm against HDFSContext.
        HDFSContext.load();
        // instance() triggers loadConfiguration(); getAutoTuningConf() returns the
        // settings read from AutoTuningConf.xml.
        Configuration configuration = ElephantContext.instance().getAutoTuningConf();
        autoTuningEnabled = configuration.getBoolean(AUTO_TUNING_ENABLED, false);
        logger.debug("Auto Tuning Configuration: " + configuration.toString());
        // ElephantRunner implements Runnable; it is executed on this thread in run().
        _elephant = new ElephantRunner();
        if (autoTuningEnabled) {
          // The job auto tuner gets a dedicated thread.
          _autoTuner = new AutoTuner();
          _autoTunerThread = new Thread(_autoTuner, "Auto Tuner Thread");
        } else {
          _autoTuner = null;
          _autoTunerThread = null;
        }
      }
    
      /**
       * Starts the auto-tuner thread (if configured), then runs the ElephantRunner
       * loop on the current thread until it returns.
       */
      @Override
      public void run() {
        if (_autoTunerThread != null) {
          logger.debug("Starting auto tuner thread ");
          _autoTunerThread.start();
        }
        _elephant.run();
      }
    
      /**
       * Requests shutdown: stops the runner and interrupts the auto-tuner thread.
       */
      public void kill() {
        // Delegates to ElephantRunner.kill(), which shuts its thread pool down.
        _elephant.kill();
        if (_autoTunerThread != null) {
          // Cooperative stop: the target is expected to poll
          // Thread.currentThread().isInterrupted() in its loop.
          _autoTunerThread.interrupt();
        }
      }
    }
    

    ElephantContext

    DrElephant对象的构造函数执行ElephantContext.instance()时,会加载系统各组件配置文件。对应configurations目录下的类

    /**
     * Loads every component configuration and then derives the set of supported
     * application types. The comment above each call names the XML file that the
     * loader corresponds to. The final step must stay last because it relies on
     * everything loaded before it.
     */
    private void loadConfiguration() {
      // AggregatorConf.xml
      loadAggregators();
      // FetcherConf.xml
      loadFetchers();
      // HeuristicConf.xml
      loadHeuristics();
      // JobTypeConf.xml
      loadJobTypes();
      // GeneralConf.xml
      loadGeneralConf();
      // AutoTuningConf.xml
      loadAutoTuningConf();
    
      // It is important to configure supported types in the LAST step so that we could have information from all
      // configurable components.
      /**
       * Decides what application types can be supported.
       *
       * An application type is supported if all the below are true.
       * 1. A Fetcher is defined in FetcherConf.xml for the application type.
       * 2. At least one Heuristic is configured in HeuristicConf.xml for the application type.
       * 3. At least one job type is configured in JobTypeConf.xml for the application type.
       */
       // Article note (translated): takes the intersection of the ApplicationType keys of _typeToFetcher,
       // _typeToHeuristics, _appTypeToJobTypes and _typeToAggregator, then calls retainAll on the key sets of
       // those four maps so that only the common application types remain.
       // NOTE(review): the list above names only three conditions — confirm whether aggregators also take part.
      configureSupportedApplicationTypes();
    }
    

    解析xml文件,再执行XXXConfiguration对象的parseXXXConfiguration()方法,将xml数据映射成XXXConfigurationData对象,并存入ElephantContext对象的Map中

    // Open the configuration file as a classpath resource through the system class loader.
    // NOTE(review): getResourceAsStream returns null when the resource is missing — confirm the
    // surrounding code guards against that before parsing.
    InputStream instream = ClassLoader.getSystemClassLoader().getResourceAsStream(filePath);
    
    // Parse the stream into a DOM Document using a DocumentBuilderFactory with default settings.
    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    DocumentBuilder builder = factory.newDocumentBuilder();
    Document document = builder.parse(instream);
    

    ElephantRunner

    此类是系统的核心类,包含AnalyticJobGeneratorHadoop2.fetchAnalyticJobs()获取yarn jobs、分析每个job的AnalyticJob.getAnalysis()、保存分析数据AppResult.save()、Metrics统计MetricsController.init()、获取RM的Active URL:AnalyticJobGeneratorHadoop2.updateResourceManagerAddresses()

    /**
     * Main loop of Dr. Elephant. On every iteration it refreshes the resource
     * manager address, fetches the list of jobs to analyse, and hands each job to
     * a fixed-size thread pool where it is analysed and the result is saved.
     *
     * <p>{@link #run()} executes on the thread that owns this runner, while
     * {@link #kill()} is invoked from a different thread, so the state shared
     * between the two ({@code _running}, {@code _threadPoolExecutor}) is published
     * safely.
     */
    public class ElephantRunner implements Runnable {
      private static final Logger logger = Logger.getLogger(ElephantRunner.class);
    
      private static final long FETCH_INTERVAL = 60 * 1000;     // Interval between fetches
      private static final long RETRY_INTERVAL = 60 * 1000;     // Interval between retries
      private static final int EXECUTOR_NUM = 5;                // The number of executor threads to analyse the jobs
    
      private static final String FETCH_INTERVAL_KEY = "drelephant.analysis.fetch.interval";
      private static final String RETRY_INTERVAL_KEY = "drelephant.analysis.retry.interval";
      private static final String EXECUTOR_NUM_KEY = "drelephant.analysis.thread.count";
    
      private final AtomicBoolean _running = new AtomicBoolean(true);
      // Start time of the current loop iteration; only touched by the run() thread.
      private long lastRun;
      private long _fetchInterval;
      private long _retryInterval;
      private int _executorNum;
      private HadoopSecurity _hadoopSecurity;
      // Assigned inside run() but read by kill() from another thread; volatile so
      // that kill() is guaranteed to see the pool once it has been created.
      private volatile ThreadPoolExecutor _threadPoolExecutor;
      private AnalyticJobGenerator _analyticJobGenerator;
    
      /**
       * Reads the executor count and the fetch/retry intervals from the general
       * configuration, falling back to the class constants.
       */
      private void loadGeneralConfiguration() {
        Configuration configuration = ElephantContext.instance().getGeneralConf();
        // "drelephant.analysis.thread.count", default 5
        _executorNum = Utils.getNonNegativeInt(configuration, EXECUTOR_NUM_KEY, EXECUTOR_NUM);
        // "drelephant.analysis.fetch.interval", default 60 * 1000 ms
        _fetchInterval = Utils.getNonNegativeLong(configuration, FETCH_INTERVAL_KEY, FETCH_INTERVAL);
        // "drelephant.analysis.retry.interval", default 60 * 1000 ms
        _retryInterval = Utils.getNonNegativeLong(configuration, RETRY_INTERVAL_KEY, RETRY_INTERVAL);
      }
    
      /**
       * Creates and configures the job generator for the detected Hadoop version.
       *
       * @throws RuntimeException if the environment is not Hadoop 2.x, or if the
       *         generator fails to configure (the cause is preserved)
       */
      private void loadAnalyticJobGenerator() {
        // Per the article's notes this checks that "mapreduce.framework.name" equals "yarn".
        if (HadoopSystemContext.isHadoop2Env()) {
          // AnalyticJobGeneratorHadoop2 supplies the jobs that are waiting for analysis.
          _analyticJobGenerator = new AnalyticJobGeneratorHadoop2();
        } else {
          throw new RuntimeException("Unsupported Hadoop major version detected. It is not 2.x.");
        }
    
        try {
          // configure() initialises the generator's fetch window and resolves the
          // resource manager address (the ACTIVE one when the RM runs in HA mode).
          _analyticJobGenerator.configure(ElephantContext.instance().getGeneralConf());
        } catch (Exception e) {
          logger.error("Error occurred when configuring the analysis provider.", e);
          throw new RuntimeException(e);
        }
      }
    
      /**
       * Initialises configuration, metrics and the worker pool, then loops until
       * {@link #kill()} is called or the thread is interrupted. Any exception that
       * escapes the loop is logged and ends the run.
       */
      @Override
      public void run() {
        logger.info("Dr.elephant has started");
        try {
          _hadoopSecurity = HadoopSecurity.getInstance();
          // Run the whole loop as the logged-in Hadoop user.
          _hadoopSecurity.doAs(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
              HDFSContext.load();
              loadGeneralConfiguration();
              loadAnalyticJobGenerator();
              // Article note: considered redundant there, because the DrElephant
              // constructor has already called ElephantContext.instance().
              ElephantContext.init();
    
              // Initialize the metrics registries.
              // Article note: registers job, queue, GC, memory and health-check metrics.
              MetricsController.init();
    
              logger.info("executor num is " + _executorNum);
              if (_executorNum < 1) {
                throw new RuntimeException("Must have at least 1 worker thread.");
              }
    
              // Fixed-size pool (core == max) with an unbounded work queue.
              ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("dr-el-executor-thread-%d").build();
              _threadPoolExecutor = new ThreadPoolExecutor(_executorNum, _executorNum, 0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>(), factory);
    
              // Main loop: stops on the running flag or on thread interruption.
              while (_running.get() && !Thread.currentThread().isInterrupted()) {
                // Refresh the resource manager address on every iteration.
                _analyticJobGenerator.updateResourceManagerAddresses();
                lastRun = System.currentTimeMillis();
    
                logger.info("Fetching analytic job list...");
                try {
                  // Re-validate the login on every iteration.
                  _hadoopSecurity.checkLogin();
                } catch (IOException e) {
                  logger.info("Error with hadoop kerberos login", e);
                  //Wait for a while before retry
                  waitInterval(_retryInterval);
                  continue;
                }
    
                List<AnalyticJob> todos;
                try {
                  // Pulls the succeeded and failed applications from the resource manager.
                  todos = _analyticJobGenerator.fetchAnalyticJobs();
                } catch (Exception e) {
                  logger.error("Error fetching job list. Try again later...", e);
                  // Wait for a while before retry
                  waitInterval(_retryInterval);
                  continue;
                }
    
                for (AnalyticJob analyticJob : todos) {
                  // Each job is analysed concurrently by ExecutorJob.run().
                  _threadPoolExecutor.submit(new ExecutorJob(analyticJob));
                }
    
                int queueSize = _threadPoolExecutor.getQueue().size();
                MetricsController.setQueueSize(queueSize);
                logger.info("Job queue size is " + queueSize);
    
                // Wait for a while before next fetch
                waitInterval(_fetchInterval);
              }
              logger.info("Main thread is terminated.");
              return null;
            }
          });
        } catch (Exception e) {
          logger.error(e.getMessage());
          // The stack trace is rendered to a String and logged separately.
          logger.error(ExceptionUtils.getStackTrace(e));
        }
      }
    
      /**
       * Analyses a single job on a pool thread and stores the result; on failure
       * the job is routed to one of the retry queues or dropped.
       */
      private class ExecutorJob implements Runnable {
    
        private final AnalyticJob _analyticJob;
    
        ExecutorJob(AnalyticJob analyticJob) {
          _analyticJob = analyticJob;
        }
    
        @Override
        public void run() {
          try {
            String analysisName = String.format("%s %s", _analyticJob.getAppType().getName(), _analyticJob.getAppId());
            long analysisStartTimeMillis = System.currentTimeMillis();
            logger.info(String.format("Analyzing %s", analysisName));
    
            // Run the heuristics over the job...
            AppResult result = _analyticJob.getAnalysis();
            // ...and persist the analysis.
            result.save();
            long processingTime = System.currentTimeMillis() - analysisStartTimeMillis;
            logger.info(String.format("Analysis of %s took %sms", analysisName, processingTime));
            MetricsController.setJobProcessingTime(processingTime);
            MetricsController.markProcessedJobs();
          } catch (InterruptedException e) {
            logger.info("Thread interrupted");
            logger.info(e.getMessage());
            logger.info(ExceptionUtils.getStackTrace(e));
    
            // Restore the interrupt flag so the pool can observe it.
            Thread.currentThread().interrupt();
          } catch (TimeoutException e) {
            logger.warn("Timed out while fetching data. Exception message is: " + e.getMessage());
            jobFate();
          } catch (Exception e) {
            logger.error(e.getMessage());
            logger.error(ExceptionUtils.getStackTrace(e));
            jobFate();
          }
        }
    
        /**
         * Decides what happens to a job whose analysis failed: first retry queue
         * while retry() allows it, then the second retry queue while
         * isSecondPhaseRetry() allows it, otherwise the job is counted as skipped
         * and dropped. Per the article's notes the limits are 3 and 5 retries —
         * confirm against AnalyticJob.
         */
        private void jobFate () {
          if (_analyticJob != null && _analyticJob.retry()) {
            logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
            _analyticJobGenerator.addIntoRetries(_analyticJob);
          } else if (_analyticJob != null && _analyticJob.isSecondPhaseRetry()) {
            //Putting the job into a second retry queue which fetches jobs after some interval. Some spark jobs may need more time than usual to process, hence the queue.
            logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
            _analyticJobGenerator.addIntoSecondRetryQueue(_analyticJob);
          } else {
            if (_analyticJob != null) {
              MetricsController.markSkippedJob();
              logger.error("Drop the analytic job. Reason: reached the max retries for application id = [" + _analyticJob.getAppId() + "].");
            }
          }
        }
      }
    
      /**
       * Sleeps until {@code interval} milliseconds have passed since the start of
       * the current iteration ({@code lastRun}); returns immediately when that
       * moment has already passed. The sleep time is therefore not a fixed length.
       *
       * @param interval target spacing, in milliseconds, measured from lastRun
       */
      private void waitInterval(long interval) {
        // Wait for long enough
        long nextRun = lastRun + interval;
        long waitTime = nextRun - System.currentTimeMillis();
    
        if (waitTime <= 0) {
          return;
        }
    
        try {
          Thread.sleep(waitTime);
        } catch (InterruptedException e) {
          // Re-set the interrupt flag so the main loop's check can see it.
          Thread.currentThread().interrupt();
        }
      }
    
      /**
       * Requests shutdown: clears the running flag and, if the worker pool has
       * been created, stops it with shutdownNow().
       */
      public void kill() {
        _running.set(false);
        // Read the volatile field once so the null check and the call agree.
        ThreadPoolExecutor executor = _threadPoolExecutor;
        if (executor != null) {
          executor.shutdownNow();
        }
      }
    }
    

    AnalyticJobGeneratorHadoop2

    该类负责:更新Active RM的URL: updateResourceManagerAddresses()、读取succeeded、failed任务: readApps()、fetchAnalyticJobs()

    返回的AnalyticJob对象包含: appId、type、user、name、queueName、trackingUrl、startTime、finishTime字段

    /**
     * Job generator for Hadoop 2: resolves the (active) resource manager address,
     * queries its REST API for succeeded and failed applications in the current
     * time window, and merges in jobs waiting in the two retry queues.
     *
     * <p>Both retry queues are concurrent queues: jobs are added to them by the
     * analysis worker threads while {@link #fetchAnalyticJobs()} drains them on the
     * fetch thread.
     */
    public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
      private static final Logger logger = Logger.getLogger(AnalyticJobGeneratorHadoop2.class);
      private static final String RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.address";
      private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
      private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
      private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info";
      private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis";
    
      private static Configuration configuration;
    
      // We provide one minute job fetch delay due to the job sending lag from AM/NM to JobHistoryServer HDFS
      private static final long FETCH_DELAY = 60000;
    
      // Generate a token update interval with a random deviation so that it does not update the token exactly at the same
      // time with other token updaters (e.g. ElephantFetchers).
      private static final long TOKEN_UPDATE_INTERVAL =
          Statistics.MINUTE_IN_MS * 30 + new Random().nextLong() % (3 * Statistics.MINUTE_IN_MS);
    
      private String _resourceManagerAddress;
      private long _lastTime = 0;
      private long _fetchStartTime = 0;
      private long _currentTime = 0;
      private long _tokenUpdatedTime = 0;
      private AuthenticatedURL.Token _token;
      private AuthenticatedURL _authenticatedURL;
      private final ObjectMapper _objectMapper = new ObjectMapper();
    
      private final Queue<AnalyticJob> _firstRetryQueue = new ConcurrentLinkedQueue<AnalyticJob>();
    
      // Was a plain ArrayList. Worker threads append to it through
      // addIntoSecondRetryQueue() while fetchAnalyticJobs() iterates and removes,
      // so it needs a concurrent collection like the first retry queue.
      private final Queue<AnalyticJob> _secondRetryQueue = new ConcurrentLinkedQueue<AnalyticJob>();
    
      /**
       * Resolves the resource manager address. With HA enabled, every configured RM
       * id is probed at "/ws/v1/cluster/info" and the first one reporting haState
       * "ACTIVE" wins; if none does, the previously resolved address is kept.
       * Without HA the plain webapp address is used.
       *
       * @throws RuntimeException if no address has been resolved
       */
      public void updateResourceManagerAddresses() {
        // "yarn.resourcemanager.ha.enabled"
        if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) {
          // "yarn.resourcemanager.ha.rm-ids"
          String resourceManagers = configuration.get(RESOURCE_MANAGER_IDS);
          if (resourceManagers != null) {
            logger.info("The list of RM IDs are " + resourceManagers);
            List<String> ids = Arrays.asList(resourceManagers.split(","));
            _currentTime = System.currentTimeMillis();
            updateAuthToken();
            for (String id : ids) {
              try {
                // Host of this RM: "yarn.resourcemanager.webapp.address.<id>"
                String resourceManager = configuration.get(RESOURCE_MANAGER_ADDRESS + "." + id);
                // "http://<rm host>/ws/v1/cluster/info"
                String resourceManagerURL = String.format(RM_NODE_STATE_URL, resourceManager);
                logger.info("Checking RM URL: " + resourceManagerURL);
                JsonNode rootNode = readJsonNode(new URL(resourceManagerURL));
                // Read clusterInfo.haState from the response.
                String status = rootNode.path("clusterInfo").path("haState").getValueAsText();
                // Constant-first comparison: a response without haState presumably
                // yields null here, and an NPE would escape both catch clauses below.
                if ("ACTIVE".equals(status)) {
                  logger.info(resourceManager + " is ACTIVE");
                  _resourceManagerAddress = resourceManager;
                  break;
                } else {
                  logger.info(resourceManager + " is STANDBY");
                }
              } catch (AuthenticationException e) {
                logger.info("Error fetching resource manager " + id + " state " + e.getMessage());
              } catch (IOException e) {
                logger.info("Error fetching Json for resource manager "+ id + " status " + e.getMessage());
              }
            }
          }
        } else {
          // Non-HA deployment: read "yarn.resourcemanager.webapp.address" directly.
          _resourceManagerAddress = configuration.get(RESOURCE_MANAGER_ADDRESS);
        }
        if (_resourceManagerAddress == null) {
          throw new RuntimeException(
                  "Cannot get YARN resource manager address from Hadoop Configuration property: [" + RESOURCE_MANAGER_ADDRESS + "].");
        }
      }
    
      /**
       * Stores the configuration, optionally moves the fetch window start back by
       * the configured initial window (plus the fetch delay), and resolves the
       * resource manager address.
       *
       * @param configuration the general configuration to read settings from
       * @throws IOException declared by the interface
       */
      @Override
      public void configure(Configuration configuration)
          throws IOException {
        // The field is static, so assign it through the class rather than `this`.
        AnalyticJobGeneratorHadoop2.configuration = configuration;
        String initialFetchWindowString = configuration.get(FETCH_INITIAL_WINDOW_MS);
        if (initialFetchWindowString != null) {
          long initialFetchWindow = Long.parseLong(initialFetchWindowString);
          _lastTime = System.currentTimeMillis() - FETCH_DELAY - initialFetchWindow;
          _fetchStartTime = _lastTime;
        }
        updateResourceManagerAddresses();
      }
    
      /**
       *  Fetch all the succeeded and failed applications/analytic jobs from the resource manager.
       *
       * The window is (_lastTime, now - FETCH_DELAY]; after a successful fetch the
       * window end becomes the new _lastTime. Jobs from the first retry queue are
       * always appended; jobs from the second retry queue are appended once they
       * report readyForSecondRetry().
       *
       * @return the jobs to analyse in this round
       * @throws IOException if a REST call fails
       * @throws AuthenticationException declared by the JSON reader
       */
      @Override
      public List<AnalyticJob> fetchAnalyticJobs()
          throws IOException, AuthenticationException {
        List<AnalyticJob> appList = new ArrayList<AnalyticJob>();
    
        // There is a lag of job data from AM/NM to JobHistoryServer HDFS, we shouldn't use the current time, since there
        // might be new jobs arriving after we fetch jobs. We provide one minute delay to address this lag.
        _currentTime = System.currentTimeMillis() - FETCH_DELAY;
        updateAuthToken();
    
        logger.info("Fetching recent finished application runs between last time: " + (_lastTime + 1) + ", and current time: " + _currentTime);
    
        // Fetch all succeeded apps
        // "http://<rm host>/ws/v1/cluster/apps?finalStatus=SUCCEEDED&finishedTimeBegin=%s&finishedTimeEnd=%s"
        URL succeededAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format(
            "/ws/v1/cluster/apps?finalStatus=SUCCEEDED&finishedTimeBegin=%s&finishedTimeEnd=%s",
            String.valueOf(_lastTime + 1), String.valueOf(_currentTime)));
        logger.info("The succeeded apps URL is " + succeededAppsURL);
        List<AnalyticJob> succeededApps = readApps(succeededAppsURL);
        appList.addAll(succeededApps);
    
        // Fetch all failed apps
        // state: Application Master State
        // finalStatus: Status of the Application as reported by the Application Master
        // "http://<rm host>/ws/v1/cluster/apps?finalStatus=FAILED&state=FINISHED&finishedTimeBegin=%s&finishedTimeEnd=%s"
        URL failedAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format(
            "/ws/v1/cluster/apps?finalStatus=FAILED&state=FINISHED&finishedTimeBegin=%s&finishedTimeEnd=%s",
            String.valueOf(_lastTime + 1), String.valueOf(_currentTime)));
        // Logged before the request (as for the succeeded URL) so the URL is
        // visible even when the read fails.
        logger.info("The failed apps URL is " + failedAppsURL);
        List<AnalyticJob> failedApps = readApps(failedAppsURL);
        appList.addAll(failedApps);
    
        // Append promises from the retry queue at the end of the list
        while (!_firstRetryQueue.isEmpty()) {
          appList.add(_firstRetryQueue.poll());
        }
    
        // Second retry queue: only jobs whose wait has elapsed are moved over.
        // Article note (unverified here): each fetch round counts the job's
        // remaining wait down, and each second-phase retry lengthens the next wait
        // by 5 rounds (5, 10, 15, 20, 25), for five retries in total — confirm
        // against AnalyticJob.
        Iterator<AnalyticJob> iteratorSecondRetry = _secondRetryQueue.iterator();
        while (iteratorSecondRetry.hasNext()) {
          AnalyticJob job = iteratorSecondRetry.next();
          if (job.readyForSecondRetry()) {
            appList.add(job);
            iteratorSecondRetry.remove();
          }
        }
    
        _lastTime = _currentTime;
        return appList;
      }
    
      /**
       * Queues a job for the first retry phase and publishes the queue size metric.
       *
       * @param promise the job to retry
       */
      @Override
      public void addIntoRetries(AnalyticJob promise) {
        _firstRetryQueue.add(promise);
        int retryQueueSize = _firstRetryQueue.size();
        MetricsController.setRetryQueueSize(retryQueueSize);
        logger.info("Retry queue size is " + retryQueueSize);
      }
    
      /**
       * Queues a job for the second retry phase and publishes the queue size
       * metric. setTimeToSecondRetry() is called on the job first; per the
       * article's notes it resets the job's wait to (second retries * 5).
       *
       * @param promise the job to retry
       */
      @Override
      public void addIntoSecondRetryQueue(AnalyticJob promise) {
        _secondRetryQueue.add(promise.setTimeToSecondRetry());
        int secondRetryQueueSize = _secondRetryQueue.size();
        MetricsController.setSecondRetryQueueSize(secondRetryQueueSize);
        logger.info("Second Retry queue size is " + secondRetryQueueSize);
      }
    
      /**
       * Authenticate and update the token
       *
       * NOTE: within this class _token and _authenticatedURL are only assigned
       * here and never read; readJsonNode() opens the URL directly.
       */
      private void updateAuthToken() {
        if (_currentTime - _tokenUpdatedTime > TOKEN_UPDATE_INTERVAL) {
          logger.info("AnalysisProvider updating its Authenticate Token...");
          _token = new AuthenticatedURL.Token();
          _authenticatedURL = new AuthenticatedURL();
          _tokenUpdatedTime = _currentTime;
        }
      }
    
      /**
       * Opens the url's stream and parses it into a JsonNode.
       *
       * @param url The url to connect to
       * @return the root node of the parsed response
       * @throws IOException Unable to get the stream
       * @throws AuthenticationException Authencation problem
       */
      private JsonNode readJsonNode(URL url)
          throws IOException, AuthenticationException {
        return _objectMapper.readTree(url.openStream());
      }
    
      /**
       * Parse the returned json from Resource manager
       *
       * @param url The REST call
       * @return the applications of a supported type that still need analysis
       * @throws IOException
       * @throws AuthenticationException Problem authenticating to resource manager
       */
      private List<AnalyticJob> readApps(URL url) throws IOException, AuthenticationException{
        List<AnalyticJob> appList = new ArrayList<AnalyticJob>();
    
        JsonNode rootNode = readJsonNode(url);
        // The applications are read from apps -> app; each entry supplies id,
        // user, name, queue, trackingUrl, startedTime, finishedTime and
        // applicationType.
        JsonNode apps = rootNode.path("apps").path("app");
    
        for (JsonNode app : apps) {
          String appId = app.get("id").getValueAsText();
    
          // When called first time after launch, hit the DB and avoid duplicated analytic jobs that have been analyzed
          // before.
          // The time comparison comes first, so the DB lookup only runs while
          // _lastTime still equals _fetchStartTime.
          if (_lastTime > _fetchStartTime || (_lastTime == _fetchStartTime && AppResult.find.byId(appId) == null)) {
            String user = app.get("user").getValueAsText();
            String name = app.get("name").getValueAsText();
            String queueName = app.get("queue").getValueAsText();
            String trackingUrl = app.get("trackingUrl") != null? app.get("trackingUrl").getValueAsText() : null;
            long startTime = app.get("startedTime").getLongValue();
            long finishTime = app.get("finishedTime").getLongValue();
    
            // The type comes from the applicationType reported by the resource manager.
            ApplicationType type =
                ElephantContext.instance().getApplicationTypeForName(app.get("applicationType").getValueAsText());
    
            // If the application type is supported
            if (type != null) {
              AnalyticJob analyticJob = new AnalyticJob();
              analyticJob.setAppId(appId)
                  .setAppType(type)
                  .setUser(user)
                  .setName(name)
                  .setQueueName(queueName)
                  .setTrackingUrl(trackingUrl)
                  .setStartTime(startTime)
                  .setFinishTime(finishTime);
    
              appList.add(analyticJob);
            }
          }
        }
        return appList;
      }
    }
    

    AnalyticJob

    AnalyticJob类主要负责: 获取job的配置以及执行过程统计ElephantFetcher.fetchData()、根据统计信息进行启发式算法运算Heuristic.apply(data)

    根据FetcherConf.xml配置,MapReduce类型的任务通过MapReduceFSFetcherHadoop2类获取统计信息;Spark类型通过SparkFetcher类获取

    /**
     * This class wraps some basic meta data of a completed application run (notice that the information is generally the
     * same regardless of hadoop versions and application types), and then promises to return the analyzed result later.
     */
    public class AnalyticJob {
      /**
       * Returns the analysed AppResult that could be directly serialized into DB.
       *
       * This method fetches the data using the appropriate application fetcher, runs all the heuristics on them and
       * loads it into the AppResult model.
       *
       * @throws Exception if the analysis process encountered a problem.
       * @return the analysed AppResult
       */
      public AppResult getAnalysis() throws Exception {
        ElephantFetcher fetcher = ElephantContext.instance().getFetcherForApplicationType(getAppType());
        // 根据FetcherConf.xml获取: "tez"->"TezFetcher", "mapreduce"->"MapReduceFSFetcherHadoop2", "spark"->"SparkFetcher"
        // tez、mapreduce、spark对应AnalyticJob里的_type
        // 封装job的conf配置、map、reduce信息、Counters统计、Time等
        HadoopApplicationData data = fetcher.fetchData(this);
    
        // JobTypeConf.xml,以applicationtype为key,value是List<JobType>,JobType封装name、conf、value字段
        // 返回rm job的配置信息key跟jobType的conf字段匹配的jobType
        JobType jobType = ElephantContext.instance().matchJobType(data);
        String jobTypeName = jobType == null ? UNKNOWN_JOB_TYPE : jobType.getName();
    
        // Run all heuristics over the fetched data
        List<HeuristicResult> analysisResults = new ArrayList<HeuristicResult>();
        if (data == null || data.isEmpty()) {
          // Example: a MR job has 0 mappers and 0 reducers
          logger.info("No Data Received for analytic job: " + getAppId());
          analysisResults.add(HeuristicResult.NO_DATA);
        } else {
          // 获取HeuristicConf.xml里applicationtype对应的所有classname启发式对象
          List<Heuristic> heuristics = ElephantContext.instance().getHeuristicsForApplicationType(getAppType());
          for (Heuristic heuristic : heuristics) {
          // 获取HeuristicConf.xml里params标签下的exclude_jobtypes_filter值
            String confExcludedApps = heuristic.getHeuristicConfData().getParamMap().get(EXCLUDE_JOBTYPE);
    
            if (confExcludedApps == null || confExcludedApps.length() == 0 ||
                    !Arrays.asList(confExcludedApps.split(",")).contains(jobTypeName)) {        // 应用启发式算法生成HeuristicResult分析结果
              HeuristicResult result = heuristic.apply(data);
              if (result != null) {
                analysisResults.add(result);
              }
            }
          }
        }
    
        // 根据AggregatorConf.xml获取appType对应的HadoopMetricsAggregator实现类
        HadoopMetricsAggregator hadoopMetricsAggregator = ElephantContext.instance().getAggregatorForApplicationType(getAppType());
        // 分别计算Job的资源使用、资源浪费、等待耗时 3大指标
        hadoopMetricsAggregator.aggregate(data);
        HadoopAggregatedData hadoopAggregatedData = hadoopMetricsAggregator.getResult();
    
        // 配置Job基础信息
        // Utils.truncateField(),根据limit截断字符串: field.substring(0, limit - 3) + "..."
        AppResult result = new AppResult();
        result.id = Utils.truncateField(getAppId(), AppResult.ID_LIMIT, getAppId());
        result.trackingUrl = Utils.truncateField(getTrackingUrl(), AppResult.TRACKING_URL_LIMIT, getAppId());
        result.queueName = Utils.truncateField(getQueueName(), AppResult.QUEUE_NAME_LIMIT, getAppId());
        result.username = Utils.truncateField(getUser(), AppResult.USERNAME_LIMIT, getAppId());
        result.startTime = getStartTime();
        result.finishTime = getFinishTime();
        result.name = Utils.truncateField(getName(), AppResult.APP_NAME_LIMIT, getAppId());
        result.jobType = Utils.truncateField(jobTypeName, AppResult.JOBTYPE_LIMIT, getAppId());
        result.resourceUsed = hadoopAggregatedData.getResourceUsed();
        result.totalDelay = hadoopAggregatedData.getTotalDelay();
        result.resourceWasted = hadoopAggregatedData.getResourceWasted();
    
        // 配置Job启发式算法计算值
        int jobScore = 0;
        result.yarnAppHeuristicResults = new ArrayList<AppHeuristicResult>();
        Severity worstSeverity = Severity.NONE;
        for (HeuristicResult heuristicResult : analysisResults) {
          AppHeuristicResult detail = new AppHeuristicResult();
          detail.heuristicClass = Utils.truncateField(heuristicResult.getHeuristicClassName(),
              AppHeuristicResult.HEURISTIC_CLASS_LIMIT, getAppId());
          detail.heuristicName = Utils.truncateField(heuristicResult.getHeuristicName(),
              AppHeuristicResult.HEURISTIC_NAME_LIMIT, getAppId());
          detail.severity = heuristicResult.getSeverity();
          detail.score = heuristicResult.getScore();
    
          // Load Heuristic Details
          for (HeuristicResultDetails heuristicResultDetails : heuristicResult.getHeuristicResultDetails()) {
            AppHeuristicResultDetails heuristicDetail = new AppHeuristicResultDetails();
            heuristicDetail.yarnAppHeuristicResult = detail;
            heuristicDetail.name = Utils.truncateField(heuristicResultDetails.getName(),
                AppHeuristicResultDetails.NAME_LIMIT, getAppId());
            heuristicDetail.value = Utils.truncateField(heuristicResultDetails.getValue(),
                AppHeuristicResultDetails.VALUE_LIMIT, getAppId());
            heuristicDetail.details = Utils.truncateField(heuristicResultDetails.getDetails(),
                AppHeuristicResultDetails.DETAILS_LIMIT, getAppId());
            // This was added for AnalyticTest. Commenting this out to fix a bug. Also disabling AnalyticJobTest.
            //detail.yarnAppHeuristicResultDetails = new ArrayList<AppHeuristicResultDetails>();
            // bug! yarnAppHeuristicResultDetails没有new ArrayList<>();
            detail.yarnAppHeuristicResultDetails.add(heuristicDetail);
          }
          result.yarnAppHeuristicResults.add(detail);
          // 取最严重的预警值
          worstSeverity = Severity.max(worstSeverity, detail.severity);
          jobScore += detail.score;
        }
        result.severity = worstSeverity;
        result.score = jobScore;
    
        // Retrieve information from job configuration like scheduler information and store them into result.
        // 根据SchedulerConf.xml,将scheduler的信息保存到result
        InfoExtractor.loadInfo(result, data);
    
        return result;
      }
      
      ...
    }
    

    MapReduceFSFetcherHadoop2

    该类主要根据job的appId、finishTime获取{done-dir}、{intermediate-done-dir}两个hdfs路径,从而获取job对应的.jhist、.conf文件并解析,返回JobCounter、MapTask、ReducerTask信息

    public class MapReduceFSFetcherHadoop2 extends MapReduceFetcher {
      private static final Logger logger = Logger.getLogger(MapReduceFSFetcherHadoop2.class);

      // Key in the fetcher param map holding the maximum .jhist size (in MB) that will be parsed.
      private static final String LOG_SIZE_XML_FIELD = "history_log_size_limit_in_mb";
      // Key in the fetcher param map holding the time zone id used to compute the dated done-dir path.
      private static final String HISTORY_SERVER_TIME_ZONE_XML_FIELD = "history_server_time_zone";
      // Format of the year/month/day part of a done-dir path, e.g. 2016/05/30 (with File.separator).
      private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
      // Number of leading digits of the 9-digit, zero-padded serial number used as the directory name.
      private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
      // Size limit (MB) applied when LOG_SIZE_XML_FIELD is not configured.
      // NOTE(review): "DEFALUT" is a typo of "DEFAULT"; the constant is protected, so renaming it
      // may affect code outside this file.
      protected static final double DEFALUT_MAX_LOG_SIZE_IN_MB = 500;

      // File system resolved from the done-dir URI (or the default file system as fallback).
      private FileSystem _fs;
      // Value of "mapreduce.jobhistory.done-dir".
      private String _historyLocation;
      // Value of "mapreduce.jobhistory.intermediate-done-dir".
      private String _intermediateHistoryLocation;
      // Effective size limit in MB for a single history file.
      private double _maxLogSizeInMB;
      // Time zone used to turn a job's finish time into the yyyy/MM/dd directory.
      private TimeZone _timeZone;
    
      /**
       * Builds a fetcher from the fetcher configuration and the Hadoop configuration on the classpath.
       *
       * <p>Reads the optional history log size limit and history server time zone from the fetcher
       * parameters, then resolves the done / intermediate-done directories and the file system
       * that hosts them.
       *
       * @param fetcherConfData fetcher configuration whose param map may contain
       *                        {@code history_log_size_limit_in_mb} and {@code history_server_time_zone}
       * @throws IOException if the file system cannot be obtained
       */
      public MapReduceFSFetcherHadoop2(FetcherConfigurationData fetcherConfData) throws IOException {
        super(fetcherConfData);

        // Size limit for a single .jhist file: the configured value if present and parsable,
        // otherwise the built-in default.
        double sizeLimitInMB = DEFALUT_MAX_LOG_SIZE_IN_MB;
        String configuredLimit = fetcherConfData.getParamMap().get(LOG_SIZE_XML_FIELD);
        if (configuredLimit != null) {
          double[] parsedLimit = Utils.getParam(configuredLimit, 1);
          if (parsedLimit != null) {
            sizeLimitInMB = parsedLimit[0];
          }
        }
        _maxLogSizeInMB = sizeLimitInMB;
        logger.info("The history log limit of MapReduce application is set to " + _maxLogSizeInMB + " MB");

        // Time zone of the history server; falls back to the JVM default when not configured.
        String configuredZone = fetcherConfData.getParamMap().get(HISTORY_SERVER_TIME_ZONE_XML_FIELD);
        if (configuredZone == null) {
          _timeZone = TimeZone.getDefault();
        } else {
          _timeZone = TimeZone.getTimeZone(configuredZone);
        }
        logger.info("Using timezone: " + _timeZone.getID());

        Configuration hadoopConf = new Configuration();
        _historyLocation = hadoopConf.get("mapreduce.jobhistory.done-dir");
        _intermediateHistoryLocation = hadoopConf.get("mapreduce.jobhistory.intermediate-done-dir");

        // Prefer the file system addressed by the done-dir URI; if the done-dir is not a valid
        // URI, use the default file system of the configuration instead.
        FileSystem resolvedFs;
        try {
          resolvedFs = FileSystem.get(new URI(_historyLocation), hadoopConf);
        } catch (URISyntaxException ex) {
          resolvedFs = FileSystem.get(hadoopConf);
        }
        _fs = resolvedFs;

        logger.info("Intermediate history dir: " + _intermediateHistoryLocation);
        logger.info("History done dir: " + _historyLocation);
      }
    
      /**
       * @return the value of {@code mapreduce.jobhistory.done-dir} read at construction time
       */
      public String getHistoryLocation() {
        return _historyLocation;
      }

      /**
       * @return the maximum size, in MB, of a history file that this fetcher will parse
       */
      public double getMaxLogSizeInMB() {
        return _maxLogSizeInMB;
      }

      /**
       * @return the time zone used to derive the dated done-dir path from a job's finish time
       */
      public TimeZone getTimeZone() {
        return _timeZone;
      }
    
      /**
       * Computes the done-dir sub-directory that should hold a finished job's history files:
       * {@code {done-dir}/yyyy/MM/dd/{serialPart}/}.
       *
       * <p>{@code yyyy/MM/dd} is the job's finish date in the configured time zone.
       * {@code serialPart} is derived from the serial number, i.e. the text after the last
       * underscore of the application id: the number is zero-padded to 9 digits and the first
       * 6 digits are kept.
       *
       * <p>Example: {@code application_1461566847127_84624} has serial number 84624, padded to
       * {@code 000084624}, giving serialPart {@code 000084}. If the job finished on 2016-05-30
       * the result is {@code {done-dir}/2016/05/30/000084/}.
       *
       * <p>This layout only applies to jobs already moved to the done-dir, not to jobs still in
       * the intermediate-done-dir.
       *
       * @param job the job whose finish time and application id determine the directory
       * @return the directory path, ending with a separator
       */
      protected String getHistoryDir(AnalyticJob job) {
        // Serial part: zero-pad the trailing number of the app id and keep its leading digits.
        String applicationId = job.getAppId();
        String serialText = applicationId.substring(applicationId.lastIndexOf('_') + 1);
        String paddedSerial = String.format("%09d", Integer.parseInt(serialText));
        String serialPart = paddedSerial.substring(0, SERIAL_NUMBER_DIRECTORY_DIGITS);

        // Date part: finish time broken down in the history server's time zone.
        Calendar finishedAt = Calendar.getInstance(_timeZone);
        finishedAt.setTimeInMillis(job.getFinishTime());
        int year = finishedAt.get(Calendar.YEAR);
        int month = finishedAt.get(Calendar.MONTH) + 1;  // Calendar months are 0-based
        int day = finishedAt.get(Calendar.DAY_OF_MONTH);
        String datePart = String.format(TIMESTAMP_DIR_FORMAT, year, month, day);

        // The trailing empty element makes the joined path end with a separator.
        String[] segments = {_historyLocation, datePart, serialPart, ""};
        return StringUtils.join(segments, File.separator);
      }
    
      /**
       * Locates the configuration ({@code _conf.xml}) and history ({@code .jhist}) files of a job.
       *
       * <p>The dated done-dir computed by {@link #getHistoryDir(AnalyticJob)} is searched first.
       * Whatever is still missing is then looked up in {@code {intermediate-done-dir}/{user}/},
       * in case the history server has not moved the files yet.
       *
       * @param job the job whose files are wanted
       * @return the paths of the configuration file and the history file
       * @throws FileNotFoundException if either file is found in neither location
       * @throws IOException on file system errors
       */
      private DataFiles getHistoryFiles(AnalyticJob job) throws IOException {
        // application_1461566847127_84624 -> job_1461566847127_84624
        String jobId = Utils.getJobIdFromApplicationId(job.getAppId());
        String jobConfPath = null;
        String jobHistPath = null;

        // Search files in done dir: {done-dir}/{yyyy}/{MM}/{dd}/{serialPart}/
        String jobHistoryDirPath = getHistoryDir(job);
        Path jobHistoryDir = new Path(jobHistoryDirPath);

        // Scan the directory (non-recursively) until both files of this job have been found.
        if (_fs.exists(jobHistoryDir)) {
          RemoteIterator<LocatedFileStatus> it = _fs.listFiles(jobHistoryDir, false);
          while (it.hasNext() && (jobConfPath == null || jobHistPath == null)) {
            String name = it.next().getPath().getName();
            if (isFileOfJob(name, jobId)) {
              if (name.endsWith("_conf.xml")) {
                jobConfPath = jobHistoryDirPath + name;
              } else if (name.endsWith(".jhist")) {
                jobHistPath = jobHistoryDirPath + name;
              }
            }
          }
        }

        // If some files are missing, search in the intermediate-done-dir in case the HistoryServer has
        // not yet moved them into the done-dir.
        String intermediateDirPath = _intermediateHistoryLocation + File.separator + job.getUser() + File.separator;
        if (jobConfPath == null) {
          jobConfPath = intermediateDirPath + jobId + "_conf.xml";
          if (!_fs.exists(new Path(jobConfPath))) {
            throw new FileNotFoundException("Can't find config of " + jobId + " in neither " + jobHistoryDirPath + " nor " + intermediateDirPath);
          }
          logger.info("Found job config in intermediate dir: " + jobConfPath);
        }
        if (jobHistPath == null) {
          try {
            RemoteIterator<LocatedFileStatus> it = _fs.listFiles(new Path(intermediateDirPath), false);
            while (it.hasNext()) {
              String name = it.next().getPath().getName();
              if (isFileOfJob(name, jobId) && name.endsWith(".jhist")) {
                jobHistPath = intermediateDirPath + name;
                logger.info("Found history file in intermediate dir: " + jobHistPath);
                break;
              }
            }
          } catch (FileNotFoundException e) {
            // A missing user directory is reported below as "history file not found".
            logger.error("Intermediate history directory " + intermediateDirPath + " not found");
          }
          if (jobHistPath == null) {
            throw new FileNotFoundException("Can't find history file of " + jobId + " in neither " + jobHistoryDirPath + " nor " + intermediateDirPath);
          }
        }

        return new DataFiles(jobConfPath, jobHistPath);
      }

      /**
       * Tells whether a file name refers to exactly the given job id.
       *
       * <p>A plain substring test is not enough: job ids are not fixed-width, so
       * {@code job_1461566847127_8462} is a textual prefix of {@code job_1461566847127_84624}
       * and would match the other job's files. An occurrence of the id therefore only counts
       * when it is not immediately followed by another digit.
       */
      private static boolean isFileOfJob(String fileName, String jobId) {
        int from = 0;
        int start;
        while ((start = fileName.indexOf(jobId, from)) >= 0) {
          int end = start + jobId.length();
          if (end >= fileName.length() || !Character.isDigit(fileName.charAt(end))) {
            return true;
          }
          from = start + 1;
        }
        return false;
      }
    
      /**
       * Loads the configuration, status, counters and per-task data of a MapReduce job from its
       * history files.
       *
       * <p>If the history file is larger than the configured limit, parsing is skipped and the
       * returned data is marked as not succeeded with a diagnostic message.
       *
       * @param job the job to fetch
       * @return the application data built from the job's {@code _conf.xml} and {@code .jhist} files
       * @throws IOException if the files cannot be located or read
       * @throws RuntimeException if the history file cannot be parsed, or if the recorded job
       *                          status is neither {@code SUCCEEDED} nor {@code FAILED}
       *                          (including a missing status)
       */
      @Override
      public MapReduceApplicationData fetchData(AnalyticJob job) throws IOException {
        // Resolve the paths of the job configuration and job history files
        DataFiles files = getHistoryFiles(job);
        String confFile = files.getJobConfPath();
        String histFile = files.getJobHistPath();
        String appId = job.getAppId();

        // application_1461566847127_84624 -> job_1461566847127_84624
        String jobId = Utils.getJobIdFromApplicationId(appId);

        MapReduceApplicationData jobData = new MapReduceApplicationData();
        jobData.setAppId(appId).setJobId(jobId);

        // Load the job configuration file and copy every entry into a Properties object
        Configuration jobConf = new Configuration(false);
        jobConf.addResource(_fs.open(new Path(confFile)), confFile);
        Properties jobConfProperties = new Properties();
        for (Map.Entry<String, String> entry : jobConf) {
          jobConfProperties.put(entry.getKey(), entry.getValue());
        }
        jobData.setJobConf(jobConfProperties);

        // Check if job history file is too large and should be throttled
        if (_fs.getFileStatus(new Path(histFile)).getLen() > _maxLogSizeInMB * FileUtils.ONE_MB) {
          String errMsg = "The history log of MapReduce application: " + appId + " is over the limit size of " + _maxLogSizeInMB + " MB, the parsing process gets throttled.";
          logger.warn(errMsg);
          jobData.setDiagnosticInfo(errMsg);
          jobData.setSucceeded(false);  // set succeeded to false to avoid heuristic analysis
          return jobData;
        }

        // Analyze job history file
        JobHistoryParser parser = new JobHistoryParser(_fs, histFile);
        JobHistoryParser.JobInfo jobInfo = parser.parse();
        // The parser records a parse failure instead of throwing it; surface it here
        IOException parseException = parser.getParseException();
        if (parseException != null) {
          throw new RuntimeException("Could not parse history file " + histFile, parseException);
        }

        jobData.setSubmitTime(jobInfo.getSubmitTime());
        jobData.setStartTime(jobInfo.getLaunchTime());
        jobData.setFinishTime(jobInfo.getFinishTime());

        // Constant-first comparison: a history file without a recorded final status yields a
        // null state, which must end in the explicit error below rather than a bare NPE.
        String state = jobInfo.getJobStatus();
        if ("SUCCEEDED".equals(state)) {
          jobData.setSucceeded(true);
        } else if ("FAILED".equals(state)) {
          jobData.setSucceeded(false);
          jobData.setDiagnosticInfo(jobInfo.getErrorInfo());
        } else {
          throw new RuntimeException("job neither succeeded or failed. can not process it ");
        }

        // Fetch job counter: <group, <counter.name, counter.value>>
        MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());

        // Fetch task data and split it into mappers and reducers
        // NOTE(review): every task whose type is not MAP is treated as a reducer.
        Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
        List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
        List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
        for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
          if (taskInfo.getTaskType() == TaskType.MAP) {
            mapperInfoList.add(taskInfo);
          } else {
            reducerInfoList.add(taskInfo);
          }
        }
        if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
          logger.debug(jobId + " total mappers: " + mapperInfoList.size());
        }
        if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
          logger.debug(jobId + " total reducers: " + reducerInfoList.size());
        }
        // Build the task data arrays
        MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
        MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);

        jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);

        return jobData;
      }
    
      /**
       * Copies Hadoop counters into a {@code MapReduceCounterData} holder, keyed as
       * {@code <group name, <counter name, counter value>>}.
       *
       * @param counters the counters to copy; may be {@code null}
       * @return a holder with every counter set, or an empty holder when {@code counters} is null
       */
      private MapReduceCounterData getCounterData(Counters counters) {
        MapReduceCounterData counterData = new MapReduceCounterData();
        if (counters == null) {
          return counterData;
        }

        for (CounterGroup counterGroup : counters) {
          final String nameOfGroup = counterGroup.getName();
          for (Counter entry : counterGroup) {
            counterData.set(nameOfGroup, entry.getName(), entry.getValue());
          }
        }
        return counterData;
      }
    
      /**
       * Derives the phase durations of a task attempt.
       *
       * <p>The returned array holds, in order: total time, shuffle time, sort time, start time
       * and finish time (all in the units reported by the attempt info). Map attempts have no
       * shuffle or sort phase, so those two entries are 0. For other attempts the shuffle time
       * is measured from start to shuffle finish and the sort time from shuffle finish to sort
       * finish.
       *
       * @param attempInfo the task attempt to measure
       * @return a 5-element array {@code {total, shuffle, sort, start, finish}}
       */
      private long[] getTaskExecTime(JobHistoryParser.TaskAttemptInfo attempInfo) {
        final long begin = attempInfo.getStartTime();
        final long end = attempInfo.getFinishTime();
        final long total = end - begin;

        if (attempInfo.getTaskType() == TaskType.MAP) {
          return new long[]{total, 0, 0, begin, end};
        }

        final long shuffleEnd = attempInfo.getShuffleFinishTime();
        final long sortEnd = attempInfo.getSortFinishTime();
        return new long[]{total, shuffleEnd - begin, sortEnd - shuffleEnd, begin, end};
      }
    
      /**
       * Builds the task data (task id, attempt id, status, execution times and counters) for the
       * tasks of one job.
       *
       * <p>Only the first {@code sampleAndGetSize(jobId, infoList)} entries of {@code infoList}
       * are converted. Per the original author's note, that helper shuffles the list and caps
       * the size when sampling is enabled and the list is large -- TODO confirm against the
       * parent class.
       *
       * @param jobId    id of the job the tasks belong to
       * @param infoList the job's map tasks or its reduce tasks
       * @return one entry per processed task, in list order
       */
      protected MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.TaskInfo> infoList) {
        final int limit = sampleAndGetSize(jobId, infoList);

        List<MapReduceTaskData> collected = new ArrayList<MapReduceTaskData>();
        for (int idx = 0; idx < limit; idx++) {
          JobHistoryParser.TaskInfo task = infoList.get(idx);
          String taskId = task.getTaskId().toString();

          // Successful tasks are described by their successful attempt, all others by the
          // attempt that caused the failure (which may be absent).
          TaskAttemptID attemptId = task.getTaskStatus().equals("SUCCEEDED")
              ? task.getSuccessfulAttemptId()
              : task.getFailedDueToAttemptId();
          String attemptName = (attemptId != null) ? attemptId.toString() : "";

          MapReduceTaskData taskData = new MapReduceTaskData(taskId, attemptName, task.getTaskStatus());

          // Task-level counters (as opposed to the job-level totals)
          MapReduceCounterData counters = getCounterData(task.getCounters());

          // Phase timings are only available when an attempt could be identified
          long[] execTime = (attemptId != null)
              ? getTaskExecTime(task.getAllTaskAttempts().get(attemptId))
              : null;

          taskData.setTimeAndCounter(execTime, counters);
          collected.add(taskData);
        }
        return collected.toArray(new MapReduceTaskData[collected.size()]);
      }
    
      /**
       * Holds the paths of the two history artifacts of a job: its configuration file and its
       * history file.
       */
      private class DataFiles {
        private String jobConfPath;
        private String jobHistPath;

        /**
         * @param jobConfFile path of the job configuration file
         * @param jobHistFile path of the job history file
         */
        public DataFiles(String jobConfFile, String jobHistFile) {
          jobConfPath = jobConfFile;
          jobHistPath = jobHistFile;
        }

        /** @return path of the job configuration file */
        public String getJobConfPath() {
          return this.jobConfPath;
        }

        /** @return path of the job history file */
        public String getJobHistPath() {
          return this.jobHistPath;
        }

        /** @param jobConfPath new path of the job configuration file */
        public void setJobConfPath(String jobConfPath) {
          this.jobConfPath = jobConfPath;
        }

        /** @param jobHistPath new path of the job history file */
        public void setJobHistPath(String jobHistPath) {
          this.jobHistPath = jobHistPath;
        }
      }
    }
    

    引申

    Hadoop集群Rest Api汇总

    MR Application Master
    MR History Server
    Introduction
    Resource Manager
    Node Manager
    Timeline Server
    Timeline Service V2

    相关文章

      网友评论

        本文标题:Dr.Elephant源码分析--MapReduce

        本文链接:https://www.haomeiwen.com/subject/xqbcyftx.html