美文网首页大数据Flink精选学习
【Flink 精选】Kafka Consumer 源码详解

【Flink 精选】Kafka Consumer 源码详解

作者: 熊本极客 | 来源:发表于2020-11-03 17:54 被阅读0次

    本文首先进行 Flink Kafka Consumer 原理分析,结合 SourceFunction 和 Kafka Client API 详解源码。


    1.Flink Kafka Consumer 原理

    本文基于 flink-1.11 分析 Kafka Consumer 原理。

    FlinkKafkaConsumer 主要是继承基类 RichParallelSourceFunction,不但可以执行 run(...) 方法读取数据,而且拥有状态、metric 和多并发等功能。

    1.1 RichParallelSourceFunction 分析

    RichParallelSourceFunction 与父类的继承关系,如下图所示。一方面,RichParallelSourceFunction 间接实现接口 SourceFunction,可以执行 run(...) 方法读取数据;另一方面,RichParallelSourceFunction 间接实现接口 RichFunction,拥有状态、metric 和多并发等功能。因此,RichParallelSourceFunction 是有状态的和多并发的 Source 基类

    ParallelSourceFunction 是接口 SourceFunction 的子类。共同点是 Source 的基类,需要实现 run() 读取数据。不同点是前者提供多并发的能力,后者的并发度只能为 1;
    AbstractRichFunction 是接口 RichFunction 的实现类,可以提供 open() 方法获取 RuntimeContext,而 RuntimeContext 拥有 metric、subtasks 信息、accumulator、state 等功能;

    RichParallelSourceFunction继承图.jpg

    1.2 Flink Kafka Consumer 流程分析

    如下图所示,Flink Kafka Consumer 流程主要分为 ①主线程循环获取缓存数据,发送到下游;②消费线程循环消费 Kafka 数据,保存到缓存

    Handover.next:Handover 类的 next 属性,即 ConsumerRecords 类型的缓存数据。Handover 的主要作用是协调主线程和消费线程,有序地消费 Kafka 和发送数据到下游算子

    Flink Kafka Consumer流程图.JPG

    (1)主线程

    主线程获取缓存的 Handover.next 对象即 ConsumerRecords,发送到下游算子。首先创建 KafkaFetcher,同时内部创建消费线程 KafkaConsumerThread。然后,调用 KafkaFetcher.runFetchLoop() 方法,启动消费线程、循环获取缓存数据;最后,根据分区往下游发送数据

    (2)消费线程

    消费线程 KafkaConsumerThread 主要循环消费 Kafka 数据,保存到缓存。首先,主线程启动消费线程。接着,KafkaConsumer 从 Kafka Broker 循环 poll 数据,同时保持到缓存中。

    2.Flink Kafka Consumer 源码详解

    问题1:如何使用 FlinkKafkaConsumer ?如何直接使用 KafkaClient API ?

    
    /**
    * 示例1:  Flink DataStream API 使用 FlinkKafkaConsumer 
    **/
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //SimpleStringSchema为数据字段解析类
    env.addSource(new FlinkKafkaConsumer<>("eventTopic", new SimpleStringSchema(), properties)
    
    
    /**
    * 示例2:  KafkaClient API 直接使用 KafkaConsumer 
    **/
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.poll(Duration.ofMillis(100));
    
    

    问题2:FlinkKafkaConsumer 内部是如何使用 KafkaClient API ?

    ① 初始化

    执行 env.addSource 的时候会创建 StreamSource 算子对象;StreamSource 构造函数中将 function 即 FlinkKafkaConsumer 对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量;

    StreamExecutionEnvironment源码:

        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
            // 省略...
            // function 即 FlinkKafkaConsumer 
            final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
            // 省略...
        }
    

    AbstractUdfStreamOperator源码:

        // userFunction 即 FlinkKafkaConsumer 
        public AbstractUdfStreamOperator(F userFunction) {
            this.userFunction = requireNonNull(userFunction);
            checkUdfCheckpointingPreconditions();
        }
    

    ② Task 启动和运行

    Task 实现 Java多线程接口 Runnable。Task 启动后,函数调用链如下 Task.run() -> Task.doRun() -> StreamTask.invoke() -> StreamTask.runMailboxLoop() -> MailboxProcessor.runMailboxLoop() -> MailboxProcessor.runMailboxStep() -> SourceStreamTask .processInput()。processInput() 方法里面启动线程 sourceThread.start()。上述的关键源码,如下所示。

    StreamTask 源码如下:

        @Override
        public final void invoke() throws Exception {
                // 省略...
                // 调用 MailboxProcessor.runMailboxLoop()
                runMailboxLoop();
                // 省略...
        }
    

    MailboxProcessor 源码如下:

        public void runMailboxLoop() throws Exception {
            // 省略...
            // 循环执行 runMailboxStep
            while (runMailboxStep(localMailbox, defaultActionContext)) {
            }
        }
    
        private boolean runMailboxStep(TaskMailbox localMailbox, MailboxController defaultActionContext) throws Exception {
            if (processMail(localMailbox)) {
                // 执行 mailboxDefaultAction.runDefaultAction,即执行 SourceStreamTask .processInput()
                mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
                return true;
            }
            return false;
        }
    
    

    SourceStreamTask 源码如下:

        @Override
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            // 由于目前没有输入,TaskMailbox 先暂停 loop 主线程
            controller.suspendDefaultAction();
    
            sourceThread.setTaskDescription(getName());
            sourceThread.start();
            // 省略...
        }
    
        private class LegacySourceFunctionThread extends Thread {
            // 省略...
            @Override
            public void run() {
                try {
                    // 执行 source function 的 run() 方法
                    mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
                    completionFuture.complete(null);
                } catch (Throwable t) {
                    completionFuture.completeExceptionally(t);
                }
            }
            // 省略...
        }
    

    ③ 消费 Kafka

    FlinkKafkaConsumerBase 间接实现了 SourceFunction 接口,主要实现 run() 方法。然后,在 run() 方法创建了一个 KafkaFetcher 对象,并主要调用 KafkaFetcher.runFetchLoop()。最终,运行消费线程 KafkaConsumerThread,并 while 循环地 poll Kafka 数据。上述的关键源码,如下所示。

    FlinkKafkaConsumerBase 源码如下:

        @Override
        public void run(SourceContext<T> sourceContext) throws Exception {
            // 省略...
            // 创建 KafkaFetcher 对象 
            this.kafkaFetcher = createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    watermarkStrategy,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode,
                    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                    useMetrics);
    
            // 省略...
            // kafkaFetcher 执行 runFetchLoop(),即循环消费数据
            kafkaFetcher.runFetchLoop();
            // 省略...
        }
    

    KafkaFetcher 源码如下:

        @Override
        public void runFetchLoop() throws Exception {
            try {
                // 启动消费线程 KafkaConsumerThread 
                consumerThread.start();
    
                while (running) {
                    // 获取协调者 Handover 的 next 缓存值 
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                    // 从partition 获取 数据
                    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
                        // 向下游发送数据
    
                        partitionConsumerRecordsHandler(partitionRecords, partition);
                    }
                }
            }
            finally {
                consumerThread.shutdown();
            }
    

    KafkaConsumerThread 源码如下,run() 方法中创建 KafkaClient API 的 KafkaConsumer,并使用 KafkaConsumer.poll() 消费数据

    @Override
        public void run() {
            // 省略...
            // 从主线程获取的 handover 赋值给本地变量...
            final Handover handover = this.handover;
            // 省略...
            try {
                // 创建 KafkaConsumer
                this.consumer = getConsumer(kafkaProperties);
            }
            catch (Throwable t) {
                handover.reportError(t);
                return;
            }
                // 省略...
                ConsumerRecords<byte[], byte[]> records = null;
                // while 循环消费 Kafka
                while (running) {
                    // 省略...
                    if (records == null) {
                        try {
                            // KafkaConsumer poll 数据,即使用 KafkaClient API 的 KafkaConsumer 消费数据
                            records = consumer.poll(pollTimeout);
                        }
                        catch (WakeupException we) {
                            continue;
                        }
                    }
    
                    try {
                            // 把 Kafka 的数据保存在 Handover 的缓存中
                        handover.produce(records);
                        records = null;
                    }
                    // 省略...
                }
        }
    

    问题3:Handover 是如何协调消费线程和主线程,使得前者可以及时消费和保存数据,而后者也可以及时获取数据 ?

    Handover 的关键方法是 produce() 保存缓存数据 nextpollNext() 获取缓存数据 next,主要作用是在消费线程和主线程下,保证同一个缓存数据 next ,在同一时间内是不能既更新(写),也输出(读),即保证原子性操作 next。

    Handover 源码如下:

        /**
        * consumer 线程把 Kafka 数据保存到 next 
        **/
        public void produce(final ConsumerRecords<byte[], byte[]> element)
                throws InterruptedException, WakeupException, ClosedException {
    
            checkNotNull(element);
    
            synchronized (lock) {
                // 循环判断 next 是否为 null
                while (next != null && !wakeupProducer) {
                    // lock 会释放当前的锁,该 consumer 线程进入 waiting 状态
                    lock.wait();
                }
                // 省略...
                else if (error == null) {
                    // 写 next
                    next = element;
                    // 唤醒 lock(使得处于 waiting 状态的 main 线程能够继续执行)
                    lock.notifyAll();
                }
                // 省略...
            }
        }
    
        /**
        * main 线程读取 next 
        **/
        public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
            synchronized (lock) {
                // 循环判断 next 是否为 null
                while (next == null && error == null) {
                    // lock 会释放当前的锁,该 main 线程进入 waiting 状态
                    lock.wait();
                }
                // 读取 next
                ConsumerRecords<byte[], byte[]> n = next;
                if (n != null) {
                    next = null;
                    // 唤醒 lock(使得处于 waiting 状态的 consumer 线程能够继续执行)
                    lock.notifyAll();
                    return n;
                }
                // 省略...
            }
        }
    

    Java 多线程的等待/通知机制:Object 的 wait()、notify/notifyAll()
    ① 当线程执行 wait() 方法的时候,会释放当前的锁,然后让出CPU,进入等待状态
    ② 当线程执行 notify/notifyAll() 方法的时候,会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到 wait() ,再次释放锁。

    相关文章

      网友评论

        本文标题:【Flink 精选】Kafka Consumer 源码详解

        本文链接:https://www.haomeiwen.com/subject/jqrxvktx.html