
Flink Consuming Kafka

Author: 天之見證 | Published 2019-09-29 22:49

Using FlinkKafkaConsumer09 as the example.

1. The call chain, from the outermost layer inward, is as follows (a minimal wiring sketch follows the list):

1. StreamExecutionEnvironment#addSource(consumer)
2. new FlinkKafkaConsumer09[String](topic, msgSchema, consumerProperties)
3. kafkaFetcher.runFetchLoop()
4. Handover#pollNext()
5. KafkaConsumer#poll(pollTimeout)
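
A minimal sketch of steps 1 and 2 (topic name, bootstrap servers, and group id are placeholders; the job body is illustrative only):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class KafkaSourceJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties consumerProperties = new Properties();
    consumerProperties.setProperty("bootstrap.servers", "localhost:9092");
    consumerProperties.setProperty("group.id", "demo-group");

    // step 2: construct the consumer; step 1: register it as a source
    FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("demo-topic", new SimpleStringSchema(), consumerProperties);
    env.addSource(consumer).print();

    // steps 3-5 run inside the source task once the job starts
    env.execute("flink-kafka-demo");
  }
}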

2. Core code

Because a Handover sits in front of the consumer as a proxy, the code looks more complex than a plain poll loop.

2.1 Handover and KafkaConsumerThread

Initialization of the relevant fields in Kafka09Fetcher:

this.handover = new Handover();

this.consumerThread = new KafkaConsumerThread(
   LOG,
   handover,
   kafkaProperties,
   unassignedPartitionsQueue,
   createCallBridge(),
   getFetcherName() + " for " + taskNameWithSubtasks,
   pollTimeout,
   useMetrics,
   consumerMetricGroup,
   subtaskMetricGroup);

The data-fetching part:

@Override
public void runFetchLoop() throws Exception {
  try {
    final Handover handover = this.handover;

    // kick off the actual Kafka consumer
    consumerThread.start();

    while (running) {
      // this blocks until we get the next records
      // it automatically re-throws exceptions encountered in the consumer thread
      final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

      // get the records for each topic partition
      for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
        // extract the records for this single partition
        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
            records.records(partition.getKafkaPartitionHandle());

        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
          final T value = deserializer.deserialize(
              record.key(), record.value(),
              record.topic(), record.partition(), record.offset());

          if (deserializer.isEndOfStream(value)) {
            // end of stream signaled
            running = false;
            break;
          }

          // emit the actual record. this also updates offset state atomically
          // and deals with timestamps and watermark generation
          emitRecord(value, partition, record.offset(), record);
        }
      }
    }
  }
  finally {
    // this signals the consumer thread that no more work is to be done
    consumerThread.shutdown();
  }

  // on a clean exit, wait for the runner thread
  try {
    consumerThread.join();
  }
  catch (InterruptedException e) {
    // may be the result of a wake-up interruption after an exception.
    // we ignore this here and only restore the interruption state
    Thread.currentThread().interrupt();
  }
}
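
The isEndOfStream() check above gives the deserializer a way to terminate the fetch loop. Below is a hedged sketch of a schema that does so, assuming the KeyedDeserializationSchema interface used by this connector version (the class name and the sentinel value are made up for illustration):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class StoppableStringSchema implements KeyedDeserializationSchema<String> {

  @Override
  public String deserialize(byte[] messageKey, byte[] message,
                            String topic, int partition, long offset) {
    // this is the 5-argument deserialize() called from runFetchLoop()
    return message == null ? null : new String(message, StandardCharsets.UTF_8);
  }

  @Override
  public boolean isEndOfStream(String nextElement) {
    // returning true flips `running = false` in the loop above
    return "POISON_PILL".equals(nextElement);
  }

  @Override
  public TypeInformation<String> getProducedType() {
    return BasicTypeInfo.STRING_TYPE_INFO;
  }
}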

From the above we can see that the outer layer starts the real Kafka consumer via consumerThread.start(); that thread is what actually receives data, while the fetch loop obtains the records indirectly through handover.pollNext().

The concrete Handover implementation is as follows, abridged here (it implements something like a single-element blocking queue):

public final class Handover implements Closeable {

  private final Object lock = new Object();

  private ConsumerRecords<byte[], byte[]> next;   // the single buffered batch, null once consumed
  private Throwable error;                        // non-null marks the handover closed with an error
  private boolean wakeupProducer;                 // interrupts a produce() blocked on a full handover

  @Nonnull
  public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
    synchronized (lock) {
      while (next == null && error == null) {
        lock.wait();
      }

      ConsumerRecords<byte[], byte[]> n = next;
      if (n != null) {
        next = null;
        lock.notifyAll();
        return n;
      }
      else {
        ExceptionUtils.rethrowException(error, error.getMessage());

        // this statement cannot be reached since the above method always throws an exception
        // this is only here to silence the compiler and any warnings
        return ConsumerRecords.empty();
      }
    }
  }

  public void produce(final ConsumerRecords<byte[], byte[]> element)
      throws InterruptedException, WakeupException, ClosedException {

    checkNotNull(element);

    synchronized (lock) {
      while (next != null && !wakeupProducer) {
        lock.wait();
      }

      wakeupProducer = false;

      // if there is still an element, we must have been woken up
      if (next != null) {
        throw new WakeupException();
      }
      // if there is no error, then this is open and can accept this element
      else if (error == null) {
        next = element;
        lock.notifyAll();
      }
      // an error marks this as closed for the producer
      else {
        throw new ClosedException();
      }
    }
  }
}

If we treat the Handover as a queue, its role becomes easy to understand; its producing side is the KafkaConsumerThread mentioned earlier. A rough mental model follows.
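
As a mental model only (this is not the actual Flink code, which also propagates producer errors and supports waking up a blocked producer), the Handover behaves much like a capacity-1 BlockingQueue:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class HandoverDemo {
  public static void main(String[] args) throws Exception {
    BlockingQueue<String> handover = new ArrayBlockingQueue<>(1);

    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) {
          handover.put("batch-" + i);  // blocks while the previous batch is unconsumed
        }
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    for (int i = 0; i < 3; i++) {
      System.out.println("consumed " + handover.take());  // blocks until a batch arrives
    }
    producer.join();
  }
}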

The Handover is used in KafkaConsumerThread as follows:

@Override
public void run() {
  // ...
  // this is the means to talk to FlinkKafkaConsumer's main thread
  final Handover handover = this.handover;

  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    this.consumer = getConsumer(kafkaProperties);
  }
  catch (Throwable t) {
    handover.reportError(t);
    return;
  }

  // from here on, the consumer is guaranteed to be closed properly
  try {
    // ....
    // the latest bulk of records. May carry across the loop if the thread is woken up
    // from blocking on the handover
    ConsumerRecords<byte[], byte[]> records = null;

    // reused variable to hold found unassigned new partitions.
    // found partitions are not carried across loops using this variable;
    // they are carried across via re-adding them to the unassigned partitions queue
    List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

    // main fetch loop
    while (running) {

      // check if there is something to commit
      if (!commitInProgress) {
        // get and reset the work-to-be committed, so we don't repeatedly commit the same
        final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
            nextOffsetsToCommit.getAndSet(null);

        if (commitOffsetsAndCallback != null) {
          log.debug("Sending async offset commit request to Kafka broker");

          // also record that a commit is already in progress
          // the order here matters! first set the flag, then send the commit command.
          commitInProgress = true;
          consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
        }
      }

      try {
        if (hasAssignedPartitions) {
          newPartitions = unassignedPartitionsQueue.pollBatch();
        }
        else {
          // if no assigned partitions block until we get at least one
          // instead of hot spinning this loop. We rely on a fact that
          // unassignedPartitionsQueue will be closed on a shutdown, so
          // we don't block indefinitely
          newPartitions = unassignedPartitionsQueue.getBatchBlocking();
        }
        if (newPartitions != null) {
          reassignPartitions(newPartitions);
        }
      } catch (AbortedReassignmentException e) {
        continue;
      }

      if (!hasAssignedPartitions) {
        // Without assigned partitions KafkaConsumer.poll will throw an exception
        continue;
      }

      // get the next batch of records, unless we did not manage to hand the old batch over
      if (records == null) {
        try {
          records = consumer.poll(pollTimeout);
        }
        catch (WakeupException we) {
          continue;
        }
      }

      try {
        handover.produce(records);
        records = null;
      }
      catch (Handover.WakeupException e) {
        // fall through the loop
      }
    }
    // end main fetch loop
  }
  catch (Throwable t) {
    // let the main thread know and exit
    // it may be that this exception comes because the main thread closed the handover, in
    // which case the below reporting is irrelevant, but does not hurt either
    handover.reportError(t);
  }
  finally {
    // make sure the handover is closed if it is not already closed or has an error
    handover.close();
    // make sure the KafkaConsumer is closed
    try {
      consumer.close();
    }
    catch (Throwable t) {
      log.warn("Error while closing Kafka consumer", t);
    }
  }
}

From the above we can see that once the consumer polls a batch of records, it hands the batch to the Handover, from which the fetcher picks it up.

3. Flink checkpoints and Kafka offset commits

Each time a checkpoint completes, the offsets to commit to Kafka are recorded; the fetch loop then commits them asynchronously. The call chain is:

1. FlinkKafkaConsumerBase#notifyCheckpointComplete(long checkpointId)
2. AbstractFetcher#commitInternalOffsetsToKafka
3. KafkaConsumerThread#setOffsetsToCommit
4. nextOffsetsToCommit (the fetch-loop code above reads and resets this value)

The setOffsetsToCommit implementation:

void setOffsetsToCommit(
    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
    @Nonnull KafkaCommitCallback commitCallback) {

  // record the work to be committed by the main consumer thread and make sure the consumer notices that
  if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
    log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
        "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
        "This does not compromise Flink's checkpoint integrity.");
  }

  // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
  handover.wakeupProducer();

  synchronized (consumerReassignmentLock) {
    if (consumer != null) {
      consumer.wakeup();
    } else {
      // the consumer is currently isolated for partition reassignment;
      // set this flag so that the wakeup state is restored once the reassignment is complete
      hasBufferedWakeup = true;
    }
  }
}
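
The commit path above is only exercised when checkpointing is enabled. Continuing the wiring sketch from section 1 (values remain placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);  // notifyCheckpointComplete fires after each successful checkpoint

FlinkKafkaConsumer09<String> consumer =
    new FlinkKafkaConsumer09<>("demo-topic", new SimpleStringSchema(), consumerProperties);
// explicit here for clarity; committing on checkpoints is the default once checkpointing is on
consumer.setCommitOffsetsOnCheckpoints(true);

env.addSource(consumer).print();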

3.1 Offset commit timeouts

An offset commit timing out means that the commit for one checkpoint has still not completed by the time the next checkpoint finishes. In that case nextOffsetsToCommit is set again, overwriting the pending value, so the earlier offsets are never committed to Kafka. As the warning in setOffsetsToCommit notes, this does not compromise Flink's checkpoint integrity; only the offsets visible to Kafka-side tooling lag behind. The sketch below shows the overwrite semantics.
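
A minimal sketch of the getAndSet semantics (strings stand in for the real offset maps):

import java.util.concurrent.atomic.AtomicReference;

public class CommitOverwriteDemo {
  public static void main(String[] args) {
    AtomicReference<String> nextOffsetsToCommit = new AtomicReference<>();

    // checkpoint 1 completes, but the consumer thread has not picked up the commit yet
    nextOffsetsToCommit.set("offsets-of-checkpoint-1");

    // checkpoint 2 completes first: the pending value is replaced, not queued
    if (nextOffsetsToCommit.getAndSet("offsets-of-checkpoint-2") != null) {
      System.out.println("previous offsets skipped (the warning above fires)");
    }

    // the consumer thread only ever sees the newest offsets
    System.out.println("committing: " + nextOffsetsToCommit.getAndSet(null));
  }
}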
