
【2019-05-03】Hadoop I/O Operations

Author: BigBigFlower | Published 2019-05-03 16:50

    Data integrity
    A common way to detect corrupted data is to compute a checksum when the data first enters the system, and to compute it again after the data has passed through an unreliable channel. If the newly computed checksum does not match the original one, the data is considered corrupt. Note that checksums only detect corruption; they cannot repair it.
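
    A minimal sketch of the idea, using plain java.util.zip.CRC32 rather than Hadoop's internal checksum machinery:

    import java.util.zip.CRC32;
    
    public class ChecksumDemo {
    
      static long checksumOf(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return crc.getValue();
      }
    
      public static void main(String[] args) {
        byte[] original = "hello hadoop".getBytes();
        long expected = checksumOf(original);  // computed when the data first enters the system
    
        byte[] received = original.clone();    // the data after crossing an unreliable channel
        // received[0] ^= 1;                   // flip a bit to simulate corruption
    
        if (checksumOf(received) != expected) {
          System.err.println("data is corrupt: checksum mismatch");
        } else {
          System.out.println("checksum OK");
        }
      }
    }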

    Compression
    Common compression formats used with Hadoop:


    (Figure: Hadoop compression formats)

    The following program shows how to use the API to compress data read from standard input and write it to standard output.

    // cc StreamCompressor A program to compress data read from standard input and write it to standard output
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;
    
    // vv StreamCompressor
    public class StreamCompressor {
    
      public static void main(String[] args) throws Exception {
        String codecClassname = args[0];  // e.g. org.apache.hadoop.io.compress.GzipCodec
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)
          ReflectionUtils.newInstance(codecClass, conf);  // instantiate the codec reflectively
        
        CompressionOutputStream out = codec.createOutputStream(System.out);
        IOUtils.copyBytes(System.in, out, 4096, false);  // copy stdin into the compressing stream
        out.finish();  // flush the compressor without closing System.out
      }
    }
    // ^^ StreamCompressor
    
    

    CompressionCodecFactory provides a way of mapping a filename extension to a CompressionCodec: its getCodec() method takes a Path object for the file in question.

    // cc FileDecompressor A program to decompress a compressed file, choosing the codec from the file's extension
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    
    // vv FileDecompressor
    public class FileDecompressor {
    
      public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        
        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
          System.err.println("No codec found for " + uri);
          System.exit(1);
        }
    
        String outputUri =
          CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
    
        InputStream in = null;
        OutputStream out = null;
        try {
          in = codec.createInputStream(fs.open(inputPath));
          out = fs.create(new Path(outputUri));
          IOUtils.copyBytes(in, out, conf);
        } finally {
          IOUtils.closeStream(in);
          IOUtils.closeStream(out);
        }
      }
    }
    // ^^ FileDecompressor
    
    

    If you are using the native libraries and you are doing a lot of compression and decompression in your application, consider using CodecPool, which allows you to reuse compressors and decompressors, amortizing the cost of creating these objects.

    // cc PooledStreamCompressor A program to compress data read from standard input and write it to standard output using a pooled compressor
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.*;
    import org.apache.hadoop.util.ReflectionUtils;
    
    // vv PooledStreamCompressor
    public class PooledStreamCompressor {
    
      public static void main(String[] args) throws Exception {
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec)
          ReflectionUtils.newInstance(codecClass, conf);
        Compressor compressor = null;
        try {
          compressor = CodecPool.getCompressor(codec);  // borrow a Compressor from the pool
          CompressionOutputStream out =
            codec.createOutputStream(System.out, compressor);
          IOUtils.copyBytes(System.in, out, 4096, false);
          out.finish();
        } finally {
          CodecPool.returnCompressor(compressor);  // always return it so it can be reused
        }
      }
    }
    // ^^ PooledStreamCompressor
    
    

    Serialization
    Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage. Deserialization is the reverse process of turning a byte stream back into structured objects. Serialization is used in two main areas of distributed data processing: interprocess communication and persistent storage.
    The Writable interface defines two methods: one for writing an object's state to a DataOutput binary stream, and one for reading its state from a DataInput binary stream.
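
    The interface itself, in the org.apache.hadoop.io package, is small:

    package org.apache.hadoop.io;
    
    import java.io.DataOutput;
    import java.io.DataInput;
    import java.io.IOException;
    
    public interface Writable {
      // Serialize the fields of this object to the binary stream.
      void write(DataOutput out) throws IOException;
      // Deserialize the fields of this object from the binary stream.
      void readFields(DataInput in) throws IOException;
    }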


    (Figure: the Writable class hierarchy)
    (Table: Writable wrappers for Java primitives)
    For example, with the variable-length integer encoding used by VIntWritable and VLongWritable, the value 163 requires two bytes; see the sketch below.
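
    A minimal sketch, using the WritableUtils helper that VIntWritable relies on, confirming the two-byte encoding (one length/sign marker byte plus one data byte):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableUtils;
    
    public class VIntEncodingDemo {
    
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        WritableUtils.writeVInt(out, 163);  // zero-compressed (variable-length) encoding
        out.flush();
        System.out.println(buffer.size());  // prints 2
        System.out.println(WritableUtils.getVIntSize(163));  // also 2, computed without serializing
      }
    }
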
    The following test contrasts the String and Text classes: String indexes by char, whereas Text indexes by byte offset into its UTF-8 encoding.

    // cc StringTextComparisonTest Tests showing the differences between the String and Text classes
    import static org.hamcrest.CoreMatchers.is;
    import static org.junit.Assert.assertThat;
    
    import java.io.*;
    
    import org.apache.hadoop.io.Text;
    import org.junit.Test;
    
    // vv StringTextComparisonTest
    public class StringTextComparisonTest {
    
      @Test
      public void string() throws UnsupportedEncodingException {
        
        String s = "\u0041\u00DF\u6771\uD801\uDC00";
        assertThat(s.length(), is(5));
        assertThat(s.getBytes("UTF-8").length, is(10));
        
        assertThat(s.indexOf("\u0041"), is(0));
        assertThat(s.indexOf("\u00DF"), is(1));
        assertThat(s.indexOf("\u6771"), is(2));
        assertThat(s.indexOf("\uD801\uDC00"), is(3));
        
        assertThat(s.charAt(0), is('\u0041'));
        assertThat(s.charAt(1), is('\u00DF'));
        assertThat(s.charAt(2), is('\u6771'));
        assertThat(s.charAt(3), is('\uD801'));
        assertThat(s.charAt(4), is('\uDC00'));
        
        assertThat(s.codePointAt(0), is(0x0041));
        assertThat(s.codePointAt(1), is(0x00DF));
        assertThat(s.codePointAt(2), is(0x6771));
        assertThat(s.codePointAt(3), is(0x10400));
      }
      
      @Test
      public void text() {
        
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
        assertThat(t.getLength(), is(10));
        
        assertThat(t.find("\u0041"), is(0));
        assertThat(t.find("\u00DF"), is(1));
        assertThat(t.find("\u6771"), is(3));
        assertThat(t.find("\uD801\uDC00"), is(6));
    
        assertThat(t.charAt(0), is(0x0041));
        assertThat(t.charAt(1), is(0x00DF));
        assertThat(t.charAt(3), is(0x6771));
        assertThat(t.charAt(6), is(0x10400));
      }  
    }
    // ^^ StringTextComparisonTest
    
    

    File-based data structures
    (1) Writing a SequenceFile

    // cc SequenceFileWriteDemo Writing a SequenceFile
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    // vv SequenceFileWriteDemo
    public class SequenceFileWriteDemo {
      
      private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
      };
      
      public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
    
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
          writer = SequenceFile.createWriter(fs, conf, path,
              key.getClass(), value.getClass());
          
          for (int i = 0; i < 100; i++) {
            key.set(100 - i);
            value.set(DATA[i % DATA.length]);
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
            writer.append(key, value);
          }
        } finally {
          IOUtils.closeStream(writer);
        }
      }
    }
    // ^^ SequenceFileWriteDemo
    
    

    (2) Reading a SequenceFile

    // cc SequenceFileReadDemo Reading a SequenceFile
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;
    
    // vv SequenceFileReadDemo
    public class SequenceFileReadDemo {
      
      public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
    
        SequenceFile.Reader reader = null;
        try {
          reader = new SequenceFile.Reader(fs, path, conf);
          Writable key = (Writable)
            ReflectionUtils.newInstance(reader.getKeyClass(), conf);
          Writable value = (Writable)
            ReflectionUtils.newInstance(reader.getValueClass(), conf);
          long position = reader.getPosition();
          while (reader.next(key, value)) {
          String syncSeen = reader.syncSeen() ? "*" : "";  // "*" marks a record preceded by a sync point
            System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
            position = reader.getPosition(); // beginning of next record
          }
        } finally {
          IOUtils.closeStream(reader);
        }
      }
    }
    // ^^ SequenceFileReadDemo
    

    (3) The SequenceFile format
    A sequence file consists of a header followed by one or more records.
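
    A minimal sketch, relying only on the documented fact that the header begins with the three bytes SEQ followed by a single version byte, that peeks at the start of a file written by SequenceFileWriteDemo:

    import java.io.DataInputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class SequenceFileHeaderPeek {
    
      public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
    
        try (DataInputStream in = new DataInputStream(fs.open(new Path(uri)))) {
          byte[] magic = new byte[3];
          in.readFully(magic);          // the literal bytes 'S', 'E', 'Q'
          int version = in.readByte();  // the format version number
          System.out.printf("magic=%s version=%d%n", new String(magic, "US-ASCII"), version);
        }
      }
    }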


    (Figure: the internal structure of a sequence file, uncompressed and compressed)

    The internal structure of a record depends on whether compression is enabled; if it is, the structure further depends on whether record compression or block compression is used. The choice is made when the writer is created, as in the sketch below.
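
    A minimal sketch, assuming the pure-Java DefaultCodec (DEFLATE), of requesting block compression when the writer is created; CompressionType.RECORD would compress each record's value individually instead:

    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.util.ReflectionUtils;
    
    public class BlockCompressedWriteDemo {
    
      public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
    
        CompressionCodec codec =
            ReflectionUtils.newInstance(DefaultCodec.class, conf);  // a configured DEFLATE codec
    
        SequenceFile.Writer writer = null;
        try {
          // Same call as in SequenceFileWriteDemo, plus a compression type and codec.
          writer = SequenceFile.createWriter(fs, conf, path,
              IntWritable.class, Text.class,
              SequenceFile.CompressionType.BLOCK, codec);
          writer.append(new IntWritable(1), new Text("a block-compressed record"));
        } finally {
          IOUtils.closeStream(writer);
        }
      }
    }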


    (Figure: block compression)
