美文网首页程序员
avro源码阅读-读avro文件

avro源码阅读-读avro文件

作者: 天不沽 | 来源:发表于2017-05-30 17:06 被阅读231次

    在实际工作中,会将avro文件读到一个类中。下面就看看代码是如何实现的(这里看的是avro-1.7.7版本)。

    读取整个avro文件的类是DataFileReader,用于读取data的类是GenericDatumReader(data指的是用户的数据),有关avro文件结构可参见avro源码阅读-写avro文件中的图。

    我们从AvroPairInputFormat切入看下DataFileReader和SpecificDatumReader的使用。

    如果你看过了avro源码阅读-写avro文件,那你会觉得很熟悉。
    在读avro文件时,有DataFileReader和AvroPairInputFormat。
    在写avro文件时,有DataFileWrite和AvroPairOutputFormat。

    AvroInputFormat

    1. AvroInputFormat.getRecordReader()返回的是AvroRecordReader;
    2. AvroRecordReader.next()读取下一条记录;
    3. AvroRecordReader.next()又调用了DataFileReader.next();
    4. DataFileReader.next()调用了GenericDatumReader.read()。
      其中,
    5. DataFileReader的next()和reader继承自DataFileStream;
    6. GenericDatumReader.read(reuse, decoder)参数中的decoder是BinaryDecoder。

    类的继承及引用关系图如下:

    GenericDatumReader有关类的继承及引用图

    对于DataFileReader读取整个avro文件的代码这里不再详述,这里主要关心GenericDatumReader是如何读取data的。

    GenericDatumReader

    GenericDatumReader.read()的构造了一个ResolvingDecoder。ResolvingDecoder维护了一个解码器Decoder和解析器SkipParser。SkipParser维护了一个语法堆栈,用于指导下一步该如何处理(如是读数据还是跳过,若读数据是什么类型的数据)。Decoder实际上是BinaryDecoder,在需要读取数据时,由它从avro文件中读取一小块内容到内存中。ResolvingDecoder还引用了ResolvingGrammarGenerator,该类用于生成语法,语法由Symbol类表示。这几个类的关系图如下所示:

    ResolvingDecoder有关类的继承和引用
    1. Symbol
      SkipParser维护了一个语法堆栈(从Parser继承而来,实现上是一个数组),Symbol就是存储在堆栈中的元素。Symbol就像是写avro文件时用到的Schema。写avro文件时,是将一条数据按照schema写到文件中;而读avro文件时,则是按照Symbol将一条数据读取到内存中。

    2. SkipParser
      SkipParser维护了一个Symbol的数组,也就是一个堆栈,每次弹出栈顶元素,然后执行相应动作:
      -将栈顶元素扩展后再压栈;
      -直接从avro文件中读取一小块数据;
      -其他操作比如跳过。

    3. ResolvingGrammarGenerator
      ResolvingGrammarGenerator用于生成压入堆栈中的第一个元素(根元素)。

    代码里对这几个类的注释:
    Symbol:Symbol is the base of all symbols (terminals and non-terminals) of the grammar.
    SkipParser:A parser that capable of skipping as well read and write. This class is used by decoders who (unlink encoders) are required to implement methods to skip.
    ResolvingGrammarGenerator:The class that generates validating grammar.

    Symbol

    和Schema一样,Symbol是个抽象类,它有很多具体的实现。这里关注三种实现:

    1. Terminal: 一个基本类型如int、long等,就对应一个terminal。当弹出的栈顶元素为terminal时,就表示应该从数据中读取某个类型对应大小的bytes了。
    2. Sequence:比如说一个类,就对应一个Sequence。
      Symbol本身又维护了一个Symbol数组production,production表示该Symbol的产物。
      Symbol本身包含一个Symbol数组,就像类包含的成员变量也可以是其他类一样。
      他们的对应关系可描述为下图:


      Sequence和Class的对应关系

      Terminal的production是空的,而Sequence的production包含了其产物。

    3. Root:根Symbol,就是第一个压入语法堆栈的元素。

    GenericDatumReader.read()代码如下:
    <pre>
    public D read(D reuse, Decoder in) throws IOException {
    ResolvingDecoder resolver = getResolver(actual, expected); //ResolvingDecoder的初始化
    resolver.configure(in); //将ResolvingDecoder 的解码器配置为in(即BInaryDecoder)
    D result = (D) read(reuse, expected, resolver); //读取一条记录到reuse中
    resolver.drain();
    return result;
    }
    </pre>

    下面分别介绍:

    1. ResolvingDecoder的初始化;
    2. 读取一条记录到reuse中

    ResolvingDecoder的初始化

    ResolvingDecoder初始化的时候,new了一个SkipParser,生成Root Symbol并将其压入了SkipParser维护的堆栈中。初始化中复杂之处在于Root Symbol的生成,生成Root Symbol的代码如下:
    <pre>
    return Symbol.root(generate(writer, reader, new HashMap<LitS, Symbol>()));
    </pre>

    这里有两步调用:

    1. generate(writer, reader, map);
    2. Symbol.root(symbol)

    generate(writer, reader, map)

    writer指的是avro文件中记录的schema,reader指的是类的schema。

    1. 当writer和reader的类型不同时,generate()会推测可能是UNION导致的,或者进行一些类型转换,否则会抛出异常;

    2. 当writer和reader的类型相同时,代码如下:
      <pre>
      public Symbol generate(Schema writer, Schema reader,
      Map<LitS, Symbol> seen) throws IOException {
      final Schema.Type writerType = writer.getType();
      final Schema.Type readerType = reader.getType();

      if (writerType == readerType) {
      switch (writerType) {
      case NULL:
      return Symbol.NULL;
      case BOOLEAN:
      return Symbol.BOOLEAN;
      case INT:
      return Symbol.INT;
      case LONG:
      return Symbol.LONG;
      ...
      case RECORD:
      return resolveRecords(writer, reader, seen);
      ...
      }
      </pre>

    即使类型相同,writer和reader的schema也可能有所差别,比如用新的类(可能增加了一些字段,或是字段顺序有所调整)来读取历史文件。resolveRecords()会综合writerSchema和readerSchema中的字段,生成并返回一个Symbol。主要的情况有下面两种:
    -wirterSchema中有,但是readerSchema中没有的字段,将会被跳过;
    -readerSchema中有,但是writerSchema中没有的字段,必须有默认值,否则就会出错。
    resolveRecords()的代码如下:
    <pre>
    private Symbol resolveRecords(Schema writer, Schema reader,
    Map<LitS, Symbol> seen) throws IOException {
    LitS wsc = new LitS2(writer, reader);
    Symbol result = seen.get(wsc); //先从缓存中取(根据writer和reader构造symbol也是一件比较耗时的事情)
    if (result == null) {
    List<Field> wfields = writer.getFields();
    List<Field> rfields = reader.getFields();

    // First, compute reordering of reader fields, plus
    // number elements in the result's production
    Field[] reordered = new Field[rfields.size()];
    int ridx = 0;
    int count = 1 + wfields.size(); //count是Symbol的产物production的大小,也就是writerSchema展开后字段的个数(既然是Record类型,肯定是有产物的嘛)
    //这里的"1"被fieldOrderAction占用的

    //将在writerSchema和readerSchema中都存在的字段,按照writerSchema中的顺序,存储在reordered中
    //因为最终是要从文件中依次将数据读出来,所以要按照writerSchema中的字段顺序
    for (Field f : wfields) {
    Field rdrField = reader.getField(f.name());
    if (rdrField != null) {
    reordered[ridx++] = rdrField;
    }
    }

    //处理readerSchema中有,而writerSchema中没有的字段(如类新增了一个字段,这在历史数据中是没有的)
    //此时,该字段必须有自己的默认值,否则就会返回错误信息
    for (Field rf : rfields) {
    String fname = rf.name();
    if (writer.getField(fname) == null) {
    if (rf.defaultValue() == null) {
    result = Symbol.error("Found " + writer.getFullName()
    + ", expecting " + reader.getFullName()
    + ", missing required field " + fname);
    seen.put(wsc, result);
    return result;
    } else {
    reordered[ridx++] = rf;
    count += 3;//使用默认值的字段需要占用3个位置
    }
    }
    }

    Symbol[] production = new Symbol[count]; //初始化production数组
    production[--count] = Symbol.fieldOrderAction(reordered); //FieldOrderAction记录了production中其他各个字段的schema

    /**

    • We construct a symbol without filling the array. Please see
    • {@link Symbol#production} for the reason.
      */
      result = Symbol.seq(production); //生成了一个Sequence Symbol
      seen.put(wsc, result); //缓存

    /*

    • For now every field in read-record with no default value
    • must be in write-record.
    • Write record may have additional fields, which will be
    • skipped during read.
      */

    // Handle all the writer's fields
    for (Field wf : wfields) {
    String fname = wf.name();
    Field rf = reader.getField(fname);
    if (rf == null) { //如果writerSchema中有的字段,readerSchema中没有(比如类的某些字段删掉了,但是历史数据中还存在),则标识为跳过
    production[--count] =
    Symbol.skipAction(generate(wf.schema(), wf.schema(), seen));
    } else {
    production[--count] =
    generate(wf.schema(), rf.schema(), seen); //递归的调用generate()(产物production中可能也有一些复杂的结构)
    }
    }

    // Add default values for fields missing from Writer
    for (Field rf : rfields) {
    String fname = rf.name();
    Field wf = writer.getField(fname);
    if (wf == null) { //处理readerSchema中有,而writerSchema中没有,但是有默认值的字段(上面提到过了),可以看到,这里需要使用三个位置
    byte[] bb = getBinary(rf.schema(), rf.defaultValue());
    production[--count] = Symbol.defaultStartAction(bb);
    production[--count] = generate(rf.schema(), rf.schema(), seen);
    production[--count] = Symbol.DEFAULT_END_ACTION;
    }
    }
    }
    return result;
    }
    </pre>

    举个栗子,如writerSchema和readerSchema都是下面这样。
    <pre>
    {
    "type": "record",
    "name": "User",
    "namespace": "test",
    "fields": [
    {
    "name": "id",
    "type": "int"
    },
    {
    "name": "name",
    "type": "string",
    "default": null
    },
    {
    "name": "dog",
    "type": {
    "name": "Dog",
    "type": "record",
    "fields": [
    {
    "name": "id",
    "type": "int"
    },
    {
    "name": "name",
    "type": "string",
    "default": null
    }
    ]
    }
    }
    ]
    }
    </pre>

    最终generate返回的Symbol结构如下(图最左边的sequenceSymbol0即是返回的结果。图中方框左侧的数字代表production数组的下标,注意,各个Symbol存储的顺序和schema中的顺序正好相反)。

    sequenceSymbol0

    Symbol.root(symbol)

    root()会new一个RootSymbol,并将上面生成的sequenceSymbol0展开,作为该RootSymbol的production。这里最复杂的就在于将sequenceSymbol0展开的操作flatten()。这里关注两个flatten()。

    1. Symbol类的静态方法
      将In:Symbol[]平铺开为out:Symbol[],大概就是下面这么个意思。


      in2out

      代码如下:
      <pre>
      static void flatten(Symbol[] in, int start,
      Symbol[] out, int skip,
      Map<Sequence, Sequence> map,
      Map<Sequence, List<Fixup>> map2) {
      for (int i = start, j = skip; i < in.length; i++) {
      Symbol s = in[i].flatten(map, map2); //如果in[i]是一个Terminal,则直接返回其本身
      //如果in[i]是一个Sequence,则递归的展开自身
      //其他类型的Symbol在此不再关注
      if (s instanceof Sequence) {
      Symbol[] p = s.production;
      List<Fixup> l = map2.get(s);
      if (l == null) {
      System.arraycopy(p, 0, out, j, p.length);
      } else {
      l.add(new Fixup(out, j));
      }
      j += p.length;
      } else { //如果不是Sequence,直接将in赋值给out
      out[j++] = s;
      }
      }
      }
      </pre>

    2. Sequence的私有方法
      将sequence的production铺开。将上图"in2out"中的in定义为某个Sequence展开前prodcution,out定义为某个Sequence展开后的production,则上图即可表示这个方法的意思。
      <pre>
      @Override
      public Sequence flatten(Map<Sequence, Sequence> map,
      Map<Sequence, List<Fixup>> map2) {
      Sequence result = map.get(this);
      if (result == null) {
      result = new Sequence(new Symbol[flattenedSize()]); //初始化展开后的Sequence(flattenedSize也是个递归调用,用于获取一个Sequence的production层层展开后的Symbol的个数)
      map.put(this, result); //缓存展开结果
      List<Fixup> l = new ArrayList<Fixup>();
      map2.put(result, l);

      flatten(production, 0,
      result.production, 0, map, map2);
      for (Fixup f : l) {
      System.arraycopy(result.production, 0, f.symbols, f.pos,
      result.production.length);
      }
      map2.remove(result);
      }
      return result;
      }
      </pre>

    最简单的情况下,假设sequenceSymbol0只有一堆terminalSymbol,那就只调用了Symbol类静态的flatten(),直接将in赋值给了out。

    复杂一点的情况下,如sequenceSymbol0还包含了sequenceSymbol1,或是嵌套着更多类的引用。此时,两个flatten()会相互递归调用,但最终,总会有一个sequence只包含Terminal之类的Symbol,从而结束递归。此时,代码中的map可以用来缓存展开结果,比如ClassA中有两个ClassB成员的情况。

    不过,还有个关键的地方没弄懂。据注释所说,map、map2以及Fixup可用来解决无限递归的。比如,设result为SequenceA扩展后的结果,在result的production填充之前,便存储在map中。下次因递归而再次遇到SequenceA时,则直接从map中取到result。但此时,result的production并未被填充,而是依赖后续的Fixup进行填充。
    可能产生无限递归的情况我只想到一种,那就是两个类之间相互引用(即ClassA引用了ClassB,ClassB又引用了ClassA)。但是代码最终未能调试起来,因为在这种情况下,flattenedSize()没能规避无限递归,导致了StackOverflowError。
    导致问题的代码如下。
    类A
    <pre>
    public class A {
    public B b;
    }
    </pre>
    类B
    <pre>
    public class B {
    public A a;
    }
    </pre>
    主函数
    <pre>
    public static void main(String[] args) throws IOException {
    Schema s = ReflectData.get().getSchema(A.class);
    DecoderFactory.get().resolvingDecoder(
    Schema.applyAliases(s, s), s, null);
    }
    </pre>

    仍以上面的sequenceSymbol0为例,最终生成的Root Symbol及语法堆栈如下图所示。

    stack初始状态

    读取一条记录到reuse中

    此时,resolver已经初始化好了,并且语法堆栈已经有了一个Root Symbol,接下来就是基于语法堆栈读取数据。

    read(reuse, expected, resolver)根据expected(也就是类的schema)的类型不同,调用不同的方法,代码如下。
    <pre>
    protected Object read(Object old, Schema expected,
    ResolvingDecoder in) throws IOException {
    switch (expected.getType()) {
    case RECORD: return readRecord(old, expected, in);
    case ENUM: return readEnum(expected, in);
    ....
    }
    }
    </pre>

    这里主要还是看readRecord(),代码如下。
    <pre>
    protected Object readRecord(Object old, Schema expected,
    ResolvingDecoder in) throws IOException {
    ...
    for (Field f : in.readFieldOrder()) { //获取record的fields
    ...
    readField(r, f, oldDatum, in, state); //读取field,其中又递归的调用了read()
    }
    ...
    }
    </pre>

    其中,for循环内递归调用了上面的read(),不再细述。这里主要看一下代码是如何获取record的fields的。

    readFieldOrder()就一行代码,调用了Parser.advance()。
    <pre>return ((Symbol.FieldOrderAction) parser.advance(Symbol.FIELD_ACTION)).fields;</pre>

    Parser维护了语法堆栈,advance()顾名思义,也即弹出栈顶,并据此进行一次处理。
    advance()代码如下。
    <pre>
    public final Symbol advance(Symbol input) throws IOException {
    for (; ;) {
    Symbol top = stack[--pos]; //栈顶出栈
    if (top == input) { //如top和input都是一个terminal,像readDouble()、readInt()这些方法,就会执行这个分支
    return top; // A common case
    }

      Symbol.Kind k = top.kind;
      if (k == Symbol.Kind.IMPLICIT_ACTION) { //一个隐式动作,这里的doAction()其实就是ResolvingDecoder.doAction()
        Symbol result = symbolHandler.doAction(input, top);
        if (result != null) {
          return result;
        }
      } else if (k == Symbol.Kind.TERMINAL) { //如果input是一个terminal,但是栈顶却不是terminal,抛出异常
        throw new AvroTypeException("Attempt to process a "
                + input + " when a "
                + top + " was expected.");
      } else if (k == Symbol.Kind.REPEATER
          && input == ((Symbol.Repeater) top).end) {
        return input;
      } else {
        pushProduction(top); //将栈顶的production压栈
      }
    }
    

    }
    </pre>

    第一个栈顶是Root Symbol,所以执行的是最后一个分支:pushProduction()。pushProdcution将栈顶(刚刚出栈)的production压栈。如果堆栈大小不够,扩展堆栈。
    <pre>
    public final void pushProduction(Symbol sym) {
    Symbol[] p = sym.production;
    while (pos + p.length > stack.length) { //大小不够,则扩栈
    expandStack();
    }
    System.arraycopy(p, 0, stack, pos, p.length);
    pos += p.length;
    }

    </pre>

    执行完pushProduction()后,堆栈成为了下面这个样子。

    pushProduction()后的stack

    之后会继续执行for()循环,继续弹出栈顶。
    此时的栈顶是一个FieldOrderAction1,FieldOrderAction属于IMPLICIT_ACTION。此时,将会执行symbolHandler.doAction()。
    这里的symbolHandler其实就是ResolvingDecoder,从ResolvingDecoder.doAction()中可以看到哪些Symbol属于IMPLICT_ACTION。这里不再深入。
    doAction()的开头就是对FieldOrderAction的处理,可以看到直接返回了top(也即返回了FieldOrderAction1)。
    <pre>
    public Symbol doAction(Symbol input, Symbol top) throws IOException {
    if (top instanceof Symbol.FieldOrderAction) {
    return input == Symbol.FIELD_ACTION ? top : null;
    ...
    }
    </pre>

    这就将FieldOrderAction1取出来了。FieldOrderAction1中包含了intSchema、stringSchema和recordSchema,在遍历到recordSchema时,又会递归的调用readRecord()。

    left

    还有问题没搞清楚,先搁着吧。
    RootSymbol的production为何提前铺开了?不铺开行么?
    GenericDatumReader.read()返回的是GenericData,GenericData最终是如何映射到类上的?

    相关文章

      网友评论

        本文标题:avro源码阅读-读avro文件

        本文链接:https://www.haomeiwen.com/subject/cyyrfxtx.html