美文网首页
【2019-05-29】关于Avro

【2019-05-29】关于Avro

作者: BigBigFlower | 来源:发表于2019-05-29 20:40 被阅读0次

    Apache Avro 是一个独立于编程语言的数据序列化系统,旨在解决 Hadoop 中 Writable 类型的不足:缺乏语言的可移植性。

    Avro数据类型和模式


    Avro基本类型
    Avro复杂类型
    Avro复杂类型(续)
    Avro的Java类型映射

    内存中的序列化和反序列化

    // cc AvroGenericMaxTemperature MapReduce program to find the maximum temperature, creating Avro output
    
    import java.io.IOException;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyOutputFormat;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    //vv AvroGenericMaxTemperature
    /**
     * MapReduce job that finds, for each year, the weather record with the
     * highest temperature and writes the result as Avro output.
     *
     * <p>Input is read with {@link TextInputFormat}; each line is handed to
     * {@code NcdcRecordParser} (declared elsewhere). Output is written with
     * {@link AvroKeyOutputFormat} using {@link #SCHEMA}.
     */
    public class AvroGenericMaxTemperature extends Configured implements Tool {

      /** Avro schema of a weather reading: year, temperature and station id. */
      private static final Schema SCHEMA = new Schema.Parser().parse(
          "{" +
          "  \"type\": \"record\"," +
          "  \"name\": \"WeatherRecord\"," +
          "  \"doc\": \"A weather reading.\"," +
          "  \"fields\": [" +
          "    {\"name\": \"year\", \"type\": \"int\"}," +
          "    {\"name\": \"temperature\", \"type\": \"int\"}," +
          "    {\"name\": \"stationId\", \"type\": \"string\"}" +
          "  ]" +
          "}"
      );

      /**
       * Parses each input line and, when the parser reports a valid
       * temperature, emits the year as key and a {@code WeatherRecord} as value.
       */
      public static class MaxTemperatureMapper
          extends Mapper<LongWritable, Text, AvroKey<Integer>,
                AvroValue<GenericRecord>> {
        private final NcdcRecordParser parser = new NcdcRecordParser();
        // A single record instance is refilled and re-emitted on every map() call.
        private final GenericRecord record = new GenericData.Record(SCHEMA);

        /**
         * Emits {@code (year, record)} for one input line; lines for which
         * {@code parser.isValidTemperature()} is false produce no output.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the map output
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          parser.parse(value.toString());
          if (parser.isValidTemperature()) {
            record.put("year", parser.getYearInt());
            record.put("temperature", parser.getAirTemperature());
            record.put("stationId", parser.getStationId());
            context.write(new AvroKey<Integer>(parser.getYearInt()),
                new AvroValue<GenericRecord>(record));
          }
        }
      }

      /**
       * For each year, selects the record with the greatest
       * {@code temperature} field and writes it as the output key.
       */
      public static class MaxTemperatureReducer
          extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
                AvroKey<GenericRecord>, NullWritable> {

        /**
         * Scans all records for one year and writes the one with the highest
         * temperature. On ties the first record encountered is kept, because
         * the comparison is strict.
         *
         * @param key     the year
         * @param values  all weather records emitted for that year
         * @param context sink for the reduce output
         */
        @Override
        protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>>
            values, Context context) throws IOException, InterruptedException {
          GenericRecord max = null;
          for (AvroValue<GenericRecord> value : values) {
            GenericRecord record = value.datum();
            if (max == null ||
                (Integer) record.get("temperature") > (Integer) max.get("temperature")) {
              // Keep a copy rather than the iterated datum itself.
              // NOTE(review): presumably the framework may reuse the value
              // object between iterations - confirm.
              max = newWeatherRecord(record);
            }
          }
          // Parameterized AvroKey instead of the raw type to avoid an unchecked call.
          context.write(new AvroKey<GenericRecord>(max), NullWritable.get());
        }

        /**
         * Creates a new record with {@link #SCHEMA} and copies the three
         * fields of {@code value} into it.
         *
         * @param value the record to copy
         * @return an independent record holding the same field values
         */
        private GenericRecord newWeatherRecord(GenericRecord value) {
          GenericRecord record = new GenericData.Record(SCHEMA);
          record.put("year", value.get("year"));
          record.put("temperature", value.get("temperature"));
          record.put("stationId", value.get("stationId"));
          return record;
        }
      }

      /**
       * Configures and runs the job.
       *
       * @param args {@code <input> <output>} paths
       * @return 0 if the job succeeded, 1 if it failed, -1 on a usage error
       * @throws Exception if job setup or execution fails
       */
      @Override
      public int run(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.printf("Usage: %s [generic options] <input> <output>\n",
              getClass().getSimpleName());
          ToolRunner.printGenericCommandUsage(System.err);
          return -1;
        }

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        // NOTE(review): presumably set so that the job's own Avro classes take
        // precedence over those on the cluster classpath - confirm.
        job.getConfiguration().setBoolean(
            Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
        AvroJob.setMapOutputValueSchema(job, SCHEMA);
        AvroJob.setOutputKeySchema(job, SCHEMA);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
      }

      /** Command-line entry point; exits with the status returned by {@link #run}. */
      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
        System.exit(exitCode);
      }
    }
    // ^^ AvroGenericMaxTemperature
    
    

    使用Avro MapReduce进行排序

    // cc AvroSort A MapReduce program to sort an Avro data file
    
    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyInputFormat;
    import org.apache.avro.mapreduce.AvroKeyOutputFormat;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    //vv AvroSort
    /**
     * MapReduce job that sorts an Avro data file.
     *
     * <p>The mapper emits every input datum as both key and value, and the
     * reducer writes each value back out as an output key. The same schema,
     * read from a file named on the command line, is used for input, map
     * output and final output.
     */
    public class AvroSort extends Configured implements Tool {

      /**
       * Emits each input datum as the map output key and duplicates it as the
       * value.
       *
       * @param <K> the Avro datum type being sorted
       */
      static class SortMapper<K> extends Mapper<AvroKey<K>, NullWritable,
          AvroKey<K>, AvroValue<K>> {
        @Override
        protected void map(AvroKey<K> key, NullWritable value,
            Context context) throws IOException, InterruptedException {
          context.write(key, new AvroValue<K>(key.datum()));
        }
      }

      /**
       * Writes every value received for a key as an output key, so each input
       * datum appears once in the output.
       *
       * @param <K> the Avro datum type being sorted
       */
      static class SortReducer<K> extends Reducer<AvroKey<K>, AvroValue<K>,
          AvroKey<K>, NullWritable> {
        @Override
        protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
            Context context) throws IOException, InterruptedException {
          for (AvroValue<K> value : values) {
            // Parameterized AvroKey instead of the raw type to avoid an unchecked call.
            context.write(new AvroKey<K>(value.datum()), NullWritable.get());
          }
        }
      }

      /**
       * Configures and runs the sort job.
       *
       * @param args {@code <input> <output> <schema-file>}; the schema file is
       *             read through {@link File}
       * @return 0 if the job succeeded, 1 if it failed, -1 on a usage error
       * @throws Exception if the schema cannot be parsed or the job fails
       */
      @Override
      public int run(String[] args) throws Exception {

        if (args.length != 3) {
          System.err.printf(
            "Usage: %s [generic options] <input> <output> <schema-file>\n",
            getClass().getSimpleName());
          ToolRunner.printGenericCommandUsage(System.err);
          return -1;
        }

        String input = args[0];
        String output = args[1];
        String schemaFile = args[2];

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(getConf(), "Avro sort");
        job.setJarByClass(getClass());

        // NOTE(review): presumably set so that the job's own Avro classes take
        // precedence over those on the cluster classpath - confirm.
        job.getConfiguration().setBoolean(
            Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        AvroJob.setDataModelClass(job, GenericData.class);

        // One schema drives every stage of the job.
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        AvroJob.setInputKeySchema(job, schema);
        AvroJob.setMapOutputKeySchema(job, schema);
        AvroJob.setMapOutputValueSchema(job, schema);
        AvroJob.setOutputKeySchema(job, schema);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        job.setOutputKeyClass(AvroKey.class);
        job.setOutputValueClass(NullWritable.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
      }

      /** Command-line entry point; exits with the status returned by {@link #run}. */
      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroSort(), args);
        System.exit(exitCode);
      }
    }
    // ^^ AvroSort
    
    

    相关文章

      网友评论

          本文标题:【2019-05-29】关于Avro

          本文链接:https://www.haomeiwen.com/subject/wraioqtx.html