美文网首页
Java操作parquet

Java操作parquet

作者: 明翼 | 来源:发表于2018-03-19 16:04 被阅读2540次

    这段时间因为项目,对parquet做了一系列研究,从写入跟踪到合并及spark使用等等场景。
    选择parquet来对流数据进行序列化,用于后续离线分析的理由有以下几点:
    1、流数据一般格式比较杂乱,可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
    2、网络流量数据量非常的庞大,即使过滤部分,还是非常吓人,parquet压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。
    3、后续分析,只读取需要的列,支持向量运算,能够获取更好的扫描性能

    写入parquet的过程,代码部分还是比较简单的

    1、创建schema

    public void builderSchema(List<Map<String, String>> list) throws IllegalArgumentException, IOException {  
    MessageTypeBuilder builder = Types.buildMessage();  
     list.get(0).forEach((key, value) -> {  
    builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(key);  
      });  
     File file = new File("./conf/parquet.schema");  
      if (file.exists()) {  
     FileUtils.readLines(file).forEach(str -> {  
     builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(str);  
     });  
      }  
      MessageType schema = builder.named("aus");  
      hdfsConfig.set("parquet.example.schema", schema.toString());  
      factory = new SimpleGroupFactory(schema);  
     }  
    
    我这个是动态根据传入的数据,自动创建schema的,同时也支持手动配置
    

    2、创建writer

     public void refreshWriter(String path) throws IllegalArgumentException, IOException {  
    closeParquetWriter();  
    if (factory != null) {  
     writer = new ParquetWriter<>(new Path(path + "/" + new Date().getTime() + ".parquet"), hdfsConfig,  
    new GroupWriteSupport());  
     }  
     }  
    
    ps:声明ParquetWriter有非常多的构造,我这个选择的参数最少的。原因后面分析会讲。
    

    3、写入数据

    public void writeParquet(List<Map<String, String>> list) throws IOException {  
     for (Map<String, String> map : list) {  
    Group group = factory.newGroup();  
    map.forEach((key, value) -> {  
    group.append(key, value);  
     });  
     writer.write(group);  
      }  
    }  
    
    以上的代码便可以实现把数据序列化成parquet格式文件。
    
    在实际开发过程中,遇到过好几个坑,首先开始我选择的是另外一个构造器,详情如下,此构造器可以手动指定block,pagesize等大小。
    
     public ParquetWriter(  
     Path file,  
    ParquetFileWriter.Mode mode,  
     WriteSupport<T> writeSupport,  
     CompressionCodecName compressionCodecName,  
     int blockSize,  
     int pageSize,  
     int dictionaryPageSize,  
     boolean enableDictionary,  
    boolean validating,  
    WriterVersion writerVersion,  
      Configuration conf) throws IOException {  
      this(file, mode, writeSupport, compressionCodecName, blockSize,  
    validating, conf, MAX_PADDING_SIZE_DEFAULT,  
     ParquetProperties.builder()  
     .withPageSize(pageSize)  
      .withDictionaryPageSize(dictionaryPageSize)  
      .withDictionaryEncoding(enableDictionary)  
      .withWriterVersion(writerVersion)  
     .build());  
      }  
    
    在调试中,发现这几个参数死活都不生效,后来经过源码跟踪调试,发现,在parquet的底层他实现这样的功能,因为我HDFS的block大小为128M,而我代码中设置的为4K,所以死活都不生效,看着数据不停地写入,但是hdfs上文件大小没有丝毫变化,原来都是在内存中。
    
    1. // use the default block size, unless row group size is larger
      2 long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);

    2. 即获取了DFS的block块大小,然后跟设置的值比较取最大值。

      有一种办法可以强制性提交到服务器,那就是调用writer的close方法。但问题是,close当前这个文件之后,下次就不能再打开续写的,parquet只有两种模式,要么创建,要么覆盖。所以对于流数据场景,想要比较好的实时性,那就会创建非常多的小文件,这对hdfs的压力是非常大的。所以,在项目中,我选择了定时刷新writer,意思就是每隔一个小时,或者每隔一天来创建一个writer,这样可以保证一个文件不至于太小,且可以及时关闭掉,好让spark读取。(ps:未close掉的parquet文件,spark是没法加载的,会提示不是parquet格式的文件)

      还有一种办法可以合并parquet小文件,在spark研究中发现,有这样一个特性,coalesce函数可以指定block个数来输出,并且可以加载父目录下全部的parquet文件,所以可实现将多个parquet文件合并成一个文件。


      image.png

      可以看到之前目录下有4001个文件:


      image.png

    处理之后,可以发现成功合并:


    image.png

    至于spark读取parquet文件,进行分析,就非常简单了,用spark-shell做一个简单的演示:


    image.png

    转个同事写的:http://blog.csdn.net/cyony/article/details/79608261

    相关文章

      网友评论

          本文标题:Java操作parquet

          本文链接:https://www.haomeiwen.com/subject/rfdncxtx.html