A Method for Dynamically Consuming from Kafka in Flink

Author: Epiphanic | Published 2023-02-13 14:32

The consumption rate can be controlled by capping how many records each Kafka poll returns. FlinkKafkaConsumer does not expose a setMaxRecordsPerPoll() method; the cap is the standard Kafka consumer property max.poll.records, which the custom SourceFunction below sets explicitly.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DynamicKafkaConsumer<T> implements SourceFunction<T>, ResultTypeQueryable<T> {

    private final String topic;
    private final DeserializationSchema<T> schema;
    private final Properties props;
    private final int maxRecordsPerPoll;
    private volatile boolean running = true;

    public DynamicKafkaConsumer(String topic, DeserializationSchema<T> schema,
                                Properties props, int maxRecordsPerPoll) {
        this.topic = topic;
        this.schema = schema;
        this.props = props;
        this.maxRecordsPerPoll = maxRecordsPerPoll;
    }

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        // max.poll.records caps how many records a single poll() may return;
        // it is the knob that controls the consumption rate.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxRecordsPerPoll));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        // The KafkaConsumer is not serializable, so it is created here on the
        // task manager rather than in the constructor.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));

            while (running) {
                // poll() returns after at most 1000 ms even if fewer than
                // maxRecordsPerPoll records are available, so the loop is
                // never blocked for long.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Deserialize the record and emit it to the Flink job,
                    // holding the checkpoint lock so that emission does not
                    // interleave with a checkpoint.
                    T value = schema.deserialize(record.value());
                    synchronized (sourceContext.getCheckpointLock()) {
                        sourceContext.collect(value);
                    }
                }
                // Offsets are not committed explicitly here; with
                // enable.auto.commit=true the consumer commits them itself.
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return schema.getProducedType();
    }
}

In this code, the maxRecordsPerPoll value is written into the max.poll.records consumer property and caps the number of records fetched on each poll; adjusting this value controls the rate at which records are consumed. Note that the poll timeout is set to 1000 ms, meaning a poll returns after at most one second even if fewer than maxRecordsPerPoll records are available. This ensures the consumer is never blocked for long and can react promptly to cancellation.
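If a custom source is not required, a similar cap can be applied to the standard connector: FlinkKafkaConsumer forwards Kafka client properties such as max.poll.records to its internal consumer, and the universal connector reads its poll timeout from the flink.poll-timeout property (default 100 ms). A minimal sketch under those assumptions; the broker address, group id, and topic name are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class ThrottledConnectorExample {

    public static FlinkKafkaConsumer<String> buildSource() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "throttled-consumer");      // placeholder group id
        // Cap each internal poll of the connector at 200 records.
        props.setProperty("max.poll.records", "200");
        // Connector-specific key: how long the fetch thread waits per poll.
        props.setProperty("flink.poll-timeout", "1000");
        return new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    }
}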
To use this consumer, create an instance of the DynamicKafkaConsumer class, passing in the Kafka topic, a deserialization schema, the Kafka properties, and the maximum number of records to fetch per poll, then register it with env.addSource(...), as in the sketch below.
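A minimal job sketch wiring the source into a pipeline, assuming a String-valued topic; the broker address, group id, topic name, and record cap are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class DynamicConsumerJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "dynamic-consumer-demo");   // placeholder group id

        // Fetch at most 500 records per poll.
        DataStream<String> stream = env.addSource(
                new DynamicKafkaConsumer<>("my-topic", new SimpleStringSchema(), props, 500));

        stream.print();
        env.execute("Dynamic Kafka consumer demo");
    }
}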
