MapReduce: N keys, N files (Part 2)


Author: YG_9013 | Published 2018-11-22 20:22

    If you read MapReduce: N keys, N files (Part 1) and followed the approach described there to map N keys to N files, you will find that the split output is larger than the input data and that some of the files cannot be read at all.
    Reading them with Presto can produce an error like this:

    Query 20181122_073113_31966_aeiaw failed: Error opening Hive split hdfs://xxx3/dt=20181122/uiappid=300046/20181122150918_100.110.30.239_SkgSHZT8 (offset=62576808, length=62576807): Protocol message tag had invalid wire type.

    【Symptom】

    The visible symptom is that Presto considers some of the ORC files written by the MR job invalid and cannot read them. The invalid files do not show up after every run, though: sometimes they appear, sometimes they don't.
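    A quick way to confirm that a suspect file really is a broken ORC file (rather than a Presto-side problem) is to open it with the ORC reader directly. Below is a minimal sketch using the same org.apache.orc API as the code later in this post; the class name is just a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;

    public class OrcFileCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // createReader parses the ORC footer; an incomplete file written by a
            // killed task attempt will typically fail right here.
            Reader reader = OrcFile.createReader(new Path(args[0]), OrcFile.readerOptions(conf));
            System.out.println("rows = " + reader.getNumberOfRows());
        }
    }

    (hive --orcfiledump <path> should give a similar answer from the command line.)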

    【Attempt 1】

    My first suspicion was that the map holding each output file's RecordWriter (Map<String, OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>()) was being garbage collected. That did not seem plausible, since OrcReNameMapreduceRecordWriter holds a reference to it the whole time, but I moved it up into the enclosing class OrcReNameFileOutputFormat anyway, as follows:

    package is.split;

    import org.apache.commons.lang.RandomStringUtils;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Writer;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
    import org.apache.orc.mapreduce.OrcOutputFormat;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class OrcReNameFileOutputFormat extends OrcOutputFormat {
        // holds the writer for each key's output file
        static Map<String, OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>(); // moved up into this class
        //private OrcMapreduceRecordWriter realWrite ;

        @Override
        public RecordWriter<Text, OrcStruct> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
            return new OrcReNameMapreduceRecordWriter(taskAttemptContext);
        }


        private class OrcReNameMapreduceRecordWriter extends RecordWriter<Text, OrcStruct> {

            private TaskAttemptContext taskAttemptContext;

            public OrcReNameMapreduceRecordWriter(TaskAttemptContext taskAttemptContext) {
                this.taskAttemptContext = taskAttemptContext;
            }

            // the key received here is the key emitted by the map phase
            public void write(Text key, OrcStruct value) throws IOException, InterruptedException {
                // the writer that actually writes to the file is still an OrcMapreduceRecordWriter
                OrcMapreduceRecordWriter realWrite = map.get(key.toString());
                if (realWrite == null) {
                    //String outputFileName = taskAttemptContext.getConfiguration().get("ouputfile.prefix.col.name", "uiappid");

                    String split_key = key.toString();
                    // output subdirectory, named after the key
                    String outputDirPath = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR) + "/" + split_key;
                    // output file name, built from the current timestamp, the local IP and six random characters
                    Path filename = new Path(new Path(outputDirPath), ISTool.getCurTime() + "_" + ISTool.getLocalIp() + "_" + RandomStringUtils.randomAlphanumeric(6));
                    Writer writer = OrcFile.createWriter(filename, org.apache.orc.mapred.OrcOutputFormat.buildOptions(taskAttemptContext.getConfiguration()));
                    realWrite = new OrcMapreduceRecordWriter<OrcStruct>(writer);
                    map.put(key.toString(), realWrite);
                }
                realWrite.write(NullWritable.get(), value);
            }

            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                for (Map.Entry<String, OrcMapreduceRecordWriter> entry : map.entrySet()) {
                    if (entry.getValue() != null) {
                        entry.getValue().close(context);
                    }
                }
            }
        }
    }
    
    

    This change had no effect: some runs still produced incomplete ORC files, and some did not.

    【Attempt 2】

    Searching online turned up nothing. After adding the machine IP to the file names, I noticed that the files for a single key were being produced by two different machines. Shouldn't one key map to exactly one reduce task? Why would one key produce two files? Had the reduce gone off the rails?
    While reading https://www.cnblogs.com/YDDMAX/p/6828363.html I noticed that OutputCommitter has an abortJob method, and it suddenly clicked: could the invalid ORC files be written by backup (speculative) reduce attempts? Looking through the OutputCommitter methods confirmed the guess.
    The MR framework starts a backup attempt for a reduce task that is running slowly, and the two attempts run in parallel. With the normal one-output-per-reduce setup this causes no problem, because each attempt writes to a temporary file, and only after the whole attempt succeeds is that temporary file moved into the designated output directory. When an attempt succeeds, commitTask is called. As the code below shows, after a successful attempt the OutputCommitter moves the files with rename, and if algorithmVersion is greater than 1 it merges the attempt's output directly into the output directory instead.

      @Private
      public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
          throws IOException {
    
        TaskAttemptID attemptId = context.getTaskAttemptID();
        if (hasOutputPath()) {
          context.progress();
          if(taskAttemptPath == null) {
            taskAttemptPath = getTaskAttemptPath(context);
          }
          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
          FileStatus taskAttemptDirStatus;
          try {
            taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
          } catch (FileNotFoundException e) {
            taskAttemptDirStatus = null;
          }
    
          if (taskAttemptDirStatus != null) {
            if (algorithmVersion == 1) {
              Path committedTaskPath = getCommittedTaskPath(context);
              if (fs.exists(committedTaskPath)) {
                 if (!fs.delete(committedTaskPath, true)) {
                   throw new IOException("Could not delete " + committedTaskPath);
                 }
              }
          if (!fs.rename(taskAttemptPath, committedTaskPath)) { // move the files
                throw new IOException("Could not rename " + taskAttemptPath + " to "
                    + committedTaskPath);
              }
              LOG.info("Saved output of task '" + attemptId + "' to " +
                  committedTaskPath);
            } else {
              // directly merge everything from taskAttemptPath to output directory
              mergePaths(fs, taskAttemptDirStatus, outputPath);
              LOG.info("Saved output of task '" + attemptId + "' to " +
                  outputPath);
            }
          } else {
            LOG.warn("No Output found for " + attemptId);
          }
        } else {
          LOG.warn("Output Path is null in commitTask()");
        }
      }
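
    (For reference, the algorithmVersion branch above is selected by a configuration key; its default value depends on the Hadoop version, so check your own cluster. A one-line sketch:)

    conf.setInt("mapreduce.fileoutputcommitter.algorithm.version", 1); // 1 = rename per task at commit, 2 = merge directly into the output dir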
    
    

    If the attempt is killed instead, the OutputCommitter deletes the temporary directory that attempt produced:

     public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
        if (hasOutputPath()) { 
          context.progress();
          if(taskAttemptPath == null) {
            taskAttemptPath = getTaskAttemptPath(context);
          }
          FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if(!fs.delete(taskAttemptPath, true)) { // delete the attempt's temporary directory
            LOG.warn("Could not delete "+taskAttemptPath);
          }
        } else {
          LOG.warn("Output Path is null in abortTask()");
        }
      }
    

    Back to our program: during the reduce I write data directly into the final output directory. If a reduce attempt is killed, its data has already been written, and the ORC file produced by the killed attempt is not a complete ORC file, which is exactly the read/format error we saw.
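    An alternative fix, sketched here only for reference (it is not the approach taken in this post), would be to follow the same temporary-file protocol: create the per-key files under the task attempt's work directory instead of the final output directory, so that commitTask promotes them and abortTask removes them. A rough sketch against the write() method shown earlier, assuming the default FileOutputCommitter and the rest of OrcReNameMapreduceRecordWriter unchanged:

    // inside OrcReNameMapreduceRecordWriter.write(), when creating a new per-key writer
    // (also needs: import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;)
    FileOutputCommitter committer =
            (FileOutputCommitter) getOutputCommitter(taskAttemptContext);
    Path workDir = committer.getWorkPath();            // .../_temporary/.../attempt_xxx
    Path outputDirPath = new Path(workDir, split_key); // per-key subdirectory under the attempt dir
    Path filename = new Path(outputDirPath,
            ISTool.getCurTime() + "_" + ISTool.getLocalIp() + "_" + RandomStringUtils.randomAlphanumeric(6));
    // build the OrcMapreduceRecordWriter on `filename` as before; on success the
    // committer merges the whole attempt directory into the real output directory,
    // and on kill/abort it simply deletes the attempt directory.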

    【Solution】

    Simply disable speculative execution for the job's tasks:

    conf.set("mapreduce.reduce.speculative","false");
    conf.set("mapreduce.map.speculative","false");
    
