Serializing/Deserializing Data with Apache Avro and Reading Avro Files with Spark

Author: 就一个名字而已 | Published 2017-03-27 22:04
Introduction

This article shows how to serialize data with Apache Avro and how to turn the serialized data into a DataSet or DataFrame with Spark.

What is Apache Avro?

Apache Avro is a data serialization system. It provides:

1. Rich data structures
2. A compact, fast, binary data format
3. A container file for storing persistent data
4. Remote procedure call (RPC)
5. Simple integration with dynamic languages

Avro offers APIs for Java, Python, C, C++, C#, and other languages. Below we use a Java example to walk through serializing and deserializing data with Avro.


Avro website: http://avro.apache.org/
Avro version: 1.8.1
Download avro-tools-1.8.1.jar; this jar is mainly used to generate the corresponding Java classes from a schema definition file.

Define a schema file named CustomerAddress.avsc with the following content:

```json
{
  "namespace":"com.peach.arvo",
  "type": "record",
  "name": "CustomerAddress",
  "fields": [
    {"name":"ca_address_sk","type":"long"},
    {"name":"ca_address_id","type":"string"},
    {"name":"ca_street_number","type":"string"},
    {"name":"ca_street_name","type":"string"},
    {"name":"ca_street_type","type":"string"},
    {"name":"ca_suite_number","type":"string"},
    {"name":"ca_city","type":"string"},
    {"name":"ca_county","type":"string"},
    {"name":"ca_state","type":"string"},
    {"name":"ca_zip","type":"string"},
    {"name":"ca_country","type":"string"},
    {"name":"ca_gmt_offset","type":"double"},
    {"name":"ca_location_type","type":"string"}
  ]
}
```
* namespace: the package path used in the import statements of the generated Java file
* type: the complex type (record, enum, array, map, union, or fixed)
* name: the class name of the generated Java file
* fields: the fields and their types defined by the schema
    
With the schema file in place, generate the Java code using the avro-tools-1.8.1.jar downloaded above:

```
java -jar avro-tools-1.8.1.jar compile schema CustomerAddress.avsc .
```

The trailing "." means the Java code is generated into the current directory. On success the command prints:

![生成javacode](https://img.haomeiwen.com/i4861551/84cc805ecf1dfa48.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

A CustomerAddress.java file is generated under com/peach/arvo/ in the current directory; we will use it once the project is created.
#### Create a Java project with Maven; the directory layout is shown below

Add the Maven dependency:
    
    
        <dependency>
            <groupId>org.apache.avro</groupId>  
            <artifactId>avro</artifactId>  
            <version>1.8.1</version>  
        </dependency>  
    
    
![maven工程目录结构](https://img.haomeiwen.com/i4861551/45804c10c2d94871.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
The following code generates the Avro data file:
    

```java
package com.peach;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import com.peach.arvo.CustomerAddress;

/**
 * Generates the Avro data file.
 *
 * @author peach
 * 2017-03-02
 */
public class GenerateDataApp {

    private static String source_data_path = "F:\\data\\customer_address.dat";    // path of the source data file
    private static String dest_avro_data_path = "F:\\data\\customeraddress.avro"; // path of the generated Avro data file

    public static void main(String[] args) {
        try {
            DatumWriter<CustomerAddress> caDatumWriter = new SpecificDatumWriter<>(CustomerAddress.class);
            DataFileWriter<CustomerAddress> dataFileWriter = new DataFileWriter<>(caDatumWriter);
            dataFileWriter.create(new CustomerAddress().getSchema(), new File(dest_avro_data_path));
            loadData(dataFileWriter);
            dataFileWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Loads the source data file and appends each parsed record to the writer.
     * @param dataFileWriter
     */
    private static void loadData(DataFileWriter<CustomerAddress> dataFileWriter) {
        File file = new File(source_data_path);
        if (!file.isFile()) {
            return;
        }
        try {
            InputStreamReader isr = new InputStreamReader(new FileInputStream(file));
            BufferedReader reader = new BufferedReader(isr);
            String line;
            CustomerAddress address;
            while ((line = reader.readLine()) != null) {
                address = getCustomerAddress(line);
                if (address != null) {
                    dataFileWriter.append(address);
                }
            }
            reader.close();
            isr.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds a CustomerAddress object from one line of the source file.
     * @param line
     * @return the parsed record, or null if the line is malformed
     */
    private static CustomerAddress getCustomerAddress(String line) {
        CustomerAddress ca = null;
        try {
            if (line != null && !line.isEmpty()) {
                // StringTokenizer silently skips empty tokens between delimiters
                StringTokenizer token = new StringTokenizer(line, "|");
                if (token.countTokens() >= 13) {
                    ca = new CustomerAddress();
                    ca.setCaAddressSk(Long.parseLong(token.nextToken()));
                    ca.setCaAddressId(token.nextToken());
                    ca.setCaStreetNumber(token.nextToken());
                    ca.setCaStreetName(token.nextToken());
                    ca.setCaStreetType(token.nextToken());
                    ca.setCaSuiteNumber(token.nextToken());
                    ca.setCaCity(token.nextToken());
                    ca.setCaCounty(token.nextToken());
                    ca.setCaState(token.nextToken());
                    ca.setCaZip(token.nextToken());
                    ca.setCaCountry(token.nextToken());
                    ca.setCaGmtOffset(Double.parseDouble(token.nextToken()));
                    ca.setCaLocationType(token.nextToken());
                } else {
                    System.err.println(line);
                }
            }
        } catch (NumberFormatException e) {
            System.err.println(line);
        }
        return ca;
    }
}
```
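
The reverse direction is symmetric: a DataFileReader paired with a SpecificDatumReader deserializes the file back into generated CustomerAddress objects. This snippet is not part of the original project, just a minimal sketch that reads back the file written above:

```java
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

// Minimal sketch: read customeraddress.avro back with the specific (generated-class) API.
DatumReader<CustomerAddress> caDatumReader = new SpecificDatumReader<>(CustomerAddress.class);
DataFileReader<CustomerAddress> dataFileReader =
        new DataFileReader<>(new File(dest_avro_data_path), caDatumReader);
CustomerAddress address = null;
while (dataFileReader.hasNext()) {
    address = dataFileReader.next(address); // reuse the record object to limit allocation
    System.out.println(address);
}
dataFileReader.close();
```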

    
To generate the Avro file dynamically instead, wrap each row of data in a GenericRecord and write it out, as in the fragment below (a sketch of the SchemaUtil.convertRecord helper it calls follows after the fragment).
    

```java
private static void loadData(DataFileWriter<GenericRecord> dataFileWriter, Schema schema) {
    File file = new File(sourcePath);
    if (!file.isFile()) {
        logger.error("[peach], source data not found");
        return;
    }

    InputStreamReader inputStreamReader = null;
    BufferedReader bufferedReader = null;
    try {
        inputStreamReader = new InputStreamReader(new FileInputStream(file));
        bufferedReader = new BufferedReader(inputStreamReader);
        String line;
        GenericRecord genericRecord;
        while ((line = bufferedReader.readLine()) != null) {
            if (!line.isEmpty()) {
                String[] values = line.split("\\|");
                genericRecord = SchemaUtil.convertRecord(values, schema);
                if (genericRecord != null) {
                    dataFileWriter.append(genericRecord);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
            if (inputStreamReader != null) {
                inputStreamReader.close();
            }
        } catch (IOException e) {
            // ignore close failures
        }
    }
}
```
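
The body of SchemaUtil.convertRecord is not shown in the original fragment. The sketch below is one plausible implementation, assuming the delimited values arrive in schema field order; the helper name comes from the code above, the body here is illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SchemaUtil {
    /**
     * Builds a GenericRecord from one delimited line, assuming the values
     * appear in schema field order. Sketch only; error handling is minimal.
     */
    public static GenericRecord convertRecord(String[] values, Schema schema) {
        if (values.length < schema.getFields().size()) {
            return null; // malformed line
        }
        GenericRecord record = new GenericData.Record(schema);
        int i = 0;
        for (Schema.Field field : schema.getFields()) {
            String raw = values[i++];
            switch (field.schema().getType()) {
                case LONG:
                    record.put(field.name(), Long.parseLong(raw));
                    break;
                case DOUBLE:
                    record.put(field.name(), Double.parseDouble(raw));
                    break;
                default:
                    record.put(field.name(), raw);
            }
        }
        return record;
    }
}
```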
    
    
Once the Avro file has been generated, create a Scala project and read it with the Spark API. Add the Spark Maven dependencies:
    
        <dependency>  
            <groupId>com.peach</groupId>  
            <artifactId>generatedata</artifactId>  
            <version>1.0-SNAPSHOT</version>  
        </dependency>  
        <dependency>  
            <groupId>com.databricks</groupId>  
            <artifactId>spark-avro_2.10</artifactId>  
            <version>2.1.0</version>  
        </dependency>  
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <dependency>  
            <groupId>org.apache.spark</groupId>  
            <artifactId>spark-sql_2.10</artifactId>  
            <version>2.1.0</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.spark</groupId>  
            <artifactId>spark-core_2.10</artifactId>  
            <version>2.1.0</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.avro</groupId>  
            <artifactId>avro</artifactId>  
            <version>1.8.1</version>  
        </dependency>  
    
![maven scala 工程](https://img.haomeiwen.com/i4861551/4c3e36494e8b6f36.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
Now write the Scala code that reads the file; a fragment follows:
    

```scala
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.peach.arvo.CustomerAddress

case class CustomerAddressData(ca_address_sk: Long,
                               ca_address_id: String,
                               ca_street_number: String,
                               ca_street_name: String,
                               ca_street_type: String,
                               ca_suite_number: String,
                               ca_city: String,
                               ca_county: String,
                               ca_state: String,
                               ca_zip: String,
                               ca_country: String,
                               ca_gmt_offset: Double,
                               ca_location_type: String)
// org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

def main(args: Array[String]): Unit = {
  val path = "/Users/zoulihan/Desktop/customeraddress.avro"
  val conf = new SparkConf().setAppName("test").setMaster("local[2]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  // brings the implicit Encoders for case classes into scope;
  // createDataset and toDF below will not compile without them
  import sqlContext.implicits._

  val _rdd = sc.hadoopFile[AvroWrapper[CustomerAddress], NullWritable, AvroInputFormat[CustomerAddress]](path)
  val ddd = _rdd.map(line => CustomerAddressData(
    line._1.datum().getCaAddressSk,
    line._1.datum().getCaAddressId.toString,
    line._1.datum().getCaStreetNumber.toString,
    line._1.datum().getCaStreetName.toString,
    line._1.datum().getCaStreetType.toString,
    line._1.datum().getCaSuiteNumber.toString,
    line._1.datum().getCaCity.toString,
    line._1.datum().getCaCounty.toString,
    line._1.datum().getCaState.toString,
    line._1.datum().getCaZip.toString,
    line._1.datum().getCaCountry.toString,
    line._1.datum().getCaGmtOffset,
    line._1.datum().getCaLocationType.toString
  ))
  val ds = sqlContext.createDataset(ddd)
  ds.show()
  val df = ds.toDF()
  df.createTempView("customer_address")

  // sqlContext.sql("select count(*) from customer_address").show()
  sqlContext.sql("select * from customer_address limit 10").show()
}
```
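
As an aside, the spark-avro dependency declared earlier is not actually exercised by the hadoopFile-based code above; it offers a much shorter route to a DataFrame. A minimal sketch, assuming the same sqlContext and path:

```scala
// Sketch: spark-avro loads the file straight into a DataFrame,
// deriving the Spark schema from the Avro schema embedded in the file.
val df2 = sqlContext.read
  .format("com.databricks.spark.avro")
  .load(path)
df2.show(10)
```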

    
Spark output:

![Paste_Image.png](https://img.haomeiwen.com/i4861551/b539947108706374.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
Source code: https://github.com/javaxsky/avrotospark

Possible extensions:

1. Load the Avro data files into Hive (a sketch follows below)
2. Load the aggregated results into Hive via Spark SQL
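
For the first extension, one common route (sketched here, not taken from the original project) is to put the Avro files on HDFS and declare an external Hive table over them; Hive 0.14+ understands STORED AS AVRO, and the HDFS paths below are placeholders:

```scala
import org.apache.spark.sql.hive.HiveContext

// Sketch: expose the Avro files to Hive as an external table.
// Assumes the data files and the .avsc schema were copied to HDFS first.
val hiveContext = new HiveContext(sc)
hiveContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS customer_address_avro
    |STORED AS AVRO
    |LOCATION 'hdfs:///data/customeraddress'
    |TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/CustomerAddress.avsc')
  """.stripMargin)
```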
