How Flink Pulls Data from Kafka

Author: shengjk1 | Published 2019-04-07 12:23
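
Before diving into the source, here is a minimal sketch of how a FlinkKafkaConsumer is typically registered as a source in a job (assuming the universal flink-connector-kafka; the topic name, group id, and bootstrap servers below are placeholders). The run method we look at next is what the task invokes for this source:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // default: start from committed group offsets

        // addSource eventually drives FlinkKafkaConsumerBase.run(sourceContext)
        env.addSource(consumer).print();

        env.execute("kafka-source-demo");
    }
}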

First, let's look at the FlinkKafkaConsumerBase.run method, which is essentially the entry point through which Flink pulls data from Kafka:

    // entry point: start the source
    public void run(SourceContext<T> sourceContext) throws Exception {
        ......
        // from this point forward:
        //   - 'snapshotState' will draw offsets from the fetcher,
        //     instead of being built from `subscribedPartitionsToStartOffsets`
        //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
        //     Kafka through the fetcher, if configured to do so)
        // create the fetcher that pulls data from Kafka
        this.kafkaFetcher = createFetcher(
                sourceContext,
                subscribedPartitionsToStartOffsets,
                periodicWatermarkAssigner,
                punctuatedWatermarkAssigner,
                (StreamingRuntimeContext) getRuntimeContext(),
                offsetCommitMode,
                getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                useMetrics);

        if (!running) {
            return;
        }

        // depending on whether we were restored with the current state version (1.3),
        // remaining logic branches off into 2 paths:
        //  1) New state - partition discovery loop executed as separate thread, with this
        //                 thread running the main fetcher loop
        //  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
        
        if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
            // KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS is not configured
            kafkaFetcher.runFetchLoop();
        } else {
            // internally this still calls kafkaFetcher.runFetchLoop()
            runWithPartitionDiscovery();
        }
    }
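
When discoveryIntervalMillis is configured, runWithPartitionDiscovery runs partition discovery in a separate thread while the current thread still executes kafkaFetcher.runFetchLoop(). The following is only a conceptual sketch of that branch (the method names mirror the Flink source, but error handling, wakeup, and shutdown details are elided):

// Conceptual sketch of runWithPartitionDiscovery: a discovery thread feeds newly
// found partitions to the fetcher while this thread runs the main fetch loop.
// Not the actual Flink source; wakeup and shutdown handling are omitted.
private void runWithPartitionDiscoverySketch() throws Exception {
    Thread discoveryLoopThread = new Thread(() -> {
        try {
            while (running) {
                // ask Kafka for the current partitions of the subscribed topics
                List<KafkaTopicPartition> discovered = partitionDiscoverer.discoverPartitions();
                if (running && !discovered.isEmpty()) {
                    // new partitions end up in the fetcher's unassignedPartitionsQueue
                    kafkaFetcher.addDiscoveredPartitions(discovered);
                }
                Thread.sleep(discoveryIntervalMillis);
            }
        } catch (Exception e) {
            // the real code hands the exception over to the main thread
        }
    }, "Kafka Partition Discovery");

    discoveryLoopThread.start();
    kafkaFetcher.runFetchLoop();   // same fetch loop as the no-discovery branch
    discoveryLoopThread.join();    // the real code also wakes up and closes the discoverer
}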

The createFetcher method:

@Override
    protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
......

        return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
    }

This returns a KafkaFetcher object. Stepping into it, we can see that the KafkaFetcher constructor creates a KafkaConsumerThread object:

public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
......
        this.consumerThread = new KafkaConsumerThread(
            LOG,
            // parameters passed to the KafkaConsumerThread constructor
            handover,
            kafkaProperties,
            // what exactly is unassignedPartitionsQueue? It is covered in detail in "How Flink's startupMode takes effect"
            unassignedPartitionsQueue,
            getFetcherName() + " for " + taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);
    }
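
The handover passed into KafkaConsumerThread above is the bridge between the two threads: the consumer thread will call handover.produce(records) and the fetcher's main loop will call handover.pollNext(), as we will see below. Conceptually it is a capacity-one exchange point; a minimal sketch of the idea (the real Flink Handover class additionally propagates errors and supports wakeUp()/close()) could look like this:

import org.apache.kafka.clients.consumer.ConsumerRecords;

// Minimal sketch of a "handover": a capacity-one exchange point between the
// Kafka consumer thread (producer side) and the fetcher loop (consumer side).
final class SimpleHandover {
    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next;

    // called by the consumer thread; blocks while the previous batch has not been taken
    void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = records;
            lock.notifyAll();
        }
    }

    // called by the fetcher's main loop; blocks until a batch is available
    ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            ConsumerRecords<byte[], byte[]> records = next;
            next = null;
            lock.notifyAll();
            return records;
        }
    }
}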

That wraps up createFetcher, which can be regarded as the preparation work for pulling data. Next, let's look at kafkaFetcher.runFetchLoop(). The runFetchLoop method in KafkaFetcher is where fetching messages from Kafka actually starts:

    // fetch messages from Kafka
    public void runFetchLoop() throws Exception {
        try {
            // one of the parameters passed to the KafkaConsumerThread constructor
            final Handover handover = this.handover;

            // kick off the actual Kafka consumer
            // this is where data is actually pulled from Kafka
            consumerThread.start();

            while (running) {
                // this blocks until we get the next records
                // it automatically re-throws exceptions encountered in the consumer thread
                // take records from the handover, then process them
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

                // get the records for each topic partition
                for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                        final T value = deserializer.deserialize(record);

                        if (deserializer.isEndOfStream(value)) {
                            // end of stream signaled
                            running = false;
                            break;
                        }

                        // emit the actual record. this also updates offset state atomically
                        // and deals with timestamps and watermark generation
                        // emit the record; timestamp and watermark handling happens next
                        emitRecord(value, partition, record.offset(), record);
                    }
                }
            }
        }
        finally {
            // this signals the consumer thread that no more work is to be done
            consumerThread.shutdown();
        }

        // on a clean exit, wait for the runner thread
        try {
            consumerThread.join();
        }
        catch (InterruptedException e) {
            // may be the result of a wake-up interruption after an exception.
            // we ignore this here and only restore the interruption state
            Thread.currentThread().interrupt();
        }
    }
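
In the loop above, each raw ConsumerRecord is turned into a user-facing value via deserializer.deserialize(record), and isEndOfStream(value) lets the deserializer terminate the source. As a minimal illustration (the class name and the "END" marker are just examples), a KafkaDeserializationSchema that emits the record value as a String could look like this:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Minimal example of the deserializer used in the fetch loop above:
// emits the record value as a String and treats an "END" message as end-of-stream.
public class StringValueDeserializationSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        // returning true makes runFetchLoop() set running = false
        return "END".equals(nextElement);
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        byte[] value = record.value();
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}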

Since consumerThread.start() kicks off the actual Kafka consumer, let's take a look at the run method of KafkaConsumerThread:

@Override
    public void run() {
        // early exit check
        if (!running) {
            return;
        }

        // this is the means to talk to FlinkKafkaConsumer's main thread
        // constructor parameter; the FlinkKafkaConsumer main thread consumes from it via handover.pollNext(), as shown above
        final Handover handover = this.handover;

        // This method initializes the KafkaConsumer and guarantees it is torn down properly.
        // This is important, because the consumer has multi-threading issues,
        // including concurrent 'close()' calls.
        try {
            // create the KafkaConsumer
            this.consumer = getConsumer(kafkaProperties);
        }
        catch (Throwable t) {
            handover.reportError(t);
            return;
        }

        // from here on, the consumer is guaranteed to be closed properly
        ......

            // early exit check
            if (!running) {
                return;
            }

            // the latest bulk of records. May carry across the loop if the thread is woken up
            // from blocking on the handover
            ConsumerRecords<byte[], byte[]> records = null;

            // reused variable to hold found unassigned new partitions.
            // found partitions are not carried across loops using this variable;
            // they are carried across via re-adding them to the unassigned partitions queue
            List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

            // main fetch loop
            while (running) {

                // check if there is something to commit
                // commitInProgress defaults to false
                if (!commitInProgress) {
                    // get and reset the work-to-be committed, so we don't repeatedly commit the same
                    // for details, see "How Flink saves offsets" (https://www.jianshu.com/p/ee4fe63f0182)
                    final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                            nextOffsetsToCommit.getAndSet(null);

                    if (commitOffsetsAndCallback != null) {
                        log.debug("Sending async offset commit request to Kafka broker");

                        // also record that a commit is already in progress
                        // the order here matters! first set the flag, then send the commit command.
                        commitInProgress = true;
                        consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
                    }
                }

                try {
                    // hasAssignedPartitions defaults to false
                    // when new partitions are discovered, they are added to unassignedPartitionsQueue and subscribed
                    // for details, see "How Flink's startupMode takes effect"
                    if (hasAssignedPartitions) {
                        newPartitions = unassignedPartitionsQueue.pollBatch();
                    }
                    else {
                        // if no assigned partitions block until we get at least one
                        // instead of hot spinning this loop. We rely on a fact that
                        // unassignedPartitionsQueue will be closed on a shutdown, so
                        // we don't block indefinitely
                        newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                    }
                    if (newPartitions != null) {
                        reassignPartitions(newPartitions);
                    }
                } catch (AbortedReassignmentException e) {
                    continue;
                }

                if (!hasAssignedPartitions) {
                    // Without assigned partitions KafkaConsumer.poll will throw an exception
                    continue;
                }

                // get the next batch of records, unless we did not manage to hand the old batch over
                if (records == null) {
                    try {
                        // pull data via the Kafka consumer API
                        records = consumer.poll(pollTimeout);
                    }
                    catch (WakeupException we) {
                        continue;
                    }
                }

                try {
                    // hand the records over via the handover for the FlinkKafkaConsumer main thread to consume
                    handover.produce(records);
                    records = null;
                }
                catch (Handover.WakeupException e) {
                    // fall through the loop
                }
            }
            // end main fetch loop
        }
        catch (Throwable t) {
            // let the main thread know and exit
            // it may be that this exception comes because the main thread closed the handover, in
            // which case the below reporting is irrelevant, but does not hurt either
            handover.reportError(t);
        }
        finally {
            // make sure the handover is closed if it is not already closed or has an error
            handover.close();

            // make sure the KafkaConsumer is closed
            try {
                consumer.close();
            }
            catch (Throwable t) {
                log.warn("Error while closing Kafka consumer", t);
            }
        }
    }
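
The commit branch above relies on the Kafka client's asynchronous commit: commitAsync(offsets, callback) returns immediately, and the callback (Flink's CommitCallback, which resets commitInProgress) fires when the broker responds. As a standalone illustration of that Kafka consumer API outside of Flink (topic, partition, and offset are placeholders):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AsyncCommitExample {
    // Commits a single offset asynchronously; the lambda is the OffsetCommitCallback.
    static void commitOnce(KafkaConsumer<byte[], byte[]> consumer) {
        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("demo-topic", 0), new OffsetAndMetadata(42L));

        consumer.commitAsync(offsets, (committedOffsets, exception) -> {
            // Flink's CommitCallback resets commitInProgress at this point
            if (exception != null) {
                System.err.println("Offset commit failed: " + exception.getMessage());
            }
        });
    }
}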

That concludes the walkthrough of how Flink pulls data from Kafka.
