Flink's partitioning strategy when writing to Kafka


Author: 熊云昆 | Published 2022-04-08 19:44

The default partitioning strategy Flink uses when writing to Kafka is implemented in the FlinkFixedPartitioner class. It is not the round-robin scheme one might expect: each sink subtask writes to one fixed Kafka partition. For example, if a Flink job writes to Kafka with 3 sink subtasks and the topic has 10 partitions, data lands only in partitions 0-2; the other partitions receive nothing. The implementation logic is as follows:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = -3785320239953858777L;

    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");

        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
                partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");

        return partitions[parallelInstanceId % partitions.length];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkFixedPartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}

The line partitions[parallelInstanceId % partitions.length] is what decides which Kafka partition each record is written to: parallelInstanceId is the index of the Flink sink subtask, and partitions.length is the number of Kafka partitions.
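To make the effect concrete, here is a small standalone sketch (plain Java, no Flink dependency; FixedPartitionDemo and partitionFor are illustrative names, not Flink API) that replays the 3-subtask / 10-partition example with the same modulo rule:

```java
public class FixedPartitionDemo {
    // Same rule as FlinkFixedPartitioner: partitions[parallelInstanceId % partitions.length]
    static int partitionFor(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; // topic with 10 partitions
        for (int subtask = 0; subtask < 3; subtask++) {    // 3 sink subtasks
            // subtask 0 -> partition 0, subtask 1 -> 1, subtask 2 -> 2;
            // partitions 3-9 never receive data
            System.out.println("subtask " + subtask
                    + " -> partition " + partitionFor(subtask, partitions));
        }
    }
}
```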
In the common case this strategy causes few problems, but it breaks down when several Flink jobs write to the same topic. Suppose jobs a, b, ..., f all write to topic1, each with a parallelism of 1, while topic1 has 10 partitions. Every job's single subtask then has parallelInstanceId = 0, so all of them write to partition 0 and partitions 1-9 stay empty, skewing the data in Kafka. In that case we have to define our own partitioning strategy; a simple round-robin partitioner looks like this:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

public class FlinkRebalancePartitioner<T> extends FlinkKafkaPartitioner<T> {
    private static final long serialVersionUID = -3785320239953858777L;
    private int parallelInstanceId;
    private int nextPartitionToSendTo;

    public FlinkRebalancePartitioner(){

    }

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");

        this.parallelInstanceId = parallelInstanceId;
        nextPartitionToSendTo = ThreadLocalRandom.current().nextInt(parallelInstances);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length;
        return partitions[nextPartitionToSendTo];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkRebalancePartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkRebalancePartitioner.class.hashCode();
    }
}

This approach is simple and direct. It mirrors the implementation of RebalancePartitioner, Flink's own round-robin strategy for distributing data between operators, and it spreads records evenly across the downstream Kafka partitions.
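As a sanity check of that even spread, the following standalone sketch (plain Java, no Flink dependency; RebalanceDemo and distribute are illustrative names) feeds 1,000 records through the same counter logic and tallies how many land in each of 10 partitions:

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

public class RebalanceDemo {
    // Replays the counter step from FlinkRebalancePartitioner.partition()
    static int[] distribute(int records, int[] partitions, int start) {
        int[] counts = new int[partitions.length];
        int next = start;
        for (int i = 0; i < records; i++) {
            next = (next + 1) % partitions.length; // round-robin advance
            counts[partitions[next]]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        // Random starting offset, as in open(); it does not affect the even spread
        int start = ThreadLocalRandom.current().nextInt(partitions.length);
        // 1,000 records over 10 partitions: exactly 100 each, whatever the start
        System.out.println(Arrays.toString(distribute(1_000, partitions, start)));
    }
}
```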

Original post: https://www.haomeiwen.com/subject/vonysrtx.html