0.背景介绍
DistCp(Distributed Copy)是Apache Hadoop自带的工具,目前存在两个版本,DistCp1和DistCp2。它是用于大规模集群内部或者集群之间的高性能拷贝工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝。
DistCp第一版使用了MapReduce并发拷贝数据,它将整个数据拷贝过程转化为一个map-only Job以加快拷贝速度。由于DistCp本质上是一个MapReduce作业,它需要保证文件中各个block的有序性,因此它的最小数据切分粒度是文件,也就是说,一个文件不能被切分成不同部分让多个任务并行拷贝,最小只能做到一个文件交给一个任务。
1.使用样例
1.1参数介绍
标识 | 描述 | 备注 |
---|---|---|
-i | 忽略失败 | 这个选项会比默认情况提供关于拷贝的更精确的统计, 同时它还将保留失败拷贝操作的日志,这些日志信息可以用于调试。最后,如果一个map失败了,但并没完成所有分块任务的尝试,这不会导致整个作业的失败。 |
-log <logdir> | 记录日志到 <logdir> | DistCp为每个文件的每次尝试拷贝操作都记录日志,并把日志作为map的输出。 如果一个map失败了,当重新执行时这个日志不会被保留。 |
-m <num_maps> | 同时拷贝的最大数目 | 指定了拷贝数据时map的数目,默认是20。请注意并不是map数越多吞吐量越大。 |
-overwrite | 覆盖目标 | 如果一个map失败并且没有使用-i选项,不仅仅那些拷贝失败的文件,这个分块任务中的所有文件都会被重新拷贝。 |
-update | 如果源和目标的大小不一样则进行覆盖 | 像之前提到的,这不是"同步"操作。 执行覆盖的唯一标准是源文件和目标文件大小是否相同;如果不同,则源文件替换目标文件。 |
-strategy {dynamic/uniformsize} | 选择要在DistCp中使用的复制策略。 | 默认情况下,使用uniformsize(每个map分配均衡的文件数量)。如果指定了“dynamic”,则使用DynamicInputFormat(动态进行map承担文件数量的分配,速度快的map分配的文件数量更多)。 |
... | ... | ... |
1.2文件复制
hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo
将nn1上面的foo/bar文件复制导nn2的bar/foo路径下面。
1.3文件夹复制
hadoop distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target
将nn1上面文件夹source/first和source/second下面的文件复制到nn2的target文件夹下面。
2.源码分析
2.1源码宏观流程
distcp宏观流程图.jpg2.2源码细节分析
2.2.1细节主要的调用图
distcp.jpg2.2.2步骤分析
- shell入口
进入hadoop文件
......
elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.tools.DistCp
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
......
# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"
export CLASSPATH=$CLASSPATH
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
;;
......
- distcp的main方法
Distcp是一个Tool、ToolRunner应用,Tool应用要求实现run方法。进入main方法主要有构造器构造distcp类;建立Cleanup,并加入ShutdownHookManager;最后运行ToolRunner.run执行复制功能。
public class DistCp extends Configured implements Tool {
......
public int run(String[] argv) {
......
}
......
......
public static void main(String argv[]) {
System.out.println("test");
int exitCode;
try {
DistCp distCp = new DistCp();
Cleanup CLEANUP = new Cleanup(distCp);
ShutdownHookManager.get().addShutdownHook(CLEANUP,
SHUTDOWN_HOOK_PRIORITY);
exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
}
catch (Exception e) {
LOG.error("Couldn't complete DistCp operation: ", e);
exitCode = DistCpConstants.UNKNOWN_ERROR;
}
System.exit(exitCode);
}
......
}
- run方法
OptionsParser类是distcp单独实现的参数解析工具类。将输入参数解析成DistCpOptions inputOptions类型。如常见的参数overwrite = false等等。程序中相关参数都有特定的默认值,比如map数量的默认值是20,分配策略类的默认方式是均衡分配等等。
setTargetPathExists():从参数中解析出目标路径。
execute():核心执行方法。
@Override
public int run(String[] argv) {
if (argv.length < 1) {
OptionsParser.usage();
return DistCpConstants.INVALID_ARGUMENT;
}
try {
inputOptions = (OptionsParser.parse(argv));
setTargetPathExists();
LOG.info("Input Options: " + inputOptions);
} catch (Throwable e) {
LOG.error("Invalid arguments: ", e);
System.err.println("Invalid arguments: " + e.getMessage());
OptionsParser.usage();
return DistCpConstants.INVALID_ARGUMENT;
}
try {
execute();
} catch (InvalidInputException e) {
LOG.error("Invalid input: ", e);
return DistCpConstants.INVALID_ARGUMENT;
} catch (DuplicateFileException e) {
LOG.error("Duplicate files in input path: ", e);
return DistCpConstants.DUPLICATE_INPUT;
} catch (AclsNotSupportedException e) {
LOG.error("ACLs not supported on at least one file system: ", e);
return DistCpConstants.ACLS_NOT_SUPPORTED;
} catch (XAttrsNotSupportedException e) {
LOG.error("XAttrs not supported on at least one file system: ", e);
return DistCpConstants.XATTRS_NOT_SUPPORTED;
} catch (Exception e) {
LOG.error("Exception encountered ", e);
return DistCpConstants.UNKNOWN_ERROR;
}
return DistCpConstants.SUCCESS;
}
- execute()
在execute()方法中,会调用createAndSubmitJob()创建MR任务,准备数据,设定数据输入格式,并把任务提交到hadoop集群运行,最后等待任务执行完毕。于是我们可以看到,主体功能实现就在createAndSubmitJob()这个函数体里,工程中其它的各个类无非都是为这个函数接口服务的。
public Job execute() throws Exception {
Job job = createAndSubmitJob();
if (inputOptions.shouldBlock()) {
waitForJobCompletion(job);
}
return job;
}
public Job createAndSubmitJob() throws Exception {
assert inputOptions != null;
assert getConf() != null;
Job job = null;
try {
synchronized(this) {
//Don't cleanup while we are setting up.
metaFolder = createMetaFolderPath();
jobFS = metaFolder.getFileSystem(getConf());
job = createJob();
}
if (inputOptions.shouldUseDiff()) {
if (!DistCpSync.sync(inputOptions, getConf())) {
inputOptions.disableUsingDiff();
}
}
createInputFileListing(job);
job.submit();
submitted = true;
} finally {
if (!submitted) {
cleanup();
}
}
String jobID = job.getJobID().toString();
job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
LOG.info("DistCp job-id: " + jobID);
return job;
}
- metaFolder
一个Path类型。其存放着distcp工具需要的元数据信息,也就是所有需要拷贝的源目录/文件信息列表。这些数据在一个fileList.seq文件中以Key/Value结构进行保存,其中Key是源文件的Text格式的相对路径,而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息,这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类,metafolder目录中的fileList.seq最终会作为参数传递给MR任务中的Mapper。
private Path createMetaFolderPath() throws Exception {
Configuration configuration = getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(
new Cluster(configuration), configuration);
Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
if (LOG.isDebugEnabled())
LOG.debug("Meta folder location: " + metaFolderPath);
configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());
return metaFolderPath;
}
- job = createJob()
创建MR job的地方。
private Job createJob() throws IOException {
String jobName = "distcp";
String userChosenName = getConf().get(JobContext.JOB_NAME);
if (userChosenName != null)
jobName += ": " + userChosenName;
Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
job.getConfiguration().set(JobContext.NUM_MAPS,
String.valueOf(inputOptions.getMaxMaps()));
if (inputOptions.getSslConfigurationFile() != null) {
setupSSLConfig(job);
}
inputOptions.appendToConf(job.getConfiguration());
return job;
}
重点看这两行代码,job.setInputFormatClass主要是设定job中任务的分配策略,分为UniformSizeInputFormat和DynamicInputFormat两种,UniformSizeInputFormat表示均衡分配任务,也就是设定的map中,每个map分配同样的任务数,DynamicInputFormat表示动态分为任务书,也就是动态的根据每个map运行的速度来分为具体map的任务数;job.setMapperClass主要设定map任务的具体逻辑。
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setMapperClass(CopyMapper.class)
- CopyMapper类的源码
拷贝工作实际做的地方,主要看setup()、map()方法
public void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
PRESERVE_STATUS.getConfigLabel()));
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
targetFS = targetFinalPath.getFileSystem(conf);
if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
overWrite = true; // When target is an existing file, overwrite it.
}
if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
initializeSSLConf(context);
}
}
public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
Context context) throws IOException, InterruptedException {
Path sourcePath = sourceFileStatus.getPath();
if (LOG.isDebugEnabled())
LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);
Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
targetFS.getWorkingDirectory()) + relPath.toString());
EnumSet<DistCpOptions.FileAttribute> fileAttributes
= getFileAttributeSettings(context);
final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);
final String description = "Copying " + sourcePath + " to " + target;
context.setStatus(description);
LOG.info(description);
try {
CopyListingFileStatus sourceCurrStatus;
FileSystem sourceFS;
try {
sourceFS = sourcePath.getFileSystem(conf);
final boolean preserveXAttrs =
fileAttributes.contains(FileAttribute.XATTR);
sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
sourceFS.getFileStatus(sourcePath),
fileAttributes.contains(FileAttribute.ACL),
preserveXAttrs, preserveRawXattrs);
} catch (FileNotFoundException e) {
throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
}
FileStatus targetStatus = null;
try {
targetStatus = targetFS.getFileStatus(target);
} catch (FileNotFoundException ignore) {
if (LOG.isDebugEnabled())
LOG.debug("Path could not be found: " + target, ignore);
}
if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
throw new IOException("Can't replace " + target + ". Target is " +
getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
}
if (sourceCurrStatus.isDirectory()) {
createTargetDirsWithRetry(description, target, context);
return;
}
FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
if (action == FileAction.SKIP) {
LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
+ " to " + target);
updateSkipCounters(context, sourceCurrStatus);
context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
} else {
copyFileWithRetry(description, sourceCurrStatus, target, context,
action, fileAttributes);
}
DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
fileAttributes, preserveRawXattrs);
} catch (IOException exception) {
handleFailures(exception, sourceFileStatus, target, context);
}
}
- copyFileWithRetry
拷贝动作,这个函数最底层调用的是常用的Java输入输出流的方式,以此方式来完成点对点拷贝。即copyToFile里面的copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, context);方法。
private void copyFileWithRetry(String description,
FileStatus sourceFileStatus, Path target, Context context,
FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
throws IOException {
long bytesCopied;
try {
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
action).execute(sourceFileStatus, target, context, fileAttributes);
} catch (Exception e) {
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
" --> " + target, e);
}
incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
incrementCounter(context, Counter.COPY, 1);
}
至此,拷贝的job任务完成设定、提交以及执行,也就意味着distcp命令执行完成,即实现了跨集群的文件拷贝。
文本资源来自于互联网和书本整理,仅供学习,有侵权联系删除。
网友评论