131. Avro Data Format and Its Use in Spark

Author: 大勇任卷舒 | Published 2022-10-27 16:27
    • Apache Avro is a data serialization system with API bindings for Java, Python, C, C++, C#, and other languages. Its main features:
      • Rich data structures
      • A fast, compressible binary data format
      • A container file format for persistent data
      • Remote procedure call (RPC)
      • Simple integration with dynamic languages
    The sections below walk through serializing and deserializing Avro data from Java, then reading the result in Spark.

    131.1 Generating Avro Data

    • Download
    Avro website: http://avro.apache.org/
     Avro version: 1.8.1
     Download avro-tools-1.8.1.jar; this jar compiles a schema definition file (.avsc) into the corresponding Java class.
    
    • Define a schema file (CustomerAddress.avsc)
    {
      "namespace": "com.peach.arvo",
      "type": "record",
      "name": "CustomerAddress",
      "fields": [
        {"name":"ca_address_sk","type":"long"},
        {"name":"ca_address_id","type":"string"},
        {"name":"ca_street_number","type":"string"},
        {"name":"ca_street_name","type":"string"},
        {"name":"ca_street_type","type":"string"},
        {"name":"ca_suite_number","type":"string"},
        {"name":"ca_city","type":"string"},
        {"name":"ca_county","type":"string"},
        {"name":"ca_state","type":"string"},
        {"name":"ca_zip","type":"string"},
        {"name":"ca_country","type":"string"},
        {"name":"ca_gmt_offset","type":"double"},
        {"name":"ca_location_type","type":"string"}
      ]
    }
    
    • Schema notes:
      • namespace: the package used as the import path in the generated Java code
      • type: one of Avro's complex types (record, enum, array, map, union, fixed)
      • name: the class name of the generated Java file
      • fields: the fields and their types defined in the schema
      (A quick programmatic sanity check of the schema is sketched below.)
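    Before running the code generator, the .avsc file can be parsed programmatically to catch malformed JSON or unknown types early. A minimal sketch (the SchemaCheck class name and the relative file path are assumptions, not part of the original project):

    import java.io.File;
    import org.apache.avro.Schema;

    // Sketch: parse CustomerAddress.avsc and pretty-print it; a malformed
    // schema fails here with a SchemaParseException before any codegen runs.
    public class SchemaCheck {
        public static void main(String[] args) throws Exception {
            Schema schema = new Schema.Parser().parse(new File("CustomerAddress.avsc"));
            System.out.println(schema.toString(true));
        }
    }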
    • Generate the Java code file
    # Use avro-tools-1.8.1.jar from step 1 to generate the Java code
    java -jar avro-tools-1.8.1.jar compile schema CustomerAddress.avsc .
    # The trailing "." puts the generated code in the current directory; on success,
    # CustomerAddress.java appears under com/peach/arvo/ (per the schema's namespace and name)
    

    131.2 Generating an Avro File from Java

    • Create a Java project
    # Add the following dependency to pom.xml
    <dependency>
         <groupId>org.apache.avro</groupId> 
         <artifactId>avro</artifactId> 
         <version>1.8.1</version>   
    </dependency>
    
    • The GenerateDataApp class
    package com.peach;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.util.StringTokenizer;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.specific.SpecificDatumWriter;
    import com.peach.arvo.CustomerAddress;

    /**
     * @author peach
     * Generates an Avro data file from a "|"-delimited text file.
     */
    public class GenerateDataApp {

        private static final String source_data_path = "F:\\data\\customer_address.dat"; // source data file path
        private static final String dest_avro_data_path = "F:\\data\\customeraddress.avro"; // generated Avro file path

        public static void main(String[] args) {
            DatumWriter<CustomerAddress> caDatumWriter = new SpecificDatumWriter<>(CustomerAddress.class);
            try (DataFileWriter<CustomerAddress> dataFileWriter = new DataFileWriter<>(caDatumWriter)) {
                dataFileWriter.create(new CustomerAddress().getSchema(), new File(dest_avro_data_path));
                loadData(dataFileWriter);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Reads the source data file line by line and appends each parsed record.
         * @param dataFileWriter the open Avro file writer
         */
        private static void loadData(DataFileWriter<CustomerAddress> dataFileWriter) {
            File file = new File(source_data_path);
            if (!file.isFile()) {
                return;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(file)))) {
                String line;
                CustomerAddress address;
                while ((line = reader.readLine()) != null) {
                    address = getCustomerAddress(line);
                    if (address != null) {
                        dataFileWriter.append(address);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Parses one "|"-delimited line into a CustomerAddress object.
         * @param line one line of source data
         * @return the parsed record, or null if the line is malformed
         */
        private static CustomerAddress getCustomerAddress(String line) {
            CustomerAddress ca = null;
            try {
                if (line != null && !line.isEmpty()) {
                    // Note: StringTokenizer silently skips empty tokens, so rows
                    // with empty fields fail the countTokens() check below
                    StringTokenizer token = new StringTokenizer(line, "|");
                    if (token.countTokens() >= 13) {
                        ca = new CustomerAddress();
                        ca.setCaAddressSk(Long.parseLong(token.nextToken()));
                        ca.setCaAddressId(token.nextToken());
                        ca.setCaStreetNumber(token.nextToken());
                        ca.setCaStreetName(token.nextToken());
                        ca.setCaStreetType(token.nextToken());
                        ca.setCaSuiteNumber(token.nextToken());
                        ca.setCaCity(token.nextToken());
                        ca.setCaCounty(token.nextToken());
                        ca.setCaState(token.nextToken());
                        ca.setCaZip(token.nextToken());
                        ca.setCaCountry(token.nextToken());
                        ca.setCaGmtOffset(Double.parseDouble(token.nextToken()));
                        ca.setCaLocationType(token.nextToken());
                    } else {
                        System.err.println(line);
                    }
                }
            } catch (NumberFormatException e) {
                System.err.println(line);
            }
            return ca;
        }
    }
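
    For the deserialization half promised in the introduction, the generated file can be read back with the matching specific reader. A minimal sketch (the ReadDataApp class name is an assumption; the path reuses dest_avro_data_path from above):

    import java.io.File;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.specific.SpecificDatumReader;
    import com.peach.arvo.CustomerAddress;

    // Sketch: iterate over the Avro container file to verify it round-trips.
    public class ReadDataApp {
        public static void main(String[] args) throws Exception {
            DatumReader<CustomerAddress> datumReader = new SpecificDatumReader<>(CustomerAddress.class);
            try (DataFileReader<CustomerAddress> fileReader =
                         new DataFileReader<>(new File("F:\\data\\customeraddress.avro"), datumReader)) {
                CustomerAddress ca = null;
                while (fileReader.hasNext()) {
                    ca = fileReader.next(ca); // reuse one object to avoid per-record allocation
                    System.out.println(ca);
                }
            }
        }
    }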
    
    # Dynamic variant: instead of the generated class, each row is wrapped in a
    # GenericRecord and written against a Schema parsed at runtime (sourcePath,
    # logger and SchemaUtil come from the surrounding project; convertRecord is
    # sketched below)
    private static void loadData(DataFileWriter<GenericRecord> dataFileWriter, Schema schema) {
        File file = new File(sourcePath);
        if (!file.isFile()) {
            logger.error("[peach], source data not found");
            return;
        }

        InputStreamReader inputStreamReader = null;
        BufferedReader bufferedReader = null;
        try {
            inputStreamReader = new InputStreamReader(new FileInputStream(file));
            bufferedReader = new BufferedReader(inputStreamReader);
            String line;
            GenericRecord genericRecord;
            while ((line = bufferedReader.readLine()) != null) {
                if (!line.isEmpty()) {
                    String[] values = line.split("\\|");
                    genericRecord = SchemaUtil.convertRecord(values, schema);
                    if (genericRecord != null) {
                        dataFileWriter.append(genericRecord);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (bufferedReader != null) {
                    bufferedReader.close();
                }
                if (inputStreamReader != null) {
                    inputStreamReader.close();
                }
            } catch (IOException e) {
                // ignore close failures
            }
        }
    }
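
    The SchemaUtil.convertRecord helper used above is not shown in the original. A hypothetical reconstruction, assuming the split values arrive in schema field order and only the long, double, and string types used by this schema:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class SchemaUtil {
        // Hypothetical reconstruction: map the split values onto the schema's
        // fields in declaration order, coercing by each field's declared type.
        public static GenericRecord convertRecord(String[] values, Schema schema) {
            if (values == null || values.length < schema.getFields().size()) {
                return null; // malformed row: fewer columns than schema fields
            }
            GenericRecord record = new GenericData.Record(schema);
            int i = 0;
            for (Schema.Field field : schema.getFields()) {
                String value = values[i++];
                switch (field.schema().getType()) {
                    case LONG:
                        record.put(field.name(), Long.parseLong(value));
                        break;
                    case DOUBLE:
                        record.put(field.name(), Double.parseDouble(value));
                        break;
                    default: // the remaining fields in this schema are all strings
                        record.put(field.name(), value);
                }
            }
            return record;
        }
    }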
    

    131.3 Using Avro in Spark

    • Create a Scala project
    # Add the following dependencies to pom.xml
    <dependency> 
         <groupId>com.peach</groupId> 
         <artifactId>generatedata</artifactId> 
         <version>1.0-SNAPSHOT</version> 
    </dependency> 
    <dependency> 
         <groupId>com.databricks</groupId> 
        <artifactId>spark-avro_2.10</artifactId> 
        <version>2.1.0</version> 
    </dependency> 
    <!--  https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
    <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-sql_2.10</artifactId> 
        <version>2.1.0</version> 
    </dependency> 
    <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-core_2.10</artifactId> 
        <version>2.1.0</version> 
    </dependency> 
    <dependency> 
         <groupId>org.apache.avro</groupId> 
        <artifactId>avro</artifactId> 
        <version>1.8.1</version> 
    </dependency>
    
    • Scala code (imports and a wrapper object added so the fragment compiles; the object name is arbitrary)
    import com.peach.arvo.CustomerAddress
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
    import org.apache.hadoop.io.NullWritable
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    case class CustomerAddressData(ca_address_sk: Long,
                                   ca_address_id: String,
                                   ca_street_number: String,
                                   ca_street_name: String,
                                   ca_street_type: String,
                                   ca_suite_number: String,
                                   ca_city: String,
                                   ca_county: String,
                                   ca_state: String,
                                   ca_zip: String,
                                   ca_country: String,
                                   ca_gmt_offset: Double,
                                   ca_location_type: String)
    // org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
    // (only needed if the case class is defined inside another class or object)

    object AvroSparkApp {
      def main(args: Array[String]): Unit = {
        val path = "/Users/zoulihan/Desktop/customeraddress.avro"
        val conf = new SparkConf().setAppName("test").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        // brings the implicit Encoders for case classes into scope;
        // createDataset below will not compile without this import
        import sqlContext.implicits._

        val _rdd = sc.hadoopFile[AvroWrapper[CustomerAddress], NullWritable, AvroInputFormat[CustomerAddress]](path)
        val ddd = _rdd.map(line => CustomerAddressData(
          line._1.datum().getCaAddressSk,
          line._1.datum().getCaAddressId.toString,
          line._1.datum().getCaStreetNumber.toString,
          line._1.datum().getCaStreetName.toString,
          line._1.datum().getCaStreetType.toString,
          line._1.datum().getCaSuiteNumber.toString,
          line._1.datum().getCaCity.toString,
          line._1.datum().getCaCounty.toString,
          line._1.datum().getCaState.toString,
          line._1.datum().getCaZip.toString,
          line._1.datum().getCaCountry.toString,
          line._1.datum().getCaGmtOffset,
          line._1.datum().getCaLocationType.toString
        ))
        val ds = sqlContext.createDataset(ddd)
        ds.show()
        val df = ds.toDF()
        df.createTempView("customer_address")
        // sqlContext.sql("select count(*) from customer_address").show()
        sqlContext.sql("select * from customer_address limit 10").show()
      }
    }
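
    With the spark-avro dependency already on the classpath, the same file can also be loaded through the DataFrame reader instead of the low-level hadoopFile API. A minimal sketch using Spark's Java API, consistent with the Java examples above (the AvroDataFrameApp class name and local master are assumptions; "com.databricks.spark.avro" is the format name the library registers):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    public class AvroDataFrameApp {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("avro-df").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);

            // the Avro container file carries its own schema, so no case class
            // or manual column mapping is needed here
            Dataset<Row> df = sqlContext.read()
                    .format("com.databricks.spark.avro")
                    .load("/Users/zoulihan/Desktop/customeraddress.avro");
            df.show(10);
            sc.close();
        }
    }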
    
