Flink 源码之batch问题处理

作者: AlienPaul | 来源:发表于2022-01-25 11:01 被阅读0次

    Flink源码分析系列文档目录

    请点击:Flink 源码分析系列文档目录

    Flink 批处理问题

    问题描述

    Flink 1.14.x版本BATCH模式执行会丢失数据。

    演示程序:

    //create env and tableEnv
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    //    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    env.setParallelism(3)
    val tableEnv = StreamTableEnvironment.create(env)
    
    // make data ,6 line
    val resultDS2 = env.fromElements(
        Row.of("Alice"),
        Row.of("alice"),
        Row.of("Bob"),
        Row.of("lily"),
        Row.of("lily"),
        Row.of("lily")
    )(Types.ROW(Types.STRING))
    
    // dataStream[Row] --> Table --> sql to upper transform table
    val table = tableEnv.fromDataStream(resultDS2).as("word")
    tableEnv.createTemporaryView(s"tmp_table",table)
    val resultTable = tableEnv.sqlQuery(s" select UPPER(word)  as word from tmp_table ")
    
    // sql transformed table -->  DataStream[String]
    val resultDs = tableEnv.toDataStream(resultTable).map(row => {
        row.getField("word").asInstanceOf[String]
    })
    
    // keyby reduce
    val counts: DataStream[(String, Int)] = resultDs
    .map((_, 1))
    .keyBy(_._1)
    .sum(1)
    
    // print result
    counts.print()
    
    env.execute("WordCount")
    

    期待的输出为:

    (BOB,1)
    (ALICE,2)
    (LILY,3)
    

    输出只有

    (BOB,1)
    

    Alice和Lily不会被输出。

    如果修改并行度为1,那么输出会漏掉Lily。也就是说丢失的数据会随着并行度的不同而变化。

    问题调查

    将算子链一步步断开debug,发现直到keyBy算子的输出都正常,到了sum算子之后才出现数据丢失问题。接下来重点对sum算子背后逻辑进行调查。

    Flink 1.13.3 中调试

    Flink 1.13.3 中即便配置成env.setRuntimeMode(RuntimeExecutionMode.BATCH),实际执行sum算子的仍然是

    StreamGroupedReduceOperator。可见batch模式并没有启用,这是一个明显的bug。我们顺便看一下它的processElement方法代码:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();
    
        if (currentValue != null) {
            // 如果当前元素不为null,说明这是reduce执行过程中
            // 执行reduce方法,上面的sum算子生成的reduce函数逻辑为求两个字段值之和
            IN reduced = userFunction.reduce(currentValue, value);
            // 更新保存状态
            values.update(reduced);
            // 将数据发往下游
            output.collect(element.replace(reduced));
        } else {
            // 这个分支是sum第一个元素的时候执行,第一个元素到来之前的currentValue为null
            // 直接将第一个元素的值作为初始值,更新到状态中
            values.update(value);
            // 将数据发往下游
            output.collect(element.replace(value));
        }
    }
    

    这种流式计算operator的特点是会输出中间结果,数据到来一次计算一次。如果将问题描述中的例子并行度设置为1

    env.setParallelism(1)
    

    输出结果可能为:

    (BOB,1)
    (ALICE,1)
    (ALICE,2)
    (LILY,1)
    (LILY,2)
    (LILY,3)
    

    Flink 1.14.2 中调试

    Flink 1.14.2 中执行sum算子的是BatchGroupedReduceOperator。和StreamGroupedReduceOperator设计上不同的地方是,BatchGroupedReduceOperator只会输出最终的结果,不输出中间结果。

    我们查看它的processElement方法:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // 获取当前元素的值
        IN value = element.getValue();
        // 获取当前缓存的值(上次计算的结果)
        // 注意,这个缓存的值的key和当前元素的key是一致的
        IN currentValue = values.value();
    
        // 如果缓存值为null,说明之前没有处理过和当前元素对应key相同的其他元素
        // 注册一个EventTime定时器,触发时刻为Long.MAX_VALUE(后面解释)
        // 该定时器负责触发和当前元素对应key相同的一系列元素
        // 每个不同的key对应一个timer
        // 这样key相同的数据计算结果只输出一个,这是和stream模式不同的地方
        if (currentValue == null) {
            // register a timer for emitting the result at the end when this is the
            // first input for this key
            timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MAX_VALUE);
        } else {
            // otherwise, reduce things
            // 否则,只进行reduce计算(例子中为求和)
            value = userFunction.reduce(currentValue, value);
        }
        // 更新缓存的值为当前计算结果
        values.update(value);
    }
    

    通过上面分析可知,只在第一次接收到key为某个值的元素的时候,才会注册一个event time timer,触发时间为Long.MAX_VALUE。确保了key相同的元素只在最后输出一次(不像是Streaming模式,每次接收到数据都会输出)。

    接下来需要分析这个timer注册之后是怎么触发的。我们从OneInputStreamTask开始分析。

    public void emitRecord(StreamRecord<IN> record) throws Exception {
        // 流入数据计数器加一
        this.numRecordsIn.inc();
        // 调用operator的setKeyContextElement方法
        // 需要从record中抽取出key,然后传给state backend
        // 对于keyedStateBackend,每个key对应的缓存值是不同的
        this.operator.setKeyContextElement(record);
        // 处理这个record
        this.operator.processElement(record);
    }
    

    接着调用的是OneInputStreamOperatorsetKeyContextElement方法:

    @Override
    default void setKeyContextElement(StreamRecord<IN> record) throws Exception {
        setKeyContextElement1(record);
    }
    

    继续跟踪到AbstractStreamOperator

    public void setKeyContextElement1(StreamRecord record) throws Exception {
        setKeyContextElement(record, stateKeySelector1);
    }
    
    private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
        throws Exception {
        if (selector != null) {
            // 从record中抽取出key
            Object key = selector.getKey(record.getValue());
            // 调用setCurrentKey
            setCurrentKey(key);
        }
    }
    
    public void setCurrentKey(Object key) {
        // 调用stateHandler
        // StreamOperatorStateHandler是各种各样state backend的封装
        stateHandler.setCurrentKey(key);
    }
    

    接下来是StreamOperatorStateHandlersetCurrentKey方法:

    public void setCurrentKey(Object key) {
        if (keyedStateBackend != null) {
            try {
                // need to work around type restrictions
                @SuppressWarnings("rawtypes")
                CheckpointableKeyedStateBackend rawBackend = keyedStateBackend;
                // 调用原始状态后端的setCurrentKey方法
                rawBackend.setCurrentKey(key);
            } catch (Exception e) {
                throw new RuntimeException(
                    "Exception occurred while setting the current key context.", e);
            }
        }
    }
    

    经过debug我们发现这里rawBackend实际上是BatchExecutionKeyedStateBackend。我们查看它的setCurrentKey代码:

    @Override
    public void setCurrentKey(K newKey) {
        // 如果新的key值和之前处理的key值不同,才进行后面的逻辑
        if (!Objects.equals(newKey, currentKey)) {
            // 通知key变更
            notifyKeySelected(newKey);
            // 清除状态后端的值,因为之前的已经计算过了
            for (State value : states.values()) {
                ((AbstractBatchExecutionKeyState<?, ?, ?>) value).clearAllNamespaces();
            }
            // 清除key对应的所有内容
            for (KeyGroupedInternalPriorityQueue<?> value : priorityQueues.values()) {
                while (value.poll() != null) {
                    // remove everything for the key
                }
            }
            // 设置当前key为新的key
            this.currentKey = newKey;
        }
    }
    

    通过分析可知,只有key改变的时候才会触发notifyKeySelected,这是一个关键点。接下来我们探究notifyKeySelected方法做了什么事。

    private void notifyKeySelected(K newKey) {
        // we prefer a for-loop over other iteration schemes for performance reasons here.
        for (KeySelectionListener<K> keySelectionListener : keySelectionListeners) {
            keySelectionListener.keySelected(newKey);
        }
    }
    

    这个方法通知所有的keySelectionListener。通过debug,keySelectionListeners只有一个,为BatchExecutionInternalTimeServiceManager。我们查看它的keySelected方法:

    @Override
    public void keySelected(K newKey) {
        try {
            for (BatchExecutionInternalTimeService<K, ?> value : timerServices.values()) {
                value.setCurrentKey(newKey);
            }
        } catch (Exception e) {
            throw new WrappingRuntimeException(e);
        }
    }
    

    它挨个调用所有BatchExecutionInternalTimeServicesetCurrentKey方法。debug时候timerServices只有一个对象。接下来需要分析的目标就明确了。查看BatchExecutionInternalTimeServicesetCurrentKey方法,代码如下:

    public void setCurrentKey(K currentKey) throws Exception {
        // 如果当前key(上次处理的key)为null,继续下面逻辑
        // 当前key不为null并且参数中的key和当前key不相同,继续下面逻辑
        // 其他情况,直接返回
        if (currentKey != null && currentKey.equals(this.currentKey)) {
            return;
        }
        // 设置当前watermark为Long.MAX_VALUE
        currentWatermark = Long.MAX_VALUE;
        InternalTimer<K, N> timer;
        // 调用所有eventTime定时器
        while ((timer = eventTimeTimersQueue.poll()) != null) {
            // 触发定时器目标的onEventTime方法
            triggerTarget.onEventTime(timer);
        }
        // 调用所有processingTime定时器
        // 这里并没有检查timer的触发时间(通过前面分析可知timer的触发时间为Long.MAX_VALUE)
        while ((timer = processingTimeTimersQueue.poll()) != null) {
            // 触发定时器目标的onProcessingTime方法
            triggerTarget.onProcessingTime(timer);
        }
        // 设置当前watermark为Long.MIN_VALUE
        currentWatermark = Long.MIN_VALUE;
        // 更新当前key
        this.currentKey = currentKey;
    }
    

    这里的triggerTarget为最早提到的BatchGroupedReduceOperator。我们看看onEventTime方法做了什么事情。

    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        // 获取缓存的value
        IN currentValue = values.value();
        if (currentValue != null) {
            // 如果缓存的value不为null,输出这个value到下游
            output.collect(new StreamRecord<>(currentValue, Long.MAX_VALUE));
        }
    }
    

    流程走到这里,下游才能够接收到数据。但条件是接收到一个key不同的数据。分析到这里问题就显而易见了:最后一种不同key的数据永远没有机会输出。这就是数据丢失的原因。

    修复方式

    修复的方式其实很明确,在数据输入结束的时候,强制将缓存的value输出就可以了。我们需要在数据输入结束的时候通知BatchGroupedReduceOperator,然后调用onEventTime方法就可以了。问题来了,如何在输入结束的时候得到通知?

    我们需要用到BoundedOneInput接口。它用于单个输入的operator(例如OneInputStreamOperator),在数据输入结束的时候endInput方法会被调用。

    @PublicEvolving
    public interface BoundedOneInput {
        void endInput() throws Exception;
    }
    

    我们修改BatchGroupedReduceOperator类定义,实现BoundedOneInput接口,然后在endInput中调用onEventTime方法,即可完成修复。

    @Internal
    public class BatchGroupedReduceOperator<IN, KEY>
            extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
            implements OneInputStreamOperator<IN, IN>,
                    Triggerable<KEY, VoidNamespace>,
                    BoundedOneInput {
    // ...
                        
        @Override
        public void endInput() throws Exception {
            onEventTime(null);
        }
    }
    

    我们可以将修改完的代码重新编译,用输出的jar替换项目中的之后重新运行,无论怎么改变并行度,发现输出均正常。

    问题是解决了,我们顺便分析下为什么BoundedOntInputendInput方法会在数据输入结束的时候得到调用。

    我们从StreamTaskprocessInput方法开始分析。processInput方法负责处理输入的数据。

    protected void processInput(Controller controller) throws Exception {
        DataInputStatus status = this.inputProcessor.processInput();
        // ...
    }
    

    这里调用的inputProcessorprocessInput方法,代码如下:

    public DataInputStatus processInput() throws Exception {
        // input发送一个数据到output,获取数据发送状态
        DataInputStatus status = this.input.emitNext(this.output);
        // 如果发送状态为END_OF_DATA,表示数据输入结束
        if (status == DataInputStatus.END_OF_DATA) {
            // 调用endOfInputAware的endInput方法
            this.endOfInputAware.endInput(this.input.getInputIndex() + 1);
            this.output = new FinishedDataOutput();
        } else if (status == DataInputStatus.END_OF_RECOVERY) {
            if (this.input instanceof RecoverableStreamTaskInput) {
                this.input = ((RecoverableStreamTaskInput)this.input).finishRecovery();
            }
    
            return DataInputStatus.MORE_AVAILABLE;
        }
    
        return status;
    }
    

    经过debug我们知道endOfInputAwareRegularOperatorChain。我们查看它的endInput方法:

    public void endInput(int inputId) throws Exception {
        if (this.mainOperatorWrapper != null) {
            this.mainOperatorWrapper.endOperatorInput(inputId);
        }
    
    }
    

    它调用了operator包装器的endOperatorInput方法。继续跟踪。

    public void endOperatorInput(int inputId) throws Exception {
        if (this.wrapped instanceof BoundedOneInput) {
            ((BoundedOneInput)this.wrapped).endInput();
        } else if (this.wrapped instanceof BoundedMultiInput) {
            ((BoundedMultiInput)this.wrapped).endInput(inputId);
        }
    
    }
    

    上面方法中的wrapped正是BatchGroupedReduceOperator。这个方法会检测被包装的operator是否实现了BoundedOneInput接口,如果实现了就调用endInput方法。

    上面的分析为数据输入结束后operator得到通知的调用逻辑。

    即便问题已经解决,然而问题的根因真的是这样吗?下一章我们继续分析。

    进一步分析

    这一节进行进一步调查。经过社区大佬的提醒,我们使用纯DataStream API编写同样的程序,如下所示:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    
    env.setParallelism(3)
    val tableEnv = StreamTableEnvironment.create(env)
    
    val sourceStream = env.fromElements(
        "Alice",
        "Alice",
        "lily",
        "Bob",
        "lily",
        "lily"
    )
    
    val counts: DataStream[(String, Int)] = sourceStream
    .map((_, 1))
    .keyBy(_._1)
    .sum(1)
    
    counts.print()
    
    env.execute("WordCount")
    

    执行这个程序,我们惊奇的发现,结果是正常的。问题真实触发的原因是数据流从Table API转换为DataStream API的时候才会有问题。从这里我们推断,问题也许出现在上游Table处理相关的operator中。

    使用纯DataStream API的时候结果是正常的,说明keySelected方法在程序结束的时候的确得到了调用。在继续分析Table相关operator前我们温热下BatchExecutionInternalTimeServiceManager,看看它哪里还会调用到keySelected方法。果然我们找到了advanceWatermark方法,如下所示:

    @Override
    public void advanceWatermark(Watermark watermark) {
        if (watermark.getTimestamp() == Long.MAX_VALUE) {
            keySelected(null);
        }
    }
    

    它检查watermarktimestamp,如果为Long.MAX_VALUE调用keySelected(null)。通过前面的分析我们已经知道:只在第一次接收到key为某个值的元素的时候,才会注册一个event time timer,触发时间为Long.MAX_VALUE。所以我们猜测,纯DataStream API正常情况下在程序结束的时候会调用advanceWatermark方法。

    我们找到StreamTaskendData方法,这个方法在输入数据结束的时候调用,它又间接调用了advanceToEndOfEventTime,如下所示。

    protected void endData(StopMode mode) throws Exception {
    
        if (mode == StopMode.DRAIN) {
            advanceToEndOfEventTime();
        }
        // finish all operators in a chain effect way
        operatorChain.finishOperators(actionExecutor, mode);
        this.finishedOperators = true;
    
        for (ResultPartitionWriter partitionWriter : getEnvironment().getAllWriters()) {
            partitionWriter.notifyEndOfData(mode);
        }
    
        this.endOfDataReceived = true;
    }
    

    我们继续查看advanceToEndOfEventTime方法。这个方法的实现在它的子类中。我们查看下子类SourceStreamTaskadvanceToEndOfEventTime方法:

    @Override
    protected void advanceToEndOfEventTime() throws Exception {
        operatorChain.getMainOperatorOutput().emitWatermark(Watermark.MAX_WATERMARK);
    }
    

    发现它为下游算子发送了一个时间戳为Long.MAX_VALUEwatermark。这点就和上面为什么纯DataStream API下能够正常运行对应上了。正常情况下Flink作业结束的时候,上游会发送一个时间戳为Long.MAX_VALUEwatermark到下游。

    接下来我们开始debug Table API 向DataStream API转换的这种异常情形。我们发现,它的上游operator为InputConversionOperator。它应该也有一个processWatermark方法。查看代码如下所示:

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (propagateWatermark) {
            super.processWatermark(mark);
        }
    }
    

    发现如果propagateWatermark变量为false,watermark会被忽略。经过debug发现propagateWatermark真的为false。这是才是数据丢失的根本原因。

    综上分析,我们修改代码如下:

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (propagateWatermark || Watermark.MAX_WATERMARK.equals(mark)) {
            super.processWatermark(mark);
        }
    }
    

    确保它一定会将时间戳为Long.MAX_VALUEwatermark发往下游。修改完毕后重新编译测试,问题成功解决。目前该问题修复已被社区采纳。

    本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

    相关文章

      网友评论

        本文标题:Flink 源码之batch问题处理

        本文链接:https://www.haomeiwen.com/subject/snkwhrtx.html