Ordering Issues When Flink Consumes Kafka Partitions


Author: caster | Published 2021-08-30 14:40

    1. Ordering of data within Kafka partitions

    Kafka guarantees ordering of data within a partition, so ordered data can be achieved by routing records to a specific partition. The partition-selection logic in the Kafka producer is shown below: if the record specifies a partition number, it is sent to that partition; otherwise the partitioner's partitioner.partition() method is called.

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
    

    Three built-in partitioners implement the partition() method of the Partitioner interface (a configuration sketch follows the list):

    1. DefaultPartitioner
      If a partition is specified, use it; otherwise, if a key is present, hash the key; otherwise stick to one partition and switch only when the batch is full.
    2. RoundRobinPartitioner
      If no partition is specified, cycle through the partitions so records are spread evenly.
    3. UniformStickyPartitioner
      Same as the default, minus the key-hashing rule.
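
    As a concrete illustration (not from the original article), the partitioner is selected with the standard producer property partitioner.class; a minimal sketch that switches to RoundRobinPartitioner might look like this:

    // classes come from org.apache.kafka.clients.producer and org.apache.kafka.common.serialization
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // omit this line to keep the default DefaultPartitioner
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);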

    In summary, to get ordered data into Kafka we can either write to an explicit partition or give related records the same key so they land in the same partition. Take care not to funnel all data into a single partition; it is usually better to group the data and guarantee order within each group rather than aiming for global order.
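
    For illustration (these lines are not from the original article), the two approaches correspond to the two standard ProducerRecord constructors:

    // explicit partition: topic, partition, key, value (partition 1 here is just an example)
    new ProducerRecord<>("test1", 1, "a", "payload");
    // no partition given: the key decides, so every record with key "a" lands in the same partition
    new ProducerRecord<>("test1", "a", "payload");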

    If we use the same-key approach to keep a group's records in one partition, the partition is computed as follows (DefaultPartitioner):

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // a null key falls back to the sticky behaviour, same as UniformStickyPartitioner
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // murmur2-hash the key, force the result to be non-negative (not absolute value), then take it modulo the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    

    Testing suggests keeping the key prefixes varied so the data spreads evenly; run a quick test on your own data to settle on a key format. The snippet below prints: 1 1 1 0 2 / 0 2 0 1 2 / 0 2 2 1 0

    System.out.print(Utils.toPositive(Utils.murmur2("test11".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("test12".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("test13".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("test14".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("test15".getBytes()))%3+" ");
    System.out.println();
    System.out.print(Utils.toPositive(Utils.murmur2("1test1".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("2test1".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("3test1".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("4test1".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("5test1".getBytes()))%3+" ");
    System.out.println();
    System.out.print(Utils.toPositive(Utils.murmur2("1".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("2".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("3".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("4".getBytes()))%3+" ");
    System.out.print(Utils.toPositive(Utils.murmur2("5".getBytes()))%3+" ");
    System.out.println();
    

    2. Ordering when Flink consumes Kafka

    First create a topic with three partitions, then write test data: each record carries a key and a per-key version number, and records are written to Kafka in ascending version order.

    new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"1\",\"time\":1623588192345}");
    new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"1\",\"time\":1623588192342}");
    new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"2\",\"time\":1623588192347}");
    new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"2\",\"time\":1623588192344}");
    new ProducerRecord<>("test1", "c", "{\"key\":\"c\",\"value\":\"1\",\"time\":1623588192345}");
    new ProducerRecord<>("test1", "c", "{\"key\":\"c\",\"value\":\"2\",\"time\":1623588192348}");
    new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"3\",\"time\":1623588192346}");
    new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"3\",\"time\":1623588192349}");
    new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"4\",\"time\":1623588192348}");
    

    The following commands show the topic's per-partition end offsets and the consumer group status:

    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test1  --time -1
    ./kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group test1 --describe
    

    The data is distributed as follows: keys a and c land in partition 1, and key b lands in partition 2.

    test1:0:0
    test1:1:5
    test1:2:4
    

    Flink code that consumes the Kafka topic so we can observe the ordering:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // set the parallelism to 3 to match the partition count; only two of the three partitions actually contain data here
    env.setParallelism(3);
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test1");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    String[] fields = new String[]{"key","value","time"};
    TypeInformation[] type = new TypeInformation[3];
    type[0] = Types.STRING;
    type[1] = Types.STRING;
    type[2] = Types.LONG;
    RowTypeInfo rowTypeInfo = new RowTypeInfo(type, fields);
    
    SingleOutputStreamOperator<Row> dataStreamSource = env.addSource(new FlinkKafkaConsumer(
                    "test1",
                    new SimpleStringSchema(),
                    properties
            ).setStartFromEarliest())
                    .process(new JsonToRow(rowTypeInfo), rowTypeInfo)
                    .name("kafkaSource").uid("kafkaSource");
    // data as seen at the source
    dataStreamSource.print("source");
    // key the stream by the record key
    KeyedStream<Row, String> key = dataStreamSource
            .keyBy((KeySelector<Row, String>) value ->  value.getFieldAs(0));
    // data after keyBy
    key.print("sink");
    
    env.execute("kafkatest");
    

    The output is as follows:

    source:3> +I[a, 1, 1623588192345]
    source:1> +I[b, 1, 1623588192342]
    source:1> +I[b, 2, 1623588192344]
    source:3> +I[a, 2, 1623588192347]
    source:1> +I[b, 3, 1623588192346]
    source:3> +I[c, 1, 1623588192345]
    source:1> +I[b, 4, 1623588192348]
    source:3> +I[c, 2, 1623588192348]
    source:3> +I[a, 3, 1623588192349]
    sink:2> +I[a, 1, 1623588192345]
    sink:1> +I[b, 1, 1623588192342]
    sink:2> +I[a, 2, 1623588192347]
    sink:1> +I[b, 2, 1623588192344]
    sink:1> +I[b, 3, 1623588192346]
    sink:2> +I[c, 1, 1623588192345]
    sink:1> +I[b, 4, 1623588192348]
    sink:2> +I[c, 2, 1623588192348]
    sink:2> +I[a, 3, 1623588192349]
    

    Within each thread, both the source output and the sink output are in ascending version order for every key, i.e. neither Flink's consumption of Kafka nor the keyBy (shuffle) breaks Kafka's per-partition ordering. This is expected: each Kafka partition is read by a single source subtask, and keyBy routes all records with the same key to the same downstream subtask over an order-preserving channel.

    3. Flink consuming Kafka with checkpointing

    On top of the code above, add checkpoint configuration: store checkpoints on the local filesystem and retain them when the job is stopped.

    String topic = args[0];
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:///Users/caster/Desktop/checkpoint");
    
    // checkpoint interval (ms) and checkpointing semantics
    env.enableCheckpointing(5*1000, CheckpointingMode.AT_LEAST_ONCE);
    // checkpoint timeout
    env.getCheckpointConfig().setCheckpointTimeout(60*1000);
    // max number of concurrent checkpoints, i.e. how many barriers may be in flight at once
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // minimum pause between two checkpoints; setting it also forces the previous option to 1
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
    // 0 means no checkpoint failure is tolerated
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    // restart strategy: at most 3 failures within 60 seconds, with a 10-second delay between restart attempts
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(60),Time.seconds(10)));
    
    // retain checkpoint data when the job is cancelled or fails, so it can be restored later if needed
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    env.setParallelism(3);
    
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", topic);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    String[] fields = new String[]{"key","value","time"};
    TypeInformation[] type = new TypeInformation[3];
    type[0] = Types.STRING;
    type[1] = Types.STRING;
    type[2] = Types.LONG;
    RowTypeInfo rowTypeInfo = new RowTypeInfo(type, fields);
    
    SingleOutputStreamOperator<Row> dataStreamSource = env.addSource(new FlinkKafkaConsumer(
                    topic,
                    new SimpleStringSchema(),
                    properties
            ).setStartFromEarliest())
                    .process(new JsonToRow(rowTypeInfo), rowTypeInfo)
                    .name("kafkaSource").uid("kafkaSource");
    // data as seen at the source
    dataStreamSource.print("source");
    KeyedStream<Row, String> keyedStream = dataStreamSource
            .keyBy((KeySelector<Row, String>) value -> value.getFieldAs(0));
    // data after keyBy
    keyedStream.print("sink");
    
    env.execute("kafkatest");
    

    Build a jar with dependencies using mvn, then submit the job from the Flink client:

    ./flink run flink-compute-1.0-SNAPSHOT.jar test1
    

    A successful submission returns the job ID:

    Job has been submitted with JobID e7309690361278a82675c7d981057692
    

    A directory for this job appears under the configured checkpoint directory, and new checkpoints keep being written into it.


    [Figure: the job's checkpoint directory]

    In the Flink web UI you can see the output of the two print() calls, until all the Kafka data has been consumed, as shown below:


    [Figure: job output]
    Stop the job, produce some new records into Kafka, and then test restoring the job from a specific checkpoint with the following command:
    ./flink run -s /Users/caster/Desktop/checkpoint/e7309690361278a82675c7d981057692/chk-19  flink-compute-1.0-SNAPSHOT.jar test1
    

    This effectively starts a new Flink job: the job ID changes and a new checkpoint directory is created for it.
    The job resumes consuming from where the previous one stopped; in the screenshot, output above the red line is from the first job and output below it is from the second:


    [Figure: job output]
