美文网首页JavaFlink学习指南玩转大数据
Flink 使用之Kafka exactly-once场景

Flink 使用之Kafka exactly-once场景

作者: AlienPaul | 来源:发表于2023-05-10 11:31 被阅读0次

    Flink 使用介绍相关文档目录

    Flink 使用介绍相关文档目录

    前言

    近期一个需求是写一套demo用来证明Flink能够精准一次投送。笔者设计了如下场景:Flink从Kafka消费数据,然后原封不动再发送回Kafka。中间模拟Flink作业失败恢复的场景。Flink作业恢复之后,仍可以继续发送数据到Kafka。输出的数据和消费的数据相比,不丢失也不重复。

    环境信息

    • Flink 1.13.2
    • Kafka 1.1.1
    • Hadoop 3.1.1

    要点分析

    需要格外注意的有如下内容:

    • Flink一定要启用checkpoint。
    • Flink CheckpointMode一定要配置为EXACTLY_ONCE
    • Flink Kafka数据源配置要禁用自动commit。
    • Flink Kafka数据源配置要配置隔离级别为read_committed
    • Flink Kafka Producer要配置EXACTLY_ONCE(内部会启用事务和幂等性)。

    主要注意的是,如果我们使用kafka-console-consumer等外部系统读取Flink写入到Kafka的数据来验证数据是否重复或丢失的话,必须保证这个外部系统也是配置了EXACTLY_ONCE相关支持的。例如kafka-console-consumer需要配置隔离级别为read_committed。否则即便是Flink处理数据的时候的确实现了exactly_once,由于kafka-console-consumer读到了未提交的数据,读到的数据会“重复”。干扰结果的验证。

    实现代码

    Flink主程序代码如下所示:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 非必须
    env.setParallelism(1);
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
    String sourceBroker = parameterTool.get("source-broker");
    String sinkBroker = parameterTool.get("sink-broker");
    String sourceTopic = parameterTool.get("source-topic");
    String sinkTopic = parameterTool.get("sink-topic");
    String checkpointPath = parameterTool.get("ckp-path");
    
    CheckpointingMode checkpointingMode = CheckpointingMode.valueOf(mode);
    
    // checkpoint时间间隔,必须到启动checkpoint
    env.enableCheckpointing(666, CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
    
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(sourceBroker)
            .setTopics(sourceTopic)
            .setGroupId("source")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // 必须项,否则数据会有重复
            // 禁用kafka source自动提交offset
            .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
            // 必须项,否则数据会有重复
            // 配置kafka source隔离级别为读提交
            .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
            .build();
    
    DataStreamSource<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");
    
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", sinkBroker);
    // exactly once模式必须要配置
    properties.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));
    // 启用幂等性,下面设置FlinkKafkaProducer的时候指定了EXACTLY_ONCE会自动启用事务,可以不配置此项
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    // 配置transactional id,下面设置FlinkKafkaProducer的时候指定了EXACTLY_ONCE会自动启用事务,可以不配置此项
    properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer");
    
    KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            return new ProducerRecord<>(
                    sinkTopic,
                    element.getBytes(StandardCharsets.UTF_8));
        }
    };
    
    FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
            sinkTopic,
            serializationSchema,
            properties,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    
    stream.addSink(kafkaSink).name("kafka_sink");
    
    // execute program
    env.execute("Exactly once demo");
    

    演示方式

    这里Kafka数据源topic为source,输出数据topic为sink。

    启动任意kafka数据源(也可以使用console producer),向source topic写入数据。

    启动kafka console consumer,监视Flink输出的数据。注意务必要添加隔离级别参数,设置为read_committed。命令示例如下:

    kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --isolation-level read_committed --topic sink
    

    将上面代码编译输出为Flink作业jar,使用flink命令提交。

    flink run -m yarn-cluster  -c xxx.xxx.xxx xxx.jar --source-broker master:9092,node1:9092,node2:9092 --source-topic source --sink-broker master:9092,node1:9092,node2:9092 --sink-topic sink --ckp-path hdfs://xxx:9000/path/to/checkpoint/
    

    任务运行之后一段时间。通过Flink管理页面找到TaskManager container所在的节点。使用kill pid命令终止进程。等待Flink自动恢复。成功恢复后再观察kafka console consumer,输出数据应不重复不遗漏。

    相关文章

      网友评论

        本文标题:Flink 使用之Kafka exactly-once场景

        本文链接:https://www.haomeiwen.com/subject/akgrsdtx.html