Flink Kafka sink source code analysis


Author: Woople | Published 2019-11-03 17:47

Initialization

A Kafka sink is typically added with code like the following:

    input.addSink(
       new FlinkKafkaProducer<>(
          "bar",
          new KafkaSerializationSchemaImpl(),
          properties,
          FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");
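
The KafkaSerializationSchemaImpl above is not a Flink class but user code; a minimal sketch, assuming the stream carries String elements and reusing the "bar" topic from the example:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KafkaSerializationSchemaImpl implements KafkaSerializationSchema<String> {

       @Override
       public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
          // route every element to the "bar" topic, with no key
          return new ProducerRecord<>("bar", element.getBytes(StandardCharsets.UTF_8));
       }
    }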
    

During initialization, env.addSink creates a StreamSink object, i.e. StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction)). The sinkFunction here is the FlinkKafkaProducer object that was passed in, and the StreamSink constructor hands it to the userFunction field of the parent class AbstractUdfStreamOperator. The source is as follows:

    StreamSink.java

    public StreamSink(SinkFunction<IN> sinkFunction) {
      super(sinkFunction);
      chainingStrategy = ChainingStrategy.ALWAYS;
    }
    

    AbstractUdfStreamOperator.java

    public AbstractUdfStreamOperator(F userFunction) {
       this.userFunction = requireNonNull(userFunction);
       checkUdfCheckpointingPreconditions();
    }
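
For context, this wiring happens in DataStream#addSink, which builds the operator and registers it with the environment (abbreviated from the 1.9.0 source):

DataStream.java

    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
       // read the output type of the input Transform to coax out errors about MissingTypeInfo
       transformation.getOutputType();

       // configure the type if needed
       if (sinkFunction instanceof InputTypeConfigurable) {
          ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
       }

       StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
       DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
       getExecutionEnvironment().addOperator(sink.getTransformation());
       return sink;
    }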
    

Task execution

At runtime, StreamSink emits each record by calling the following method:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
       sinkContext.element = element;
       userFunction.invoke(element.getValue(), sinkContext);
    }
    

So what actually gets called is FlinkKafkaProducer#invoke (via TwoPhaseCommitSinkFunction#invoke, which passes the current transaction along). The FlinkKafkaProducer constructor requires you to specify a FlinkKafkaProducer.Semantic:

    public enum Semantic {
       EXACTLY_ONCE,
       AT_LEAST_ONCE,
       NONE
    }
    

The following sections walk through the overall flow of sending data to Kafka under each of the three semantics.

    Semantic.NONE

This mode does no extra work and relies entirely on the Kafka producer's own guarantees: after FlinkKafkaProducer#invoke sends the data, Flink no longer tracks whether Kafka actually received it correctly.

    transaction.producer.send(record, callback);
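
For reference, the invoke method issuing this send looks roughly like the following (abbreviated; only the KafkaSerializationSchema branch is shown). The callback decrements pendingRecords and records any asynchronous send failure, which checkErroneous() later rethrows:

    @Override
    public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
       // rethrow any error recorded by the callback of an earlier send
       checkErroneous();

       ProducerRecord<byte[], byte[]> record = kafkaSchema.serialize(next, context.timestamp());

       pendingRecords.incrementAndGet();
       transaction.producer.send(record, callback);
    }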
    

    Semantic.AT_LEAST_ONCE

Under this semantic, in addition to the send path above, if checkpointing is enabled, FlinkKafkaProducer#snapshotState first runs the parent class's snapshotState method, which ultimately calls FlinkKafkaProducer#preCommit:

    @Override
    protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
       switch (semantic) {
          case EXACTLY_ONCE:
          case AT_LEAST_ONCE:
             flush(transaction);
             break;
          case NONE:
             break;
          default:
             throw new UnsupportedOperationException("Not implemented semantic");
       }
       checkErroneous();
    }
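
For reference, the parent method that drives this looks roughly as follows (abbreviated, logging omitted): it pre-commits the current transaction, stashes it as pending for the completing checkpoint, and immediately begins the next one.

TwoPhaseCommitSinkFunction.java

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
       // this is like the pre-commit of a two-phase-commit transaction
       checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

       long checkpointId = context.getCheckpointId();
       preCommit(currentTransactionHolder.handle);
       pendingCommitTransactions.put(checkpointId, currentTransactionHolder);

       currentTransactionHolder = beginTransactionInternal();

       state.clear();
       state.add(new State<>(
          this.currentTransactionHolder,
          new ArrayList<>(pendingCommitTransactions.values()),
          userContext));
    }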
    

For AT_LEAST_ONCE, preCommit runs the flush method, which executes:

    transaction.producer.flush();
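
The surrounding flush helper (abbreviated) also checks that no records are still in flight before the pre-commit is allowed to succeed:

    private void flush(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
       if (transaction.producer != null) {
          transaction.producer.flush();
       }
       long pendingRecordsCount = pendingRecords.get();
       if (pendingRecordsCount != 0) {
          throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
       }

       // if the flushed requests hit errors, propagate them and fail the checkpoint
       checkErroneous();
    }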
    

That is, all records handed to send are pushed to the Kafka brokers immediately; for details see the KafkaProducer API docs:

    flush()

    Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.
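
As an illustration, with a producer configured like this (the broker address is a placeholder), records can sit in the client buffer for up to 100 ms; the checkpoint-time flush() is what closes exactly that window for AT_LEAST_ONCE:

    import java.util.Properties;

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "broker1:9092"); // placeholder address
    // batch records for up to 100 ms before sending; flush() sends buffered
    // records immediately, even before linger.ms expires
    properties.setProperty("linger.ms", "100");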

    Semantic.EXACTLY_ONCE

The EXACTLY_ONCE semantic also goes through the send and flush paths above, but in addition it enables the Kafka producer's transaction mechanism (see the Kafka transactions documentation for background). The source of beginTransaction in FlinkKafkaProducer is shown below; note that a transaction is actually started only in EXACTLY_ONCE mode.

    @Override
    protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
       switch (semantic) {
          case EXACTLY_ONCE:
             FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
             producer.beginTransaction();
             return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
          case AT_LEAST_ONCE:
          case NONE:
             // Do not create new producer on each beginTransaction() if it is not necessary
             final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
             if (currentTransaction != null && currentTransaction.producer != null) {
                return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
             }
             return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
          default:
             throw new UnsupportedOperationException("Not implemented semantic");
       }
    }
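
The Kafka transaction mechanism being driven here is the producer's standard transaction API; stripped of Flink, a minimal sketch (broker address, transactional id, and payload are placeholders):

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder address
    props.setProperty("transactional.id", "example-tx-0");  // placeholder id
    props.setProperty("key.serializer", ByteArraySerializer.class.getName());
    props.setProperty("value.serializer", ByteArraySerializer.class.getName());

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
       producer.initTransactions();  // registers the id and fences older producers
       producer.beginTransaction();
       producer.send(new ProducerRecord<>("bar", "hello".getBytes(StandardCharsets.UTF_8)));
       producer.commitTransaction(); // or abortTransaction() on failure
    }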
    

The other difference from AT_LEAST_ONCE is at checkpoint time: transaction-related information is saved in the variable nextTransactionalIdHintState, and whatever is stored there is persisted as part of the checkpoint. From FlinkKafkaProducer#snapshotState:

    if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
       checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
       long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
    
       // If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
       // case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
       // scaling up.
       if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
          nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
       }
    
       nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(
          getRuntimeContext().getNumberOfParallelSubtasks(),
          nextFreeTransactionalId));
    }
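
The second phase runs when the checkpoint completes: the parent class's notifyCheckpointComplete eventually calls FlinkKafkaProducer#commit, which commits the Kafka transaction and returns the producer to the pool (abbreviated):

    @Override
    protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
       if (transaction.isTransactional()) {
          try {
             transaction.producer.commitTransaction();
          } finally {
             recycleTransactionalProducer(transaction.producer);
          }
       }
    }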
    

Summary

This article walked through the basic implementation of FlinkKafkaProducer; a follow-up will cover in detail how Flink achieves end-to-end exactly-once semantics with Kafka.

Note: this article is based on Flink 1.9.0 and Kafka 2.3.

