美文网首页
MLSQL 内置Delta数据湖以及Compaction功能介绍

MLSQL 内置Delta数据湖以及Compaction功能介绍

作者: 祝威廉 | 来源:发表于2019-06-06 13:16 被阅读0次

前言

之前写过一篇文章 Delta的真正用处和价值,你可知道,该项目开源的那天我就集到MLSQL了。不过当时只是尝鲜性质,主要原因是因为我一直觉得delta缺了Compaction功能。很多公司其实都有小文件的困扰,而Delta这个问题会更严重。不过近期Delta团队应该就会发布新版本了,届时有可能相关的功能都会补上。不过MLSQL现在也自己实现了一个Compaction的功能,并且对delta做了一定的集成和增强。

写入delta数据

下面的例子都会使用这个数据:

set data='''
{"key":"a","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"a","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"b","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

流写入样例:

-- the stream name, should be uniq.
set streamName="streamExample";

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;


select *  from newkafkatable1 
as table21;

-- output the the result to console.
save append table21  
as rate.`/tmp/delta/rate-1-table`
options mode="Append"
and duration="10"
and checkpointLocation="/tmp/rate-1" partitionBy key;

这里注意一下是流里面delta叫rate。

批写入示例:

load jsonStr.`data` as datasource;

save append datasource  
as delta.`/tmp/delta/rate-2-table`
partitionBy key;

Delta 工具集

!delta history /tmp/delta/rate-2-table;
image.png

Compaction前置条件

Compaction 实现参看这里Github。只有一个文件,用户兼容0.1.0版本,用户可以直接拷贝的自己项目里。

使用Compaction的前提有如下几个:

  1. delta表至少发生了一次checkpoint.默认是有十次提交就会产生一个新的checkpoint.
  2. 批/流都需要为append 模式往表里写数据

Compaction使用范例

!delta compact /tmp/delta/rate-2-table 8 1;

也可以使用

!delta help; 

查看这些命令怎么使用。

前面表示对第八个版本之前的所有数据都进行合并,每个目录(分区)都合并成一个文件。

我们看下合并前每个分区下面文件情况:

image.png

合并后文件情况:

image.png

一个Compaction也是一次提交:

image.png

我们删除了16个文件,生成了两个新文件。另外在compaction的时候,并不影响读和写。所以是非常有用的。

总结

Delta解决了几个痛点问题:

  1. 数据版本问题
  2. 并发读写问题(多个进程或者单个进程多线程)
  3. 流批共享表

这几个问题对于数仓是经常会遇到的。

而对于机器学习而言,譬如在MLSQL里,用户是可以将自己的训练参数保存成版本的,但是他们使用的数据目前只能存多份(实际使用时用户需要手动修改存储目录,每次训练的时候)。而使用了delta之后,意味着机器学习的完整实验版本都可以被追踪,什么样的数据通过什么算法,以及配合什么参数得到了什么样的结果,都可以得到保留。

相关文章

网友评论

      本文标题:MLSQL 内置Delta数据湖以及Compaction功能介绍

      本文链接:https://www.haomeiwen.com/subject/xoiyxctx.html