美文网首页
hadoop-distcp源码篇

hadoop-distcp源码篇

作者: flood_d | 来源:发表于2020-09-20 01:27 被阅读0次

    0.背景介绍

    ~~~~DistCp(Distributed Copy)是Apache Hadoop自带的工具,目前存在两个版本,DistCp1和DistCp2。它是用于大规模集群内部或者集群之间的高性能拷贝工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝。
    ~~~~DistCp第一版使用了MapReduce并发拷贝数据,它将整个数据拷贝过程转化为一个map-only Job以加快拷贝速度。由于DistCp本质上是一个MapReduce作业,它需要保证文件中各个block的有序性,因此它的最小数据切分粒度是文件,也就是说,一个文件不能被切分成不同部分让多个任务并行拷贝,最小只能做到一个文件交给一个任务。

    1.使用样例

    1.1参数介绍

    标识 描述 备注
    -i 忽略失败 这个选项会比默认情况提供关于拷贝的更精确的统计, 同时它还将保留失败拷贝操作的日志,这些日志信息可以用于调试。最后,如果一个map失败了,但并没完成所有分块任务的尝试,这不会导致整个作业的失败。
    -log <logdir> 记录日志到 <logdir> DistCp为每个文件的每次尝试拷贝操作都记录日志,并把日志作为map的输出。 如果一个map失败了,当重新执行时这个日志不会被保留。
    -m <num_maps> 同时拷贝的最大数目 指定了拷贝数据时map的数目,默认是20。请注意并不是map数越多吞吐量越大。
    -overwrite 覆盖目标 如果一个map失败并且没有使用-i选项,不仅仅那些拷贝失败的文件,这个分块任务中的所有文件都会被重新拷贝。
    -update 如果源和目标的大小不一样则进行覆盖 像之前提到的,这不是"同步"操作。 执行覆盖的唯一标准是源文件和目标文件大小是否相同;如果不同,则源文件替换目标文件。
    -strategy {dynamic/uniformsize} 选择要在DistCp中使用的复制策略。 默认情况下,使用uniformsize(每个map分配均衡的文件数量)。如果指定了“dynamic”,则使用DynamicInputFormat(动态进行map承担文件数量的分配,速度快的map分配的文件数量更多)。
    ... ... ...

    1.2文件复制

    hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo
    

    将nn1上面的foo/bar文件复制导nn2的bar/foo路径下面。

    1.3文件夹复制

    hadoop distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
    

    将nn1上面文件夹source/first和source/second下面的文件复制到nn2的target文件夹下面。

    2.源码分析

    2.1源码宏观流程

    distcp宏观流程图.jpg

    2.2源码细节分析

    2.2.1细节主要的调用图

    distcp.jpg

    2.2.2步骤分析

    • shell入口
      进入hadoop文件
    ......
    elif [ "$COMMAND" = "distcp" ] ; then
          CLASS=org.apache.hadoop.tools.DistCp
          CLASSPATH=${CLASSPATH}:${TOOL_PATH}
    ......
    # Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
    HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
    
    HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
    
    export CLASSPATH=$CLASSPATH
    exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
    ;;
    ......
    
    • distcp的main方法
      Distcp是一个Tool、ToolRunner应用,Tool应用要求实现run方法。进入main方法主要有构造器构造distcp类;建立Cleanup,并加入ShutdownHookManager;最后运行ToolRunner.run执行复制功能。
    public class DistCp extends Configured implements Tool {
        ......
        public int run(String[] argv) {
            ......
        }
        ......
        ......
        public static void main(String argv[]) {
            System.out.println("test");
            int exitCode;
            try {
              DistCp distCp = new DistCp();
              Cleanup CLEANUP = new Cleanup(distCp);
        
              ShutdownHookManager.get().addShutdownHook(CLEANUP,
                SHUTDOWN_HOOK_PRIORITY);
              exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
            }
            catch (Exception e) {
              LOG.error("Couldn't complete DistCp operation: ", e);
              exitCode = DistCpConstants.UNKNOWN_ERROR;
            }
            System.exit(exitCode);
          }
        ......
    }
    
    • run方法
      OptionsParser类是distcp单独实现的参数解析工具类。将输入参数解析成DistCpOptions inputOptions类型。如常见的参数overwrite = false等等。程序中相关参数都有特定的默认值,比如map数量的默认值是20,分配策略类的默认方式是均衡分配等等。
      setTargetPathExists():从参数中解析出目标路径。
      execute():核心执行方法。
    @Override
      public int run(String[] argv) {
        if (argv.length < 1) {
          OptionsParser.usage();
          return DistCpConstants.INVALID_ARGUMENT;
        }
        
        try {
          inputOptions = (OptionsParser.parse(argv));
          setTargetPathExists();
          LOG.info("Input Options: " + inputOptions);
        } catch (Throwable e) {
          LOG.error("Invalid arguments: ", e);
          System.err.println("Invalid arguments: " + e.getMessage());
          OptionsParser.usage();      
          return DistCpConstants.INVALID_ARGUMENT;
        }
        
        try {
          execute();
        } catch (InvalidInputException e) {
          LOG.error("Invalid input: ", e);
          return DistCpConstants.INVALID_ARGUMENT;
        } catch (DuplicateFileException e) {
          LOG.error("Duplicate files in input path: ", e);
          return DistCpConstants.DUPLICATE_INPUT;
        } catch (AclsNotSupportedException e) {
          LOG.error("ACLs not supported on at least one file system: ", e);
          return DistCpConstants.ACLS_NOT_SUPPORTED;
        } catch (XAttrsNotSupportedException e) {
          LOG.error("XAttrs not supported on at least one file system: ", e);
          return DistCpConstants.XATTRS_NOT_SUPPORTED;
        } catch (Exception e) {
          LOG.error("Exception encountered ", e);
          return DistCpConstants.UNKNOWN_ERROR;
        }
        return DistCpConstants.SUCCESS;
      }
    
    • execute()
      在execute()方法中,会调用createAndSubmitJob()创建MR任务,准备数据,设定数据输入格式,并把任务提交到hadoop集群运行,最后等待任务执行完毕。于是我们可以看到,主体功能实现就在createAndSubmitJob()这个函数体里,工程中其它的各个类无非都是为这个函数接口服务的。
    public Job execute() throws Exception {
        Job job = createAndSubmitJob();
    
        if (inputOptions.shouldBlock()) {
          waitForJobCompletion(job);
        }
        return job;
      }
    
    public Job createAndSubmitJob() throws Exception {
        assert inputOptions != null;
        assert getConf() != null;
        Job job = null;
        try {
          synchronized(this) {
            //Don't cleanup while we are setting up.
            metaFolder = createMetaFolderPath();
            jobFS = metaFolder.getFileSystem(getConf());
            job = createJob();
          }
          if (inputOptions.shouldUseDiff()) {
            if (!DistCpSync.sync(inputOptions, getConf())) {
              inputOptions.disableUsingDiff();
            }
          }
          createInputFileListing(job);
    
          job.submit();
          submitted = true;
        } finally {
          if (!submitted) {
            cleanup();
          }
        }
    
        String jobID = job.getJobID().toString();
        job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
        LOG.info("DistCp job-id: " + jobID);
    
        return job;
      }
    
    • metaFolder
      一个Path类型。其存放着distcp工具需要的元数据信息,也就是所有需要拷贝的源目录/文件信息列表。这些数据在一个fileList.seq文件中以Key/Value结构进行保存,其中Key是源文件的Text格式的相对路径,而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息,这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类,metafolder目录中的fileList.seq最终会作为参数传递给MR任务中的Mapper。
      private Path createMetaFolderPath() throws Exception {
        Configuration configuration = getConf();
        Path stagingDir = JobSubmissionFiles.getStagingDir(
                new Cluster(configuration), configuration);
        Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
        if (LOG.isDebugEnabled())
          LOG.debug("Meta folder location: " + metaFolderPath);
        configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
        return metaFolderPath;
      }
    
    • job = createJob()
      创建MR job的地方。
      private Job createJob() throws IOException {
        String jobName = "distcp";
        String userChosenName = getConf().get(JobContext.JOB_NAME);
        if (userChosenName != null)
          jobName += ": " + userChosenName;
        Job job = Job.getInstance(getConf());
        job.setJobName(jobName);
        job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
        job.setJarByClass(CopyMapper.class);
        configureOutputFormat(job);
    
        job.setMapperClass(CopyMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(CopyOutputFormat.class);
        job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
        job.getConfiguration().set(JobContext.NUM_MAPS,
                      String.valueOf(inputOptions.getMaxMaps()));
    
        if (inputOptions.getSslConfigurationFile() != null) {
          setupSSLConfig(job);
        }
    
        inputOptions.appendToConf(job.getConfiguration());
        return job;
      }
    

    重点看这两行代码,job.setInputFormatClass主要是设定job中任务的分配策略,分为UniformSizeInputFormat和DynamicInputFormat两种,UniformSizeInputFormat表示均衡分配任务,也就是设定的map中,每个map分配同样的任务数,DynamicInputFormat表示动态分为任务书,也就是动态的根据每个map运行的速度来分为具体map的任务数;job.setMapperClass主要设定map任务的具体逻辑。

    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
    job.setMapperClass(CopyMapper.class)
    
    • CopyMapper类的源码
      拷贝工作实际做的地方,主要看setup()、map()方法
    public void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
    
        syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
        ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
        skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
        overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
        append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
        preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
            PRESERVE_STATUS.getConfigLabel()));
    
        targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
        Path targetFinalPath = new Path(conf.get(
                DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
        targetFS = targetFinalPath.getFileSystem(conf);
    
        if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
          overWrite = true; // When target is an existing file, overwrite it.
        }
    
        if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
          initializeSSLConf(context);
        }
      }
    
      public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
              Context context) throws IOException, InterruptedException {
        Path sourcePath = sourceFileStatus.getPath();
    
        if (LOG.isDebugEnabled())
          LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
    
        Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
                              targetFS.getWorkingDirectory()) + relPath.toString());
    
        EnumSet<DistCpOptions.FileAttribute> fileAttributes
                = getFileAttributeSettings(context);
        final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
            DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
    
        final String description = "Copying " + sourcePath + " to " + target;
        context.setStatus(description);
    
        LOG.info(description);
    
        try {
          CopyListingFileStatus sourceCurrStatus;
          FileSystem sourceFS;
          try {
            sourceFS = sourcePath.getFileSystem(conf);
            final boolean preserveXAttrs =
                fileAttributes.contains(FileAttribute.XATTR);
            sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
              sourceFS.getFileStatus(sourcePath),
              fileAttributes.contains(FileAttribute.ACL), 
              preserveXAttrs, preserveRawXattrs);
          } catch (FileNotFoundException e) {
            throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
          }
    
          FileStatus targetStatus = null;
    
          try {
            targetStatus = targetFS.getFileStatus(target);
          } catch (FileNotFoundException ignore) {
            if (LOG.isDebugEnabled())
              LOG.debug("Path could not be found: " + target, ignore);
          }
    
          if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
            throw new IOException("Can't replace " + target + ". Target is " +
                getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
          }
    
          if (sourceCurrStatus.isDirectory()) {
            createTargetDirsWithRetry(description, target, context);
            return;
          }
    
          FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
          if (action == FileAction.SKIP) {
            LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                     + " to " + target);
            updateSkipCounters(context, sourceCurrStatus);
            context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
          } else {
            copyFileWithRetry(description, sourceCurrStatus, target, context,
                action, fileAttributes);
          }
    
          DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
              fileAttributes, preserveRawXattrs);
        } catch (IOException exception) {
          handleFailures(exception, sourceFileStatus, target, context);
        }
      }
    
    • copyFileWithRetry
      拷贝动作,这个函数最底层调用的是常用的Java输入输出流的方式,以此方式来完成点对点拷贝。即copyToFile里面的copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, context);方法。
    private void copyFileWithRetry(String description,
          FileStatus sourceFileStatus, Path target, Context context,
          FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
          throws IOException {
        long bytesCopied;
        try {
          bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
              action).execute(sourceFileStatus, target, context, fileAttributes);
        } catch (Exception e) {
          context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
          throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
              " --> " + target, e);
        }
        incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
        incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
        incrementCounter(context, Counter.COPY, 1);
      }
    

    至此,拷贝的job任务完成设定、提交以及执行,也就意味着distcp命令执行完成,即实现了跨集群的文件拷贝。

    文本资源来自于互联网和书本整理,仅供学习,有侵权联系删除。

    相关文章

      网友评论

          本文标题:hadoop-distcp源码篇

          本文链接:https://www.haomeiwen.com/subject/lvhwjktx.html