Flink 1.10's SQL already ships with a FileSystem SQL connector. The syntax is as follows:
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'filesystem', -- required: specify the connector type
'connector.path' = 'file:///path/to/whatever', -- required: path to a file or directory
'format.type' = '...', -- required: the file system connector requires a format to be specified,
... -- currently only 'csv' format is supported.
-- Please refer to old CSV format part of Table Formats section for more details.
)
However, this official approach has quite a few limitations:
- Data partitioning cannot be customized
- There is no option to overwrite existing files (for the same path, a second insert requires deleting the previously written files first)
- There is no file rolling policy
BucketingSink
BucketingSink is the FileSystem connector Flink provides for the DataStream API. It works with every file system the Hadoop FileSystem abstraction supports and offers several file rolling policies, but it has no SQL version. Below I build a SQL-flavored connector on top of BucketingSink; straight to the code.
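For reference, this is roughly what writing to HDFS with BucketingSink looks like at the DataStream level; the SQL connector built below is essentially a wrapper around these calls. A minimal sketch: the path is a placeholder and stream stands for any DataStream<String> in a streaming job.
BucketingSink<String> sink = new BucketingSink<>("hdfs://namenode/flink/out");
sink.setBucketer(new DateTimeBucketer<>("yyyyMMdd")); // one directory per day
sink.setBatchSize(200L * 1024L * 1024L); // roll part files at ~200 MB
sink.setBatchRolloverInterval(5 * 60 * 1000L); // ...or every 5 minutes, whichever comes first
sink.setInactiveBucketCheckInterval(60 * 1000L); // check for idle buckets every minute
sink.setInactiveBucketThreshold(5 * 60 * 1000L); // close buckets that have been idle for 5 minutes
stream.addSink(sink);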
TableFactory
Anyone who has read the Flink source code will know Flink's TableFactory mechanism. It is an SPI that lets Flink be extended with external data sources. When implementing a custom TableSource or TableSink, you have to add the SPI service file META-INF/services/org.apache.flink.table.factories.TableFactory to the classpath; its content is the fully qualified name of the TableFactory implementation, in my case com.szc.streaming.connectors.hdfs26.StreamingHDFS26TableSinkFactory. After Flink parses the SQL, it turns the table's schema and WITH properties into a Map, scans every TableFactory in the jars on the classpath, and matches them one by one against the key-value pairs of the current WITH properties. If you are interested, have a look at the relevant source: module flink-table-common, class org.apache.flink.table.factories.TableFactoryService.
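Concretely, the registration is nothing more than a text file on the classpath named META-INF/services/org.apache.flink.table.factories.TableFactory (under src/main/resources in a typical Maven layout) with this single line of content:
com.szc.streaming.connectors.hdfs26.StreamingHDFS26TableSinkFactory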
Below is my own TableFactory implementation.
First, the code structure (some company-specific information is involved, so I have masked the package names and a few company-related class names):
- StreamingHDFS26TableSinkFactory.java: the TableFactory implementation, responsible for validating the connector properties and instantiating the sink
public class StreamingHDFS26TableSinkFactory implements StreamTableSinkFactory<Row> {
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put(CONNECTOR_TYPE, CONNECTOR_HDFS_TYPE_VALUE);
context.put(CONNECTOR_PROPERTY_VERSION, "1");
return context;
}
@Override
public List<String> supportedProperties() {
List<String> properties = new ArrayList<>();
// update mode
properties.add(UPDATE_MODE);
//hdfs
properties.add(CONNECTOR_PATH);
properties.add(ROLLING_POLICY_PARTSIZE);
properties.add(ROLLING_POLICY_ROLL_OVER_INTERVAL);
properties.add(ROLLING_POLICY_INACTIVITY_INTERVAL);
properties.add(ROLLING_POLICY_INACTIVITY_THRESHOLD);
properties.add(BUCKET_ASSIGNER_FIELD);
properties.add(BUCKET_ASSIGNER_TYPE);
properties.add(BUCKET_ASSIGNER_FIELD_FORMATTER);
properties.add(BUCKET_ASSIGNER_OUTPUT_FORMATTER);
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
properties.add(SCHEMA + ".#." + SCHEMA_FROM);
return properties;
}
/**
* Creates the TableSink instance.
* @param properties the table properties derived from the DDL
* @return the configured StreamTableSink
*/
@Override
public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
// a Flink SQL table is ultimately handed to the factory as a Map of key-value properties
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
// builder for the FileSystemSink
FileSystemBuilder builder = FileSystemSink.builder();
//schema
final TableSchema schema = TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA));
builder.setSchema(schema);
// bucket assigner: check bucket.assigner.field to see whether a custom partitioning field is configured
final String assignerField = descriptorProperties.getOptionalString(BUCKET_ASSIGNER_FIELD).orElse(null);
if(StringUtils.isNotEmpty(assignerField)) {
final String assignerFieldType = descriptorProperties.getString(BUCKET_ASSIGNER_TYPE);
builder.setAssignerField(assignerField);
builder.setAssignerFieldType(assignerFieldType);
if(BUCKET_ASSIGNER_FIELD_TYPE_DATE_VALUE.equalsIgnoreCase(assignerFieldType)) {
builder.setInputFormatter(descriptorProperties.getString(BUCKET_ASSIGNER_FIELD_FORMATTER));
builder.setOutputFormatter(descriptorProperties.getString(BUCKET_ASSIGNER_OUTPUT_FORMATTER));
}
}
// validate and apply the rolling policy configuration
if(descriptorProperties.containsKey(ROLLING_POLICY_PARTSIZE)) {
builder.setMaxPartSize(descriptorProperties.getMemorySize(ROLLING_POLICY_PARTSIZE).getBytes());
}
if(descriptorProperties.containsKey(ROLLING_POLICY_ROLL_OVER_INTERVAL)) {
builder.setRollOverInterval(descriptorProperties.getDuration(ROLLING_POLICY_ROLL_OVER_INTERVAL).toMillis());
}
if(descriptorProperties.containsKey(ROLLING_POLICY_INACTIVITY_INTERVAL)) {
builder.setInactivityInterval(descriptorProperties.getDuration(ROLLING_POLICY_INACTIVITY_INTERVAL).toMillis());
}
if(descriptorProperties.containsKey(ROLLING_POLICY_INACTIVITY_THRESHOLD)) {
builder.setInactivityThreshold(descriptorProperties.getDuration(ROLLING_POLICY_INACTIVITY_THRESHOLD).toMillis());
}
builder.setOutputPath(descriptorProperties.getString(CONNECTOR_PATH));
return builder.build();
}
private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putProperties(properties);
// stream environment; this sink supports neither source timestamps nor watermarks
new SchemaValidator(true, false, false).validate(descriptorProperties);
new HDFSValidator().validate(descriptorProperties);
return descriptorProperties;
}
}
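To make the property matching and parsing above more tangible: by the time createStreamTableSink runs, the DDL shown in the usage section at the end has been flattened into a property map roughly like the following (the schema key layout is my assumption of how Flink 1.10 serializes catalog tables and is shown only for illustration):
'connector.type' -> 'hdfs26'
'connector.path' -> 'hdfs://xxx/flink/xxxx'
'update-mode' -> 'append'
'bucket.assigner.field' -> 'birth_day'
'rolling.policy.part.size' -> '200m'
'schema.0.name' -> 'id', 'schema.0.data-type' -> 'BIGINT'
'schema.1.name' -> 'birth_day', 'schema.1.data-type' -> 'BIGINT'
...
getMemorySize then resolves '200m' to 209715200 bytes and getDuration resolves '5min' to 300000 ms before they are handed to the builder.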
- HDFSValidator.java: validates the WITH properties
public class HDFSValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_HDFS_TYPE_VALUE = "hdfs26";
public static final String CONNECTOR_PATH = "connector.path";
/**
* Rolling policy properties.
*/
public static final String ROLLING_POLICY_PARTSIZE = "rolling.policy.part.size";
// maximum time an open bucket may stay alive; after this interval the bucket is closed even if part.size has not been reached
public static final String ROLLING_POLICY_ROLL_OVER_INTERVAL = "rolling.policy.over.interval";
// how often to check for inactive buckets
public static final String ROLLING_POLICY_INACTIVITY_INTERVAL = "rolling.policy.inactivity.interval";
// how long a bucket may stay inactive before it is closed
public static final String ROLLING_POLICY_INACTIVITY_THRESHOLD = "rolling.policy.inactivity.threshold";
/**
* Designates one field of the data as the bucket assigner, i.e. the partition directory the data is written to.
*/
public static final String BUCKET_ASSIGNER_FIELD = "bucket.assigner.field";
// 'date' means partition by time; 'string' means partition by the raw value of a source field
public static final String BUCKET_ASSIGNER_TYPE = "bucket.assigner.type";
public static final String BUCKET_ASSIGNER_FIELD_TYPE_DATE_VALUE = "date";
public static final String BUCKET_ASSIGNER_FIELD_TYPE_STRING_VALUE = "string";
// input time format; if the source field is already a long (epoch millis), write 'bigint', otherwise use a regular date pattern
public static final String BUCKET_ASSIGNER_FIELD_FORMATTER = "bucket.assigner.field.formatter";
public static final String BUCKET_ASSIGNER_OUTPUT_FORMATTER = "bucket.assigner.output.formatter";
@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateEnumValues(UPDATE_MODE, true, Collections.singletonList(UPDATE_MODE_VALUE_APPEND));
properties.validateValue(CONNECTOR_TYPE, CONNECTOR_HDFS_TYPE_VALUE, false);
properties.validateString(CONNECTOR_PATH, false, 1);
//
validateBucket(properties);
}
private void validateBucket(DescriptorProperties properties) {
properties.validateEnumValues(UPDATE_MODE, true, Collections.singletonList(UPDATE_MODE_VALUE_APPEND));
final Map<String, Consumer<String>> assignerFieldTypeValidation = new HashMap<>();
assignerFieldTypeValidation.put(BUCKET_ASSIGNER_FIELD_TYPE_DATE_VALUE, noValidation());
assignerFieldTypeValidation.put(BUCKET_ASSIGNER_FIELD_TYPE_STRING_VALUE, noValidation());
properties.validateEnum(BUCKET_ASSIGNER_TYPE, true, assignerFieldTypeValidation);
properties.validateString(ROLLING_POLICY_PARTSIZE, true, 2);
properties.validateString(ROLLING_POLICY_ROLL_OVER_INTERVAL, true, 2);
properties.validateString(ROLLING_POLICY_INACTIVITY_INTERVAL, true, 2);
properties.validateString(CONNECTOR_PATH, false, 1);
}
}
- FileSystemSink.java: the main sink logic class; defines how data is emitted to the external system
public class FileSystemSink implements AppendStreamTableSink<Row> {
public static final Long DEFAULT_BUCKET_CHECK_INTERVAL = 60 * 1000L;
private TableSchema schema;
// partition field
private String assignerField;
// whether the partition field is a time type ('date') or a plain string ('string')
private String assignerFieldType;
// if the assigner type is 'date', the input formatter must be specified
private String inputFormatter;
// if the assigner type is 'date', the format used for the output directory
private String outputFormatter;
// output base directory
private String outputPath;
// rolling policy: maximum part file size, in bytes
private Long maxPartSize;
// inactivity check interval, in milliseconds
private Long inactivityInterval;
// inactivity threshold, in milliseconds
private Long inactivityThreshold;
// roll-over interval, in milliseconds
private Long rollOverInterval;
private FileSystemSink(
TableSchema schema,
String assignerField,
String assignerFieldType,
String inputFormatter,
String outputFormatter,
String outputPath,
Long maxPartSize,
Long inactivityInterval,
Long rollOverInterval,
Long inactivityThreshold) {
this.schema = schema;
this.assignerField = assignerField;
this.assignerFieldType = assignerFieldType;
this.inputFormatter = inputFormatter;
this.outputFormatter = outputFormatter;
this.outputPath = outputPath;
this.maxPartSize = maxPartSize;
this.inactivityInterval = inactivityInterval;
this.rollOverInterval = rollOverInterval;
this.inactivityThreshold = inactivityThreshold;
}
@Override
public void emitDataStream(DataStream<Row> dataStream) {
// deprecated in Flink 1.10; consumeDataStream below is used instead
}
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
BucketingSink<HDFSDomain> sink = new BucketingSink<>(outputPath);
// rolling policy
if(maxPartSize != null) {
sink.setBatchSize(maxPartSize);
}
if(inactivityInterval != null) {
sink.setInactiveBucketCheckInterval(inactivityInterval);
}
if(inactivityThreshold != null) {
sink.setInactiveBucketThreshold(inactivityThreshold);
}
if(rollOverInterval != null) {
sink.setBatchRolloverInterval(rollOverInterval);
}
Bucketer<HDFSDomain> bucketer;
// -1 signals to the map function that no assigner field is configured
int fieldIndex = -1;
// bucket assigner: if no field is configured, fall back to the time-based DateTimeBucketer
if(StringUtils.isEmpty(assignerField)) {
bucketer = StringUtils.isEmpty(outputFormatter) ? new DateTimeBucketer<>() : new DateTimeBucketer<>(outputFormatter);
} else {
bucketer = new CustomBucketer();
String[] fieldNames = schema.getFieldNames();
for(int i = 0; i < fieldNames.length; i++) {
if(assignerField.equals(fieldNames[i])) {
fieldIndex = i;
break;
}
}
}
sink.setBucketer(bucketer);
// convert each Row into an HDFSDomain
return dataStream
.map(new HDFSStringMapFunction(fieldIndex, assignerFieldType, inputFormatter, outputFormatter))
.addSink(sink)
.setParallelism(dataStream.getParallelism())
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
}
@Override
public DataType getConsumedDataType() {
return schema.toRowDataType();
}
@Override
public TableSchema getTableSchema() {
return schema;
}
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
// the schema is fixed by the TableFactory, so this method is not used here
return null;
}
public static FileSystemBuilder builder() {
return new FileSystemBuilder();
}
public static final class FileSystemBuilder {
private TableSchema schema;
private String assignerField;
private String assignerFieldType;
private String inputFormatter;
private String outputFormatter;
private String outputPath;
private Long maxPartSize;
private Long inactivityInterval;
private Long inactivityThreshold;
private Long rollOverInterval;
private FileSystemBuilder() {}
public FileSystemBuilder setSchema(TableSchema schema) {
this.schema = schema;
return this;
}
public FileSystemBuilder setAssignerField(String assignerField) {
this.assignerField = assignerField;
return this;
}
public FileSystemBuilder setAssignerFieldType(String assignerFieldType) {
this.assignerFieldType = assignerFieldType;
return this;
}
public FileSystemBuilder setInputFormatter(String inputFormatter) {
this.inputFormatter = inputFormatter;
return this;
}
public FileSystemBuilder setOutputFormatter(String outputFormatter) {
this.outputFormatter = outputFormatter;
return this;
}
public FileSystemBuilder setOutputPath(String outputPath) {
this.outputPath = outputPath;
return this;
}
public FileSystemBuilder setMaxPartSize(Long maxPartSize) {
this.maxPartSize = maxPartSize;
return this;
}
public FileSystemBuilder setInactivityInterval(Long inactivityInterval) {
this.inactivityInterval = inactivityInterval;
return this;
}
public FileSystemBuilder setRollOverInterval(Long rollOverInterval) {
this.rollOverInterval = rollOverInterval;
return this;
}
public FileSystemBuilder setInactivityThreshold(Long inactivityThreshold) {
this.inactivityThreshold = inactivityThreshold;
return this;
}
public FileSystemSink build() {
return new FileSystemSink(
schema,
assignerField,
assignerFieldType,
inputFormatter,
outputFormatter,
outputPath,
maxPartSize,
inactivityInterval,
rollOverInterval,
inactivityThreshold);
}
}
}
- HDFSStringMapFunction.java: converts each Row into an HDFSDomain (a worked example follows the code below)
public class HDFSStringMapFunction extends RichMapFunction<Row, HDFSDomain> {
private int assignerIndex;
private String assignerFieldType;
/**
* If the assigner type is 'date', the input formatter must be specified.
*/
private String inputFormatter;
/**
* The format used for the output (bucket) directory.
*/
private String outputFormatter;
public HDFSStringMapFunction(int assignerIndex, String assignerFieldType, String inputFormatter, String outputFormatter) {
this.assignerIndex = assignerIndex;
this.assignerFieldType = assignerFieldType;
this.inputFormatter = inputFormatter;
this.outputFormatter = outputFormatter;
}
@Override
public HDFSDomain map(Row row) throws Exception {
String partition;
if(assignerIndex >= 0) {
Object dateObj = row.getField(assignerIndex);
String assignerValue = null;
if(dateObj != null) {
assignerValue = dateObj.toString();
}
if(assignerFieldType.equalsIgnoreCase(HDFSValidator.BUCKET_ASSIGNER_FIELD_TYPE_DATE_VALUE)) {
long millis;
// the source value is already epoch milliseconds, no parsing needed
if(inputFormatter.equalsIgnoreCase("bigint")) {
millis = StringUtils.isEmpty(assignerValue) ? System.currentTimeMillis() : Long.valueOf(assignerValue);
} else {
Date date = StringUtils.isEmpty(assignerValue) ? new Date() : DateUtils.parseDate(assignerValue, inputFormatter);
millis = date.getTime();
}
partition = DateFormatUtils.format(millis, outputFormatter);
} else {
partition = StringUtils.isEmpty(assignerValue) ? "undefined" : assignerValue;
}
} else {
partition = "default";
}
StringBuilder sb = new StringBuilder();
for(int i = 0; i < row.getArity(); i++) {
Object o = Objects.isNull(row.getField(i)) ? "" : row.getField(i);
sb.append(o).append("\t");
}
sb.deleteCharAt(sb.length() - 1);
return new HDFSDomain(partition, sb.toString());
}
}
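To make the mapping concrete, here is a small hypothetical example (e.g. from a quick unit test; the values are made up). The timestamp 1577836800000 is 2020-01-01 00:00:00 UTC, so the exact bucket id depends on the JVM's default time zone:
// column 1 (birth_day) is the assigner field, type 'date', input 'bigint', output 'yyyyMMdd'
HDFSStringMapFunction fn = new HDFSStringMapFunction(1, "date", "bigint", "yyyyMMdd");
HDFSDomain out = fn.map(Row.of(1L, 1577836800000L, "foo"));
// out.getBucketId() -> "20200101" (in UTC or any zone east of it)
// out.getValue()    -> "1\t1577836800000\tfoo"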
- HDFSDomain.java: the output element; because BucketingSink uses a StringWriter by default, toString() has to be implemented (see the note after the class)
public class HDFSDomain {
private String bucketId;
private String value;
public HDFSDomain(String bucketId, String value) {
this.bucketId = bucketId;
this.value = value;
}
public String getBucketId() {
return bucketId;
}
public void setBucketId(String bucketId) {
this.bucketId = bucketId;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String toString() {
return value;
}
}
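A note on the on-disk format: with BucketingSink's default StringWriter, every record is written as element.toString() followed by a newline, so a row like the one in the mapping example above ends up in the part file as the tab-separated line 1\t1577836800000\tfoo.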
- CustomBucketer.java: determines the path the data is written to
public class CustomBucketer implements Bucketer<HDFSDomain> {
@Override
public Path getBucketPath(Clock clock, Path basePath, HDFSDomain element) {
return new Path(basePath + "/" + element.getBucketId());
}
}
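For example, with 'connector.path' = 'hdfs://xxx/flink/xxxx' and a bucket id of 20200101, this bucketer places records under hdfs://xxx/flink/xxxx/20200101/, where BucketingSink then manages its rolling part files.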
How to use it
That is the complete implementation of the BucketingSink-based SQL connector. It is used as follows:
CREATE TABLE binlog_locker_pay_detail_m(
id bigint,
birth_day bigint,
name varchar
) WITH (
'connector.type'='hdfs26',
'connector.path' = 'hdfs://xxx/flink/xxxx',
'update-mode' = 'append',
-- optional: bucket the data by one of its fields; if not set, the default bucketing strategy is used
'bucket.assigner.field' = 'birth_day',
-- required if bucket.assigner.field is set. Allowed values: 1. date: the field is a time attribute; 2. string: the field is used as a plain string
'bucket.assigner.type' = 'date',
-- the original time format of the bucketing field. Required if bucket.assigner.type is date. Allowed values: 1. bigint: the field is already a long (epoch millis) and needs no parsing; 2. a standard date pattern, e.g. yyyy-MM-dd HH:mm:ss
'bucket.assigner.field.formatter' = 'bigint',
-- the time format of the bucket directory. Required if bucket.assigner.type is date. A standard date pattern, e.g. yyyyMMdd
'bucket.assigner.output.formatter' = 'yyyyMMdd',
-- rolling policy: roll part files at 200 MB; see MemoryUnit for supported units
'rolling.policy.part.size' = '200m',
-- time interval for rolling files; see TimeUtils for supported units
'rolling.policy.over.interval' = '5min',
-- how often to check for inactive buckets
'rolling.policy.inactivity.interval' = '1min',
-- threshold after which an inactive bucket is closed
'rolling.policy.inactivity.threshold' = '5min'
);
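Writing data is then just a regular INSERT INTO against this table, for example INSERT INTO binlog_locker_pay_detail_m SELECT id, birth_day, name FROM some_source (some_source being a placeholder for whatever source table you have registered). With the options above, a row whose birth_day falls on 2020-01-01 is written under hdfs://xxx/flink/xxxx/20200101/, and each part file is rolled once it reaches 200 MB or after 5 minutes, whichever comes first.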
That's all.