The default partitioning strategy Flink uses when writing to Kafka is implemented in the FlinkFixedPartitioner class. It is not the round-robin scheme one might expect; instead, each sink subtask writes to one fixed Kafka partition. For example, if a Flink job writes to Kafka with 3 sink subtasks and the topic has 10 partitions, data only ever lands in partitions 0-2, and the other partitions receive nothing. The implementation logic is as follows:
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = -3785320239953858777L;

    /** Index of this sink subtask, assigned once in open(). */
    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
                partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");
        // Every record from this subtask goes to the same fixed partition.
        return partitions[parallelInstanceId % partitions.length];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkFixedPartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}
As we can see, the line partitions[parallelInstanceId % partitions.length] decides which Kafka partition each record is written to: parallelInstanceId is the index of the Flink sink subtask, and partitions.length is the number of Kafka partitions.
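The fixed mapping can be checked with a small standalone simulation (plain Java, no Flink dependency; the class and method names here are illustrative, not part of Flink):

```java
// Standalone simulation of FlinkFixedPartitioner's mapping:
// subtask i always writes to partitions[i % partitions.length].
public class FixedMappingDemo {

    // Returns the Kafka partition chosen for a given sink subtask.
    static int partitionFor(int parallelInstanceId, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; // 10 Kafka partitions
        int sinkParallelism = 3;                           // 3 Flink sink subtasks
        for (int subtask = 0; subtask < sinkParallelism; subtask++) {
            System.out.println("subtask " + subtask
                    + " -> partition " + partitionFor(subtask, partitions));
        }
        // Only partitions 0, 1 and 2 ever receive data; 3-9 stay empty.
    }
}
```

Note that the mapping is decided once per subtask and never changes at runtime, which is exactly why the remaining partitions stay empty.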
In most scenarios this strategy causes few problems, but it hurts when several Flink jobs write to the same downstream Kafka topic. For example, suppose jobs a, b, ..., f all write to topic1, each with a parallelism of 1, while topic1 has 10 partitions. Since every job's only subtask has parallelInstanceId 0, all of them write to partition 0, partitions 1-9 receive no data, and the data in Kafka becomes skewed. In that case we have to define our own partitioning strategy; a simple round-robin partitioner can be written like this:
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.Preconditions;

public class FlinkRebalancePartitioner<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = -3785320239953858777L;

    private int parallelInstanceId;

    /** Partition used for the previous record; advanced round-robin per record. */
    private int nextPartitionToSendTo;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(
                parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(
                parallelInstances > 0, "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
        // Random starting offset, so subtasks (and jobs) do not all begin at partition 0.
        nextPartitionToSendTo = ThreadLocalRandom.current().nextInt(parallelInstances);
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
                partitions != null && partitions.length > 0,
                "Partitions of the target topic is empty.");
        // Advance to the next partition for every record.
        nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length;
        return partitions[nextPartitionToSendTo];
    }

    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkRebalancePartitioner;
    }

    @Override
    public int hashCode() {
        return FlinkRebalancePartitioner.class.hashCode();
    }
}
This approach is simple and direct. It follows the implementation of RebalancePartitioner, one of Flink's internal stream partitioners, and spreads records evenly across the downstream Kafka partitions.
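A quick standalone check (plain Java, no Flink dependency; the round-robin logic of the custom partitioner is inlined here, and the class name is illustrative) confirms that successive records from one subtask cycle through all partitions evenly:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Standalone simulation of the round-robin logic in FlinkRebalancePartitioner:
// starting from a random offset, successive records cycle through every partition.
public class RebalanceDemo {

    private int nextPartitionToSendTo;

    RebalanceDemo(int parallelInstances) {
        // Random start, mirroring open(): avoids all subtasks beginning at partition 0.
        nextPartitionToSendTo = ThreadLocalRandom.current().nextInt(parallelInstances);
    }

    int partition(int[] partitions) {
        nextPartitionToSendTo = (nextPartitionToSendTo + 1) % partitions.length;
        return partitions[nextPartitionToSendTo];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        RebalanceDemo demo = new RebalanceDemo(1); // one subtask, like parallelism 1
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < 100; i++) {            // simulate sending 100 records
            counts.merge(demo.partition(partitions), 1, Integer::sum);
        }
        System.out.println(counts); // every partition receives exactly 10 records
    }
}
```

To use the custom partitioner in a job, it is typically passed to the FlinkKafkaProducer constructor that accepts an Optional&lt;FlinkKafkaPartitioner&gt; argument (the exact constructor varies by connector version, so check the version you depend on).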