AVRO

作者: kimibob | 来源:发表于2016-04-28 11:26 被阅读5080次

    Avro([ævrə])是Hadoop的一个子项目,由Hadoop的创始人Doug Cutting(也是Lucene,Nutch等项目的创始人)牵头开发。Avro是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro提供的机制使动态语言可以方便地处理Avro数据。

    一、数据序列化/反序列化(data serialization/deserialization)

    Avro支持两种序列化编码方式:二进制编码和JSON编码。使用二进制编码会高效序列化,并且序列化后得到的结果会比较小;而JSON一般用于调试系统或是基于WEB的应用。对Avro数据序列化/反序列化时都需要对模式以深度优先(Depth-First),从左到右(Left-to-Right)的遍历顺序来执行。
    Avro依赖模式(Schema)来实现数据结构定义。可以把模式理解为Java的类,它定义每个实例的结构,可以包含哪些属性。可以根据类来产生任意多个实例对象。对实例序列化操作时必须需要知道它的基本结构,也就需要参考类的信息。这里,根据模式产生的Avro对象类似于类的实例对象。每次序列化/反序列化时都需要知道模式的具体结构。所以,在Avro可用的一些场景下,如文件存储或是网络通信,都需要模式与数据同时存在。Avro数据以模式来读和写(文件或是网络),并且写入的数据都不需要加入其它标识,这样序列化时速度快且结果内容少。由于程序可以直接根据模式来处理数据,所以Avro更适合于脚本语言的发挥。

    1. 需要的Jar包依赖
      avro-1.7.3.jar,avro-tools-1.7.3.jar,jackson-core-asl-1.9.3.jar,jackson-mapper-asl-1.9.3.jar

    2. 定义模式(Schema)
      在avro中,它是用Json格式来定义模式的。模式可以由基础类型(null, boolean, int, long, float, double, bytes, and string)和复合类型(record, enum, array, map, union, and fixed)的数据组成。这里定义了一个简单的模式user.avsc:

    {
        "namespace": "com.zq.avro",
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": ["int", "null"]},
            {"name": "favorite_color", "type": ["string", "null"]}
        ]
    }
    

    上面的模式是定义了一个用户的记录,在模式定义中,必须包含它的类型("type": "record")、一个名字("name": "User")以及fields。在本例中fields包括了name, favorite_number和favorite_color,上面的模式我们还定义了一个命名空间 ("namespace": "com.zq.avro"),namespace可以名字一起使用,从而组成模式的全名(本例为com.zq.avro.User)。

    1. 编译模式(compile schema)
      Avro可以允许我们根据模式的定义而生成相应的类,一旦我们定义好相关的类,程序中就不需要直接使用模式了。可以用avro-tools jar包根据user.avsc生成User.java,语法如下:
    java -jar avro-tools-1.7.4.jar compile schema . [注意这里有第三个参数"."]
    

    命令执行后会在当前目录根据设定的包结构生成一个User.java类,然后就可以将定义的User对象用avro将它序列化存放到本地文件中,再将其反序列化。

    1. 编写Java代码
    public static void main(String[] args) throws Exception {
            User user1 = new User();
            user1.setName("Arway");
            user1.setFavoriteNumber(3);
            user1.setFavoriteColor("green");
            User user2 = new User("Ben", 7, "red");
            // construct with builder
            User user3 = User.newBuilder().setName("Charlie").setFavoriteColor("blue").setFavoriteNumber(100).build();
            // Serialize user1, user2 and user3 to disk
            File file = new File("C:\\Users\\kimibob\\Desktop\\users.avro");
            DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
            DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
            try {
                dataFileWriter.create(user1.getSchema(), file);
                dataFileWriter.append(user1);
                dataFileWriter.append(user2);
                dataFileWriter.append(user3);
                dataFileWriter.close();
            } catch (IOException e) {
            }
            // Deserialize Users from dist
            DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
            DataFileReader<User> dataFileReader = null;
            try {
                dataFileReader = new DataFileReader<User>(file, userDatumReader);
            } catch (IOException e) {
            }
            User user = null;
            try {
                while (dataFileReader.hasNext()) {
                    // Reuse user object by passing it to next(). This saves
                    // us from allocating and garbage collecting many objects for
                    // files with many items.
                    user = dataFileReader.next(user);
                    System.out.println(user);
                }
            } catch (IOException e) {
            }
        }
    

    运行完这段代码之后,将会在磁盘产生users.avro文件,里面是用avro序列化user的二进制数据,再对其进行反序列化,在控制台输出文本json格式的数据。

    {"name": "Arway", "favorite_number": 3, "favorite_color": "green"}
    {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    {"name": "Charlie", "favorite_number": 100, "favorite_color": "blue"}
    

    对比可以看出序列化后的avro格式文件大小远小于文本格式,有利于节省网络传输的开销。

    二、Avro RPC框架

    • RPC逻辑上分为二层,一是传输层,负责网络通信;二是协议层,将数据按照一定协议格式打包和解包
    • 从序列化方式来看,Apache Thrift 和Google的Protocol Buffers和Avro应该是属于同一个级别的框架,都能跨语言,性能优秀,数据精简,但是Avro的动态模式(不用生成代码,而且性能很好)这个特点让人非常喜欢,比较适合RPC的数据交换。
    • Avro RPC 是一个支持跨语言实现的RPC服务框架。非常轻量级,实现简洁,使用方便,同时支持使用者进行二次开发,逻辑上该框架分为两层:
      1.网络传输层使用Netty的Nio实现。
      2.协议层可扩展,目前支持的数据序列化方式有Avro, Protocol Buffers, Json, Hessian,Java序列化。 使用者可以注册自己的协议格式及序列化方式。

    上面是将Avro对象序列化到文件的操作。与之相应的,Avro也被作为一种RPC框架来使用。客户端希望同服务器端交互时,就需要交换双方通信的协议,它类似于模式,需要双方来定义,在Avro中被称为消息(Message)。通信双方都必须保持这种协议,以便于解析从对方发送过来的数据,这也就是传说中的握手(handshake)阶段。

    Avro RPC开发
    1. 需要的Jar包依赖
      avro-1.7.3.jar, avro-ipc-1.7.3.jar, netty-3.5.12.Final.jar, slf4j-api-1.6.1.jar
    2. 定义协议模式(protocol Schema)
    {"namespace": "com.zq.avro",
     "protocol": "Mail",
    
     "types": [
         {"name": "Message", "type": "record",
          "fields": [
              {"name": "to",   "type": "string"},
              {"name": "from", "type": "string"},
              {"name": "body", "type": "string"}
          ]
         }
     ],
     "messages": {
         "send": {
             "request": [{"name": "message", "type": "Message"}],
             "response": "string"
         }
     }
    }
    
    1. 编译模式(compile schema)
    java -jar avro-tools-1.7.3.jar compile protocol mail.avpr . [注意这里有第三个参数"."]
    

    命令执行后会在当前目录根据设定的包结构生成一个Mail接口和一个Message类。

    1. 编写Java代码
      AvroServer类:
    class MailImpl implements Mail {
    
        @Override
        public CharSequence send(Message message) throws AvroRemoteException {
            System.out.println("Message Received:" + message);
            return new Utf8("Received your message: " + message.getFrom().toString() + " with body "
                    + message.getBody().toString());
            }
    }
    public class AvroServer {
    
        private static Server server;
    
        public static void main(String[] args) throws Exception {
    
            System.out.println("Starting server");
            startServer();
            Thread.sleep(1000);
            System.out.println("Server started");
            Thread.sleep(60 * 1000);
            server.close();
        }
    
        private static void startServer() throws IOException {
            server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111));
        }
    }
    

    AvroClient类:

    public class AvroClient {
        public static void main(String[] args) throws Exception {
            NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111));
            /// 获取Mail接口的proxy实现
            Mail proxy = SpecificRequestor.getClient(Mail.class, client);
            System.out.println("Client of Mail Proxy is built");
    
            // fill in the Message record and send it
            args = new String[] { "to:Tom", "from:Jack", "body:How are you" };
            Message message = new Message();
            message.setTo(new Utf8(args[0]));
            message.setFrom(new Utf8(args[1]));
            message.setBody(new Utf8(args[2]));
            System.out.println("RPC call with message:  " + message.toString());
    
            /// 底层给服务器发送send方法调用
            System.out.println("Result: " + proxy.send(message));
    
            // cleanup
            client.close();
        }
    }
    

    扩展:

    • Netty是什么?
      本质:JBoss做的一个Jar包
      目的:快速开发高性能、高可靠性的网络服务器和客户端程序
      优点:提供异步的、事件驱动的网络应用程序框架和工具
      通俗的说:Netty是一个NIO的框架,可以用于开发分布式的Java程序
    • 如果没有Netty?
      远古:java.net + java.io
      近代:java.nio
      其他:Mina,Grizzly

    相关文章

      网友评论

      • 我的星辰大海:我们用此rpc,每写一个服务,有新的类型结构,是不是每次都要自己生成proto文件。那还不如dubbo。你怎么看

      本文标题:AVRO

      本文链接:https://www.haomeiwen.com/subject/hgxhrttx.html