Preface
Building on my current understanding of table formats, this article implements a custom event-json format for processing event stream data. The fields of an event are not fixed: only a handful are common to all events, while the rest are extensions. The idea is therefore to let the user declare only the common fields in the schema, and to collect every remaining field into a JSON object that is exposed through a default metadata field. The steps are:
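To make the design concrete, consider a hypothetical event (all field names here are invented for illustration). Given a raw message such as

{"key": "click", "uid": "u1001", "page": "home", "ts": "1652083200"}

and a schema that declares key and uid as the fixed columns, the format produces the row (key='click', uid='u1001') and folds the two extension fields into the metadata column as others='{"page":"home","ts":"1652083200"}'.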
- Define factory classes for serialization and deserialization and implement their methods
- Register the factory class in the SPI configuration file
- In the table format factory class, set up the encoding/decoding process and declare the required option parameters
package com.sht.formats.json;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class EventJsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

    public static final String IDENTIFIER = "event-json";

    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonOptions.IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
        final TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
        final String others = formatOptions.getOptional(EventJsonOptions.OTHER_FIELD).orElse(null);
        return new EventJsonDecodingFormat(ignoreParseErrors, timestampFormat, others);
    }

    @Override
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        final TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
        final String others = formatOptions.getOptional(EventJsonOptions.OTHER_FIELD).orElse(null);
        return new EncodingFormat<SerializationSchema<RowData>>() {
            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }

            @Override
            public SerializationSchema<RowData> createRuntimeEncoder(
                    DynamicTableSink.Context context, DataType consumedDataType) {
                final RowType rowType = (RowType) consumedDataType.getLogicalType();
                return new EventJsonSerializationSchema(rowType, timestampFormat, others);
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(IGNORE_PARSE_ERRORS);
        options.add(TIMESTAMP_FORMAT);
        options.add(EventJsonOptions.OTHER_FIELD);
        return options;
    }
}
- The encoding path is simple enough to implement inline with an anonymous class (the EncodingFormat above); the decoding path is more involved, so it gets a dedicated class
package com.sht.formats.json;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EventJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {

    /** Metadata keys requested by the query; mutable because they are applied after construction. */
    private List<String> metadataKeys;

    private final String otherField;
    private final boolean ignoreParseErrors;
    private final TimestampFormat timestampFormat;

    public EventJsonDecodingFormat(boolean ignoreParseErrors, TimestampFormat timestampFormat, String otherField) {
        this.ignoreParseErrors = ignoreParseErrors;
        this.timestampFormat = timestampFormat;
        this.otherField = otherField;
        this.metadataKeys = Collections.emptyList();
    }

    @Override
    public DeserializationSchema<RowData> createRuntimeDecoder(
            DynamicTableSource.Context context, DataType physicalDataType) {
        // resolve the requested metadata keys against the declared ReadableMetadata entries
        final List<ReadableMetadata> readableMetadata =
                metadataKeys.stream()
                        .map(
                                k ->
                                        Stream.of(ReadableMetadata.values())
                                                .filter(rm -> rm.key.equals(k))
                                                .findFirst()
                                                .orElseThrow(IllegalStateException::new))
                        .collect(Collectors.toList());
        // append the metadata columns after the physical columns
        final List<DataTypes.Field> metadataFields =
                readableMetadata.stream()
                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
                        .collect(Collectors.toList());
        final DataType producedDataType =
                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);
        return new EventJsonDeserializationSchema(
                producedDataType, producedTypeInfo, ignoreParseErrors, timestampFormat, otherField);
    }

    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
        Stream.of(ReadableMetadata.values())
                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys) {
        this.metadataKeys = metadataKeys;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    /** List of metadata that can be read with this format. */
    enum ReadableMetadata {
        OTHERS(
                "others",
                DataTypes.STRING().nullable(),
                DataTypes.FIELD("others", DataTypes.STRING()));

        final String key;
        final DataType dataType;
        final DataTypes.Field requiredJsonField;

        ReadableMetadata(String key, DataType dataType, DataTypes.Field requiredJsonField) {
            this.key = key;
            this.dataType = dataType;
            this.requiredJsonField = requiredJsonField;
        }
    }
}
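A note on the wiring above: listReadableMetadata advertises the others key to the planner, applyReadableMetadata is called back with whichever keys a query actually selects, and only then does createRuntimeDecoder append the corresponding columns to the produced row type. In DDL this is triggered by declaring a metadata column, as the test case at the end does with

others STRING METADATA FROM 'value.others'

where the value. prefix is added by the Kafka connector for metadata exposed by the value format.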
- The option configuration class
package com.sht.formats.json;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.formats.json.JsonOptions;

/** Option utils for event-json format. */
public class EventJsonOptions {

    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonOptions.IGNORE_PARSE_ERRORS;
    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;

    public static final ConfigOption<String> OTHER_FIELD =
            ConfigOptions.key("others")
                    .stringType()
                    .defaultValue("others")
                    .withDescription("The column into which all extension fields are collected as a JSON string.");
}
- The serialization schema, which serializes RowData into a byte array for writing to the external system
package com.sht.formats.json;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class EventJsonSerializationSchema implements SerializationSchema<RowData> {

    private final List<RowType.RowField> rowTypeFields;
    private final String othersField;

    public EventJsonSerializationSchema(RowType rowType, TimestampFormat timestampFormat, String otherField) {
        // timestampFormat is accepted for symmetry with the decoder but not used yet,
        // since this simple implementation treats every column as a string
        this.rowTypeFields = rowType.getFields();
        this.othersField = otherField;
    }

    @Override
    public byte[] serialize(RowData rowData) {
        JSONObject jsonObject = new JSONObject();
        for (int i = 0; i < rowTypeFields.size(); i++) {
            final RowType.RowField rowField = rowTypeFields.get(i);
            if (rowData.isNullAt(i)) {
                // skip null columns instead of failing on getString
                continue;
            }
            final String value = rowData.getString(i).toString();
            if (othersField.equals(rowField.getName())) {
                // flatten the extension fields back into the top level of the event
                jsonObject.putAll(JSONObject.parseObject(value));
            } else {
                jsonObject.put(rowField.getName(), value);
            }
        }
        return jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8);
    }
}
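To try the encoder outside of a running job, here is a minimal sketch. It is hypothetical scaffolding rather than part of the format: the class name, row type, and field values are all invented, and it assumes the classes above are on the classpath.

package com.sht.formats.json;

import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.nio.charset.StandardCharsets;

public class EventJsonSerializationSketch {

    public static void main(String[] args) {
        // three columns matching the test schema; "others" carries the extension fields
        RowType rowType = RowType.of(
                new LogicalType[] {new VarCharType(), new VarCharType(), new VarCharType()},
                new String[] {"key", "uid", "others"});
        GenericRowData row = new GenericRowData(3);
        row.setField(0, StringData.fromString("click"));
        row.setField(1, StringData.fromString("u1001"));
        row.setField(2, StringData.fromString("{\"page\":\"home\",\"ts\":\"1652083200\"}"));
        byte[] bytes = new EventJsonSerializationSchema(rowType, TimestampFormat.SQL, "others")
                .serialize(row);
        // prints the flattened event, e.g. {"uid":"u1001","key":"click","page":"home","ts":"1652083200"}
        // (fastjson does not guarantee key order)
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}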
- The deserialization schema, which converts the external data stream into the RowData structure that Flink SQL can recognize
package com.sht.formats.json;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class EventJsonDeserializationSchema implements DeserializationSchema<RowData> {

    private final TypeInformation<RowData> resultTypeInfo;
    private final List<RowType.RowField> rowTypeFields;
    private final String otherField;
    private final boolean ignoreParseErrors;

    public EventJsonDeserializationSchema(DataType dataType, TypeInformation<RowData> resultTypeInfo,
            boolean ignoreParseErrors, TimestampFormat timestampFormatOption, String otherField) {
        this.resultTypeInfo = resultTypeInfo;
        final RowType rowType = (RowType) dataType.getLogicalType();
        this.rowTypeFields = rowType.getFields();
        this.otherField = otherField;
        this.ignoreParseErrors = ignoreParseErrors;
    }

    @Override
    public RowData deserialize(byte[] bytes) throws IOException {
        throw new RuntimeException(
                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
    }

    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        if (message == null || message.length == 0) {
            return;
        }
        final String line = new String(message, StandardCharsets.UTF_8);
        final JSONObject jsonObject;
        try {
            jsonObject = JSONObject.parseObject(line);
        } catch (Exception e) {
            // honor the ignore-parse-errors option instead of silently failing the job
            if (ignoreParseErrors) {
                return;
            }
            throw new IOException("Failed to deserialize JSON '" + line + "'.", e);
        }
        GenericRowData rowData = new GenericRowData(rowTypeFields.size());
        JSONObject others = new JSONObject();
        List<String> existField = new ArrayList<>();
        // fill the declared columns first and remember which JSON keys they consumed
        for (int i = 0; i < rowTypeFields.size(); i++) {
            final RowType.RowField rowField = rowTypeFields.get(i);
            if (jsonObject.containsKey(rowField.getName())) {
                existField.add(rowField.getName());
                rowData.setField(i, new BinaryStringData(jsonObject.getString(rowField.getName())));
            }
        }
        // everything that is not a declared column becomes an extension field
        for (String key : jsonObject.keySet()) {
            if (!existField.contains(key)) {
                others.put(key, jsonObject.get(key));
            }
        }
        // the metadata column is appended last by appendRowFields, so it sits at the last index
        rowData.setField(rowTypeFields.size() - 1, new BinaryStringData(others.toJSONString()));
        out.collect(rowData);
    }

    @Override
    public boolean isEndOfStream(RowData rowData) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }
}
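And the matching sketch for the decoder (again hypothetical scaffolding with an invented event payload). Note that the others column must be the last field in the schema, because the implementation writes the collected extension fields to the last index:

package com.sht.formats.json;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;

public class EventJsonDeserializationSketch {

    public static void main(String[] args) throws Exception {
        // produced row type: two physical columns plus the appended "others" column
        DataType dataType = DataTypes.ROW(
                DataTypes.FIELD("key", DataTypes.STRING()),
                DataTypes.FIELD("uid", DataTypes.STRING()),
                DataTypes.FIELD("others", DataTypes.STRING()));
        EventJsonDeserializationSchema schema = new EventJsonDeserializationSchema(
                dataType, TypeInformation.of(RowData.class), false, TimestampFormat.SQL, "others");
        String event = "{\"key\":\"click\",\"uid\":\"u1001\",\"page\":\"home\",\"ts\":\"1652083200\"}";
        schema.deserialize(event.getBytes(StandardCharsets.UTF_8), new Collector<RowData>() {
            @Override
            public void collect(RowData row) {
                // prints something like +I(click,u1001,{"page":"home","ts":"1652083200"})
                System.out.println(row);
            }

            @Override
            public void close() {
            }
        });
    }
}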
- SPI registration. Under the resources source directory, create the two nested folders META-INF/services, then create a file named org.apache.flink.table.factories.Factory with the following content:
com.sht.formats.json.EventJsonFormatFactory
- Test case
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EventJsonFormatTest {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        tableEnvironment.executeSql(
                " CREATE TABLE sourceTable ( " +
                "   others STRING METADATA FROM 'value.others', " +
                "   key STRING, " +
                "   uid STRING " +
                " ) WITH ( " +
                "   'connector' = 'kafka', " +
                "   'topic' = 'event', " +
                "   'properties.bootstrap.servers' = '127.0.0.1:9092', " +
                "   'properties.enable.auto.commit' = 'false', " +
                "   'properties.session.timeout.ms' = '90000', " +
                "   'properties.request.timeout.ms' = '325000', " +
                "   'scan.startup.mode' = 'earliest-offset', " +
                "   'value.format' = 'event-json', " +
                "   'value.event-json.others' = 'others' " +
                " ) ");
        tableEnvironment.executeSql(
                " CREATE TABLE sinkTable ( " +
                "   others STRING, " +
                "   key STRING, " +
                "   uid STRING " +
                " ) WITH ( " +
                "   'connector' = 'kafka', " +
                "   'topic' = 'dwd_event', " +
                "   'properties.bootstrap.servers' = '127.0.0.1:9092', " +
                "   'properties.enable.auto.commit' = 'false', " +
                "   'properties.session.timeout.ms' = '90000', " +
                "   'properties.request.timeout.ms' = '325000', " +
                "   'value.format' = 'event-json', " +
                "   'value.event-json.others' = 'others', " +
                "   'sink.partitioner' = 'round-robin', " +
                "   'sink.parallelism' = '4' " +
                " ) ");
        // tableEnvironment.executeSql("select * from sourceTable");
        tableEnvironment.executeSql(
                "insert into sinkTable(key, uid, others) select key, uid, others from sourceTable");
    }
}
Conclusion
A table format is a standalone module inside a connector whose sole job is to serialize and deserialize the connector's data, and a single format can be shared by multiple connectors. Writing a custom table format is a good way to understand how Flink SQL converts external data into the internal RowData structure it can work with, which in turn makes it much easier to pinpoint the exact location of a problem when troubleshooting.