
flink sql upsert kafka source code deep dive

Author: loukey_j | Published 2021-05-02 18:00

    Pros and cons

    Flink 1.12 introduced upsert-kafka. Its biggest difference from the regular kafka source connector is that it brings the changelog RowKind into play: with the help of state it implements what looks like insert/update/delete semantics on a Kafka topic. In reality, though, much of that machinery has nothing to do with upsert kafka itself, and below we will peel off its mysterious wolf skin by reading the source. It is convenient, but it also has drawbacks; my current focus is on the source side.

    • Kafka messages must carry a key (at least the community demo requires one). In many of our production setups the Kafka producers ignore the key and only care about the value, which really limits where upsert kafka can be used: it is not realistic to rework the producers just to adopt upsert kafka. (A DDL sketch follows below to show what is expected.)
    • upsert kafka hard-codes consumption from earliest and, as far as I can tell, exposes no other startup-position setting. That is simply a disaster; maybe you can live with it, I cannot.
      I will make source changes for both points.
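
    For context, here is a minimal sketch of how such an upsert-kafka table is typically declared and queried in Flink 1.12. The table name shop matches the query used later in this post; the topic, broker address and schema are placeholders:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class UpsertKafkaDdlDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // placeholder topic / broker; the options themselves are the documented upsert-kafka options
            tEnv.executeSql(
                    "CREATE TABLE shop (\n"
                            + "  id   INT,\n"
                            + "  name STRING,\n"
                            + "  PRIMARY KEY (id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-kafka',\n"
                            + "  'topic' = 'shop',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'value.format' = 'json'\n"
                            + ")");

            tEnv.executeSql("SELECT * FROM shop").print();
        }
    }

    The PRIMARY KEY plus 'key.format' is exactly why the Kafka message key matters: the key columns are deserialized from the record key, and a record whose value is null is interpreted as a delete.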

    Core source code walkthrough

    A quick look at RowKind

    package org.apache.flink.types;
    public enum RowKind {
        INSERT("+I", (byte) 0),        // a new row
        UPDATE_BEFORE("-U", (byte) 1), // the row before an update
        UPDATE_AFTER("+U", (byte) 2),  // the row after an update
        DELETE("-D", (byte) 3);        // a deleted row
    }
    /*
    An upsert kafka example, for a row with primary key id = 1 and name = 2:
    consuming the first record with primary key 1 prints
    | +I |                              1 |                              2 |  // new row
    consuming another record with primary key 1 and a different name, say 22, prints
    | -U |                              1 |                              2 |  // row before the update
    | +U |                              1 |                              22 | // row after the update
    consuming a record that contains only primary key 1 (no value fields) prints
    | -D |                              1 |                              22 | // the row is deleted
    */
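
    A tiny runnable check of the mapping above; it only prints the enum's own short strings and byte values:

    import org.apache.flink.types.RowKind;

    public class RowKindDemo {
        public static void main(String[] args) {
            // print each changelog kind with its short string and byte value
            for (RowKind kind : RowKind.values()) {
                System.out.println(kind + " -> " + kind.shortString() + " / " + kind.toByteValue());
            }
            // INSERT -> +I / 0
            // UPDATE_BEFORE -> -U / 1
            // UPDATE_AFTER -> +U / 2
            // DELETE -> -D / 3
        }
    }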
    

    The deserializer

    When I first dug into upsert kafka, the part I most wanted to read was deserialization: records are deserialized first and then emitted downstream, so that is where the interesting work should happen. For example, it could look up state to see whether a row with this key already exists and, if so, emit two rows, an update-before and an update-after, much like a MySQL binlog entry. It should also tag each row to mark it as an insert, update or delete.

    Let's first retrace how CREATE TABLE leads to the source being created behind the scenes. If you already know this flow, skip straight to the deserializer DynamicKafkaDeserializationSchema.

    1. Inside flink-connector-kafka_2.11-1.12.0.jar, the service file META-INF/services/org.apache.flink.table.factories.Factory lists org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory and org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory, which is how Flink discovers the table factories shipped with flink-connector-kafka.
    2. The CREATE TABLE statement specifies 'connector' = 'upsert-kafka' and the other options; while parsing the DDL, Flink matches those options against the available factories and instantiates UpsertKafkaDynamicTableFactory.
    3. The factory then creates the table source: org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory#createDynamicTableSource -> new org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource().
    KafkaDynamicSource implements the following interfaces:
    KafkaDynamicSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown
    4. The startup mode is hard-coded here:

    // always use earliest to keep data integrity
    StartupMode earliest = StartupMode.EARLIEST; 
    

    The full method:

    public DynamicTableSource createDynamicTableSource(Context context) {
        // other code omitted
        // always use earliest to keep data integrity
        StartupMode earliest = StartupMode.EARLIEST;
    
        return new KafkaDynamicSource(
                schema.toPhysicalRowDataType(),
                keyDecodingFormat,
                new DecodingFormatWrapper(valueDecodingFormat),
                keyValueProjections.f0,
                keyValueProjections.f1,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                earliest,
                Collections.emptyMap(),
                0,
                true);
    }
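
    The hard-coded EARLIEST above is the second pain point. Below is a minimal sketch of one way to lift it, assuming a hypothetical helper that maps a 'scan.startup.mode' table option (the option name used by the regular kafka connector) onto a StartupMode instead of always returning EARLIEST. This is not the actual Flink code, and a real change would also have to thread specific offsets or a timestamp into the KafkaDynamicSource constructor:

    import java.util.Map;

    import org.apache.flink.streaming.connectors.kafka.config.StartupMode;

    final class StartupModeSketch {

        // hypothetical helper: resolve the startup mode from the table options
        // instead of hard-coding StartupMode.EARLIEST in createDynamicTableSource
        static StartupMode resolveStartupMode(Map<String, String> tableOptions) {
            String mode = tableOptions.getOrDefault("scan.startup.mode", "earliest-offset");
            switch (mode) {
                case "latest-offset":
                    return StartupMode.LATEST;
                case "group-offsets":
                    return StartupMode.GROUP_OFFSETS;
                case "timestamp":
                    return StartupMode.TIMESTAMP;
                case "earliest-offset":
                default:
                    return StartupMode.EARLIEST;
            }
        }
    }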
    

    5. In org.apache.flink.table.planner.sources.DynamicSourceUtils#prepareDynamicSource, the planner calls org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource#getScanRuntimeProvider, which creates the deserializer and the FlinkKafkaConsumer:

    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
    
        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);
    
        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);
    
        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);
    
        return SourceFunctionProvider.of(kafkaConsumer, false);
    }
    

    6. Creating the FlinkKafkaConsumer also creates the deserializer DynamicKafkaDeserializationSchema:

    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
            DeserializationSchema<RowData> keyDeserialization,
            DeserializationSchema<RowData> valueDeserialization,
            TypeInformation<RowData> producedTypeInfo) {
    
        final KafkaDeserializationSchema<RowData> kafkaDeserializer = new DynamicKafkaDeserializationSchema(
                adjustedPhysicalArity,
                keyDeserialization,
                keyProjection,
                valueDeserialization,
                adjustedValueProjection,
                hasMetadata,
                metadataConverters,
                producedTypeInfo,
                upsertMode);
    
        final FlinkKafkaConsumer<RowData> kafkaConsumer;
        if (topics != null) {
            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
        } else {
            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
        }
    
        switch (startupMode) {
            case EARLIEST:
                kafkaConsumer.setStartFromEarliest();
                break;
            case LATEST:
                kafkaConsumer.setStartFromLatest();
                break;
            case GROUP_OFFSETS:
                kafkaConsumer.setStartFromGroupOffsets();
                break;
            case SPECIFIC_OFFSETS:
                kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
                break;
            case TIMESTAMP:
                kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
                break;
        }
    
        kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
    
        if (watermarkStrategy != null) {
            kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return kafkaConsumer;
    }
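
    For contrast, with the DataStream API a plain FlinkKafkaConsumer lets you choose the start position yourself; the upsert-kafka factory never passes anything but EARLIEST, so only the first case of the switch above is ever taken. Broker address, group id and topic below are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class PlainConsumerStartPosition {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("group.id", "demo-group");              // placeholder group id

            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("shop", new SimpleStringSchema(), props);

            // any of the start positions from the switch above can be chosen here
            consumer.setStartFromGroupOffsets();
            // consumer.setStartFromLatest();
            // consumer.setStartFromTimestamp(1_619_000_000_000L);
        }
    }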
    

    The deserializer: DynamicKafkaDeserializationSchema

    DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData>.
    Every record the kafka source consumes goes through its deserialize method, whose logic is as follows:

    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
        // shortcut in case no output projection is required,
        // also not for a cartesian product with the keys
        // this record does not need key parsing
        if (keyDeserialization == null && !hasMetadata) {
            valueDeserialization.deserialize(record.value(), collector);
            return;
        }
    
        // buffer key(s)
        // this record's key needs to be parsed
        if (keyDeserialization != null) {
            keyDeserialization.deserialize(record.key(), keyCollector);
        }
    
        // project output while emitting values
        outputCollector.inputRecord = record;
        outputCollector.physicalKeyRows = keyCollector.buffer;
        outputCollector.outputCollector = collector;
        // in upsert mode, a record whose value is null is treated as a delete
        if (record.value() == null && upsertMode) {
            // collect tombstone messages in upsert mode by hand
            outputCollector.collect(null);
        } else {
            valueDeserialization.deserialize(record.value(), outputCollector);
        }
        keyCollector.buffer.clear();
    }
    

    Now look more closely at keyDeserialization.deserialize(record.key(), keyCollector) and valueDeserialization.deserialize(record.value(), outputCollector).

    For the key, keyDeserialization.deserialize(record.key(), keyCollector):

    keyCollector is a DynamicKafkaDeserializationSchema$BufferingCollector
    keyDeserialization is an org.apache.flink.formats.json.JsonRowDataDeserializationSchema

    For the value, valueDeserialization.deserialize(record.value(), outputCollector):

    outputCollector is a DynamicKafkaDeserializationSchema$OutputProjectionCollector
    valueDeserialization is an org.apache.flink.formats.json.JsonRowDataDeserializationSchema

    The deserialize path is the same for key and value: both first go through the default method org.apache.flink.api.common.serialization.DeserializationSchema#deserialize:

    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T deserialize = deserialize(message);
        if (deserialize != null) {
            out.collect(deserialize);
        }
    }
    

    which then calls the subclass implementation org.apache.flink.formats.json.JsonRowDataDeserializationSchema#deserialize:

    org.apache.flink.formats.json.JsonRowDataDeserializationSchema
    public RowData deserialize(byte[] message) throws IOException {
        final JsonNode root = objectMapper.readTree(message);
        // the interesting part: how a JsonNode is turned into a RowData
        return (RowData) runtimeConverter.convert(root);
    }
    
    Follow the convert call; it eventually ends up in the following method:
    
    org.apache.flink.formats.json.JsonToRowDataConverters#createRowConverter
    public JsonToRowDataConverter createRowConverter(RowType rowType) {
        // parts omitted: this is where a JsonNode becomes a RowData
        return jsonNode -> {
            ObjectNode node = (ObjectNode) jsonNode;
            int arity = fieldNames.length;
            // create the GenericRowData; its constructor sets the Flink RowKind
            GenericRowData row = new GenericRowData(arity);
            for (int i = 0; i < arity; i++) {
                String fieldName = fieldNames[i];
                JsonNode field = node.get(fieldName);
                Object convertedField = convertField(fieldConverters[i], fieldName, field);
                row.setField(i, convertedField);
            }
            return row;
        };
    }
    
    org.apache.flink.table.data.GenericRowData
    public GenericRowData(int arity) {
        this.fields = new Object[arity]; // number of fields
        this.kind = RowKind.INSERT; // INSERT as default
    }
    // So every deserialized row starts out as INSERT by default. When does it ever become another RowKind?
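
    To make the JsonNode-to-RowData conversion concrete, here is a minimal standalone sketch (not Flink's actual converter, which is generated per field type) that maps a flat JSON object onto a GenericRowData; note the row kind defaults to INSERT:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;

    public class JsonToRowDataSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical schema: (id INT, name STRING), matching the example at the top
            JsonNode root = new ObjectMapper().readTree("{\"id\": 1, \"name\": \"2\"}");

            GenericRowData row = new GenericRowData(2); // kind defaults to RowKind.INSERT
            row.setField(0, root.get("id").asInt());
            row.setField(1, StringData.fromString(root.get("name").asText()));

            System.out.println(row.getRowKind()); // INSERT
            System.out.println(row);              // prints something like +I(1,2)
        }
    }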
    

    Deserialization is done; finally, let's see how collect hands the rows downstream.

    First the key side, DynamicKafkaDeserializationSchema$BufferingCollector:

    private static final class BufferingCollector implements Collector<RowData>, Serializable {
        private final List<RowData> buffer = new ArrayList<>();
        @Override
        public void collect(RowData record) {
            // simple enough: the record just goes into a buffer
            buffer.add(record);
        }
    }
    

    Now the value side, DynamicKafkaDeserializationSchema$OutputProjectionCollector:

    private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
        @Override
        public void collect(RowData physicalValueRow) {
            // no key defined
            int length = keyProjection.length;
            // no key projected: this is not the upsert kafka case
            if (length == 0) {
                emitRow(null, (GenericRowData) physicalValueRow);
                return;
            }
            // check whether the kafka message value contains anything besides the key fields;
            // if every non-key field is null, the value is effectively empty
            boolean hasValue = false;
            if (physicalValueRow != null && length > 0) {
                Set<Integer> keyIndices = Arrays.stream(keyProjection).boxed().collect(Collectors.toSet());
                for (int i = 0; i < physicalValueRow.getArity() && !hasValue; i++) {
                    if (keyIndices.contains(i)) {
                        continue;
                    }
                    hasValue = !physicalValueRow.isNullAt(i);
                }
            }
            // otherwise emit a value for each key
            for (RowData physicalKeyRow : physicalKeyRows) {
                if (!hasValue) {
                    // no value content beyond the key: emit a null value, i.e. a delete
                    emitRow((GenericRowData) physicalKeyRow, null);
                } else {
                    emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
                }
            }
        }
    
        
    
        private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
            final RowKind rowKind;
            if (physicalValueRow == null) {
                // a null physicalValueRow in upsert mode means the row kind is DELETE
                if (upsertMode) {
                    rowKind = RowKind.DELETE;
                } else {
                    throw new DeserializationException(
                            "Invalid null value received in non-upsert mode. Could not to set row kind for output record.");
                }
            } else {
                // otherwise keep the row's original rowKind
                rowKind = physicalValueRow.getRowKind();
            }
    
            // ... assembly of producedRow from the key, value and metadata columns is omitted in this excerpt ...
            // emit downstream via org.apache.flink.util.Collector
            outputCollector.collect(producedRow);
        }
    }
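
    A quick standalone check of the "does the value carry anything besides the key columns" logic from the collector above, re-implemented outside the class with keyProjection = {0}, i.e. a single-column primary key:

    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;

    public class HasValueCheckSketch {

        static boolean hasNonKeyField(GenericRowData row, int[] keyProjection) {
            Set<Integer> keyIndices = Arrays.stream(keyProjection).boxed().collect(Collectors.toSet());
            for (int i = 0; i < row.getArity(); i++) {
                if (!keyIndices.contains(i) && !row.isNullAt(i)) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            int[] keyProjection = {0}; // the primary key is field 0 (id)

            GenericRowData full = GenericRowData.of(1, StringData.fromString("2"));
            GenericRowData keyOnly = GenericRowData.of(1, null);

            System.out.println(hasNonKeyField(full, keyProjection));    // true  -> emitted as an upsert
            System.out.println(hasNonKeyField(keyOnly, keyProjection)); // false -> emitted as a delete
        }
    }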
    

    So throughout the whole path from deserialization to emitting the row, nothing ever consults state to ask whether this primary key has been seen before, and rowKind is only ever set in two ways: it defaults to INSERT, and it becomes DELETE when the value is empty. So where does the check for a previously consumed primary key actually happen?

    So where does the state come from?

    By chance I noticed in the Flink web UI that a ChangelogNormalize operator is chained after every such source, as in the figure below.


    (Figure: Flink UI job graph showing a ChangelogNormalize operator attached after the source.)

    Then it dawned on me: the Flink planner must insert this special operator when it translates the SQL into an execution plan. I traced the source code; the chain is long, so only the key pieces are covered here.

    • The job's main method runs executeSql: tEnv.executeSql("SELECT * from shop").print();
    • The QueryOperation is then translated into a TableResult:
    public TableResult executeInternal(QueryOperation operation) {
        SelectSinkOperation sinkOperation = new SelectSinkOperation(operation);
        // follow this translate call
        List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation));
        String jobName = getJobName("collect");
        Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
    }
    
    • org.apache.flink.table.planner.delegation.PlannerBase#translate
    override def translate(
          modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
        if (modifyOperations.isEmpty) {
          return List.empty[Transformation[_]]
        }
        // prepare the execEnv before translating
        getExecEnv.configure(
          getTableConfig.getConfiguration,
          Thread.currentThread().getContextClassLoader)
        overrideEnvParallelism()
    // presumably this resolves the RelNodes the modify operations depend on
        val relNodes = modifyOperations.map(translateToRel)
    // optimize the nodes; this is the important part
        val optimizedRelNodes = optimize(relNodes)
        val execNodes = translateToExecNodePlan(optimizedRelNodes)
        translateToPlan(execNodes)
      }
    
    • During the optimize step, a rule called StreamExecTableSourceScanRule matches. It converts the
      FlinkLogicalTableSourceScan into a StreamExecTableSourceScan, and for an upsert source it additionally
      generates a StreamExecChangelogNormalize. (isUpsertSource essentially checks that the table declares a
      primary key and that the source's changelog mode contains UPDATE_AFTER but not UPDATE_BEFORE.)
    org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecTableSourceScanRule#convert
    def convert(rel: RelNode): RelNode = {
        val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
        val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
        val config = ShortcutUtils.unwrapContext(rel.getCluster).getTableConfig
        val table = scan.getTable.asInstanceOf[TableSourceTable]
    
        val newScan = new StreamExecTableSourceScan(
          rel.getCluster,
          traitSet,
          table)
        // if this is an upsert source
        if (isUpsertSource(table.catalogTable, table.tableSource) ||
            isSourceChangeEventsDuplicate(table.catalogTable, table.tableSource, config)) {
          // generate changelog normalize node
          // primary key has been validated in CatalogSourceTable
          val primaryKey = table.catalogTable.getSchema.getPrimaryKey.get()
          val keyFields = primaryKey.getColumns
          val inputFieldNames = newScan.getRowType.getFieldNames
          val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)
          val requiredDistribution = FlinkRelDistribution.hash(primaryKeyIndices, requireStrict = true)
          val requiredTraitSet = rel.getCluster.getPlanner.emptyTraitSet()
            .replace(requiredDistribution)
            .replace(FlinkConventions.STREAM_PHYSICAL)
          val newInput: RelNode = RelOptRule.convert(newScan, requiredTraitSet)
          // generate a StreamExecChangelogNormalize node
          new StreamExecChangelogNormalize(
            scan.getCluster,
            traitSet,
            newInput,
            primaryKeyIndices)
        } else {
          newScan
        }
      }
    
    • The above adds a StreamExecChangelogNormalize node; it is later translated into the stream API:
    org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecChangelogNormalize#translateToPlanInternal
    override protected def translateToPlanInternal(
          planner: StreamPlanner): Transformation[RowData] = {
    
        val inputTransform = getInputNodes.get(0).translateToPlan(planner)
          .asInstanceOf[Transformation[RowData]]
    
        val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
        val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
        val tableConfig = planner.getTableConfig
        val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
          ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
        val stateIdleTime = tableConfig.getIdleStateRetention.toMillis
        val operator = if (isMiniBatchEnabled) {
          // omitted
        } else {
         // this is the key part
          val processFunction = new ProcTimeDeduplicateKeepLastRowFunction(
            rowTypeInfo,
            stateIdleTime,
            generateUpdateBefore,
            true,   // generateInsert
            false)  // inputInsertOnly
         // and it is wrapped in a KeyedProcessOperator
          new KeyedProcessOperator[RowData, RowData, RowData](processFunction)
        }
    
        val ret = new OneInputTransformation(
          inputTransform,
          getRelDetailedDescription,
          operator,
          rowTypeInfo,
          inputTransform.getParallelism)
    
        if (inputsContainSingleton()) {
          ret.setParallelism(1)
          ret.setMaxParallelism(1)
        }
    
        val selector = KeySelectorUtil.getRowDataSelector(uniqueKeys, rowTypeInfo)
        ret.setStateKeySelector(selector)
        ret.setStateKeyType(selector.getProducedType)
        ret
      }
    
    • By now it is all clear: ProcTimeDeduplicateKeepLastRowFunction is doing the work, and it really has little to do with upsert kafka itself. So aren't you just a sheep in wolf's clothing? You had me digging through the upsert kafka code for a long time.

    ProcTimeDeduplicateKeepLastRowFunction source code walkthrough

    package org.apache.flink.table.runtime.operators.deduplicate;
    
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.util.Collector;
    
    import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog;
    import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnProcTime;
    
    /**
     * This function is used to deduplicate on keys and keeps only last row.
     */
    public class ProcTimeDeduplicateKeepLastRowFunction
            extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
    
        private static final long serialVersionUID = -291348892087180350L;
        private final boolean generateUpdateBefore;
        private final boolean generateInsert;
        private final boolean inputIsInsertOnly;
    
        public ProcTimeDeduplicateKeepLastRowFunction(
                InternalTypeInfo<RowData> typeInfo,
                long stateRetentionTime,
                boolean generateUpdateBefore,
                boolean generateInsert,
                boolean inputInsertOnly) {
            super(typeInfo, null, stateRetentionTime);
            this.generateUpdateBefore = generateUpdateBefore;
            this.generateInsert = generateInsert;
            this.inputIsInsertOnly = inputInsertOnly;
        }
    
        @Override
        public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
            if (inputIsInsertOnly) {
                processLastRowOnProcTime(input, generateUpdateBefore, generateInsert, state, out);
            } else {
            // the key call: a static helper from DeduplicateFunctionHelper
                processLastRowOnChangelog(input, generateUpdateBefore, state, out);
            }
        }
    }
    
    • So exciting, straight to the source: the long-awaited RowKind changes finally appear. The inline comments are clear enough; read for yourself.
    static void processLastRowOnChangelog(
                RowData currentRow,
                boolean generateUpdateBefore,
                ValueState<RowData> state,
                Collector<RowData> out) throws Exception {
            RowData preRow = state.value();
            RowKind currentKind = currentRow.getRowKind();
            if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) {
                if (preRow == null) {
                    // the first row, send INSERT message
                    currentRow.setRowKind(RowKind.INSERT);
                    out.collect(currentRow);
                } else {
                    if (generateUpdateBefore) {
                        preRow.setRowKind(RowKind.UPDATE_BEFORE);
                        out.collect(preRow);
                    }
                    currentRow.setRowKind(RowKind.UPDATE_AFTER);
                    out.collect(currentRow);
                }
                // normalize row kind
                currentRow.setRowKind(RowKind.INSERT);
                // save to state
                state.update(currentRow);
            } else {
            // DELETE or UPDATE_BEFORE
                if (preRow != null) {
                    // always set to DELETE because this row has been removed
                // even if the input is UPDATE_BEFORE, there may be no UPDATE_AFTER after it.
                    preRow.setRowKind(RowKind.DELETE);
                    // output the preRow instead of currentRow,
                    // because preRow always contains the full content.
                    // currentRow may only contain key parts (e.g. Kafka tombstone records).
                    out.collect(preRow);
                    // clear state as the row has been removed
                    state.clear();
                }
                // nothing to do if removing a non-existed row
            }
        }
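
    To tie this back to the +I / -U / +U / -D example at the very top, here is a small standalone trace of the branching in processLastRowOnChangelog, with a plain HashMap standing in for the keyed ValueState; it is only a sketch of the logic, not the Flink operator itself:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;

    public class LastRowTraceSketch {
        // keyed "state": primary key -> last seen row; a stand-in for ValueState<RowData>
        private static final Map<Integer, GenericRowData> STATE = new HashMap<>();

        static void process(int key, GenericRowData current, boolean generateUpdateBefore) {
            GenericRowData prev = STATE.get(key);
            if (current != null) { // value present: the INSERT / UPDATE_AFTER path
                if (prev == null) {
                    current.setRowKind(RowKind.INSERT);
                    System.out.println(current);
                } else {
                    if (generateUpdateBefore) {
                        prev.setRowKind(RowKind.UPDATE_BEFORE);
                        System.out.println(prev);
                    }
                    current.setRowKind(RowKind.UPDATE_AFTER);
                    System.out.println(current);
                }
                current.setRowKind(RowKind.INSERT); // normalize before storing
                STATE.put(key, current);
            } else { // tombstone: the DELETE path
                if (prev != null) {
                    prev.setRowKind(RowKind.DELETE);
                    System.out.println(prev);
                    STATE.remove(key);
                }
            }
        }

        public static void main(String[] args) {
            process(1, GenericRowData.of(1, StringData.fromString("2")), true);  // +I(1,2)
            process(1, GenericRowData.of(1, StringData.fromString("22")), true); // -U(1,2) then +U(1,22)
            process(1, null, true);                                              // -D(1,22)
        }
    }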
    
    • processLastRowOnChangelog reads the previous row from state and writes the new row back into it. One last look at what this state actually is:
      it is a ValueState defined in the parent class DeduplicateFunctionBase
    abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
    
        private static final long serialVersionUID = 1L;
    
        // the TypeInformation of the values in the state.
        protected final TypeInformation<T> typeInfo;
        protected final long stateRetentionTime;
        protected final TypeSerializer<OUT> serializer;
        // state stores previous message under the key.
        protected ValueState<T> state;
    
        public DeduplicateFunctionBase(
                TypeInformation<T> typeInfo,
                TypeSerializer<OUT> serializer,
                long stateRetentionTime) {
            this.typeInfo = typeInfo;
            this.stateRetentionTime = stateRetentionTime;
            this.serializer = serializer;
        }
    
        @Override
        public void open(Configuration configure) throws Exception {
            super.open(configure);
            ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo);
            StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
            if (ttlConfig.isEnabled()) {
                stateDesc.enableTimeToLive(ttlConfig);
            }
            state = getRuntimeContext().getState(stateDesc);
        }
    }
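
    The stateRetentionTime handed to DeduplicateFunctionBase comes from the table config's idle-state retention (tableConfig.getIdleStateRetention in translateToPlanInternal above), so bounding the ChangelogNormalize state is a matter of setting that retention. A minimal sketch, assuming the Duration-based setter is available in your 1.12.x build; older APIs expose setIdleStateRetentionTime(minTime, maxTime) instead:

    import java.time.Duration;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class StateRetentionSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // bound the per-key deduplicate state; without a retention the
            // ChangelogNormalize state grows with the key cardinality forever
            tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
        }
    }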
    

    That is it for now. To recap, the two pain points I plan to address with source changes in a follow-up post:

    • Kafka messages must carry a key, which does not fit producers that only care about the value.
    • The startup position is hard-coded to earliest, with no way to configure anything else.
