美文网首页Flink源码解析
一文搞定 Flink 消费消息的全流程

一文搞定 Flink 消费消息的全流程

作者: shengjk1 | 来源:发表于2020-06-13 18:04 被阅读0次

    我们以下面代码为例:

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("canal_monitor_order_astable", new SimpleStringSchema(), properties);
            consumer.setStartFromEarliest();
            
    env.addSource(consumer).flatMap(...).print()
    

    当 Flink 程序启动,leader、blobServer 等都创建完毕,当 ExecutionGraph 构建完成,提交成功之后。就到了,task 正式执行的阶段了。这个时候,一条消息是如何流转的呢?
    首先,进入了 Task 的 run 方法

    ......
    /*
                这个方法就是用户代码所真正被执行的入口。比如我们写的什么 new MapFunction() 的逻辑,最终就是在这里被执行的
                 */
                // run the invokable
                invokable.invoke();
    ......
    

    然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方

    ......
    run();
    .....
    

    最为关键的就是 run 方法。
    进入 SourceStreamTask run 方法

    @Override
        // source task 获取数据的入口方法
        protected void run() throws Exception {
            headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
        }
    

    继续追踪就到了 StreamSource 的 run 方法

    ......
        // 生成上下文之后,接下来就是把上下文交给 SourceFunction 去执行,用户自定义的 run 方法开始正式运行
                userFunction.run(ctx);
    ......
    

    此处的 userFunction 实际上就是 FlinkKafkaConsumer
    具体是如何消费消息的可以参考
    写给大忙人看的Flink 消费 Kafka
    彻底搞懂 Flink Kafka OffsetState 存储
    继续追踪到 RecordWriter

    private void emit(T record, int targetChannel) throws IOException, InterruptedException {
            // 最底层的抽象是 MemorySegment,用于数据传输的是 Buffer,将 java 对象转化为 buffer 是这个
            // Flink 把对象调用该对象所属的序列化器序列化为字节数组
            serializer.serializeRecord(record);
            
            if (copyFromSerializerToTargetChannel(targetChannel)) {
                serializer.prune();
            }
        }
    

    RecordWriter 还是比较有意思的,RecordWriter 主要就是把 java 对象转化为 byte 数组( 也就是 flink 自己管理内存,不借助与 JVM )。而后面的传输也是基于 byte 数组的。

    copyFromSerializerToTargetChannel 会将 byte 数据 flush 到 相应的 targetChannel ( targetChannel 对于下游来说就是 InputChannel 具体可以参考一下 Flink反压机制 )
    底层通过 netty 进行数据的传送,传送至 PartitionRequestQueue

    ......
    if (cause != null) {
                            ErrorResponse msg = new ErrorResponse(
                                new ProducerFailedException(cause),
                                reader.getReceiverId());
    
                            // 真正往 netty 的 nio 通道里写入.
                            // 在这里,写入的是一个 RemoteInputChannel,对应的就是下游节点的 InputGate 的 channels。
                            ctx.writeAndFlush(msg);
                        }
    ......
    

    这个时候,这条数据就进入了下游的 InputChannel 。
    有写得需要有读,进入到 CreditBasedPartitionRequestClientHandler

    // nio 通道的另一端( 下游 )需要读入 buffer
        // 上游的算子写入,下游的算子读取,这也是反压的原理
        // 为什么叫 decodeMsg,主要上游传过来的是 byte 数组,这个将 byte 数组 转化为 record
        private void decodeMsg(Object msg) throws Throwable {
            final Class<?> msgClazz = msg.getClass();
    
            // ---- Buffer --------------------------------------------------------
            if (msgClazz == NettyMessage.BufferResponse.class) {
                NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
    
                RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
                if (inputChannel == null) {
                    bufferOrEvent.releaseBuffer();
    
                    cancelRequestFor(bufferOrEvent.receiverId);
    
                    return;
                }
    
                decodeBufferOrEvent(inputChannel, bufferOrEvent);
    
            } else if (msgClazz == NettyMessage.ErrorResponse.class) {
                // ---- Error ---------------------------------------------------------
                NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;
    
                SocketAddress remoteAddr = ctx.channel().remoteAddress();
    
                if (error.isFatalError()) {
                    notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
                        "Fatal error at remote task manager '" + remoteAddr + "'.",
                        remoteAddr,
                        error.cause));
                } else {
                    RemoteInputChannel inputChannel = inputChannels.get(error.receiverId);
    
                    if (inputChannel != null) {
                        if (error.cause.getClass() == PartitionNotFoundException.class) {
                            inputChannel.onFailedPartitionRequest();
                        } else {
                            inputChannel.onError(new RemoteTransportException(
                                "Error at remote task manager '" + remoteAddr + "'.",
                                remoteAddr,
                                error.cause));
                        }
                    }
                }
            } else {
                throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
            }
        }
    

    至此呢,就该下游算子 flapMap 运行处理了。(当然啦,实际上应该是先 print 对应的 task 运行,然后 flatMap 对应的 task 运行,最后才是 source 对应的 task 运行 )。

    我们得回到 Task 的 run 方法

    ......
    /*
                这个方法就是用户代码所真正被执行的入口。比如我们写的什么 new MapFunction() 的逻辑,最终就是在这里被执行的
                 */
                // run the invokable
                invokable.invoke();
    ......
    

    然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方

    ......
    run();
    .....
    

    最为关键的就是 run 方法。
    这次调用的是 flatMap 对应 task 的 run 方法,所以进入 OneInputStreamTask

        @Override
        protected void run() throws Exception {
            // cache processor reference on the stack, to make the code more JIT friendly
            final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
            //处理输入的消息
            while (running && inputProcessor.processInput()) {
                // all the work happens in the "processInput" method
            }
        }
    

    进入 processInput 方法

    //            程序首先获取下一个 buffer
                // 主要是尝试获取 buffer,然后赋值给当前的反序列化器
                // 处理 barrier 的逻辑,被包含在了getNextNonBlocked 中
                final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
                if (bufferOrEvent != null) {
                    if (bufferOrEvent.isBuffer()) {
                        currentChannel = bufferOrEvent.getChannelIndex();
                        currentRecordDeserializer = recordDeserializers[currentChannel];
                        currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                    }
                    else {
                        // Event received
                        final AbstractEvent event = bufferOrEvent.getEvent();
                        if (event.getClass() != EndOfPartitionEvent.class) {
                            throw new IOException("Unexpected event: " + event);
                        }
                    }
                }
    

    获取到 buffer 之后

    // 这里就是真正的,用户的代码即将被执行的地方
                            // now we can do the actual processing
                            StreamRecord<IN> record = recordOrMark.asRecord();
                            synchronized (lock) {
                                numRecordsIn.inc();
                                //set KeyContext setCurrentKey
                                streamOperator.setKeyContextElement1(record);
                                streamOperator.processElement(record);
                            }
                            return true;
    

    交给 flatMap 去处理。处理完了之后就又把数据发往 RecordWriter 的 emit 然后就这样反复执行,直到最后一个 operator ,这个消息也就消费完毕了。当然了,这仅仅是跨 taskManager 的消息流程,同一个 taskMananger 的消息流程就很简单了,就是简单的消息传递,不需要序列化成 byte 数组

    总结

    整体流程
    在这里插入图片描述

    1. 第一步必然是准备一个ResultPartition;

    1. 通知JobMaster;
    2. JobMaster通知下游节点;如果下游节点尚未部署,则部署之;
    3. 下游节点向上游请求数据
    4. 开始传输数据
    数据跨 task 传输
    在这里插入图片描述
    1. 数据在本operator处理完后,交给RecordWriter。每条记录都要选择一个下游节点,所以要经过ChannelSelector。
    2. 每个channel都有一个serializer(我认为这应该是为了避免多线程写的麻烦),把这条Record序列化为ByteBuffer
    3. 接下来数据被写入ResultPartition下的各个subPartition里,此时该数据已经存入DirectBuffer(MemorySegment)
    4. 单独的线程控制数据的flush速度,一旦触发flush,则通过Netty的nio通道向对端写入
    5. 对端的netty client接收到数据,decode出来,把数据拷贝到buffer里,然后通知InputChannel
    6. 有可用的数据时,下游算子从阻塞醒来,从InputChannel取出buffer,再解序列化成record,交给算子执行用户代码

    相关文章

      网友评论

        本文标题:一文搞定 Flink 消费消息的全流程

        本文链接:https://www.haomeiwen.com/subject/uzdztktx.html