美文网首页
kafka生产者和消费者api的简单使用

kafka生产者和消费者api的简单使用

作者: huan1993 | 来源:发表于2021-01-09 18:44 被阅读0次

    一、背景

    此处主要是简单记录一下,使用 kafka api 发送消息和接收消息。

    二、需要实现的功能

    1、生产者实现功能

    1、KafkaProducer线程安全的,可以在多线程中使用。

    2、消息发送的key和value的序列化

    3、自定义分区的使用

    4、自定义拦截器的使用

    5、消息发送完成后的回调使用

    2、消费者实现功能

    1、消息接收的key和value的序列化

    2、指定消费者组

    3、自动提交 offset (生产环境可以使用手动提交offset)

    4、重置消费者的偏移量,此配置生效的条件

    5、自定义消息消费拦截器

    6、每次从服务器获取多少数据

    3、详细实现

    详细实现,请参考代码中的注释。
    

    三、代码实现

    1、jar包的引入

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.0</version>
        </dependency>
    </dependencies>
    

    2、生产者代码实现

    1、自定义分区器

    主要目的是为了能自己控制自己的消息需要发送到那个分区中。

    import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
    import org.apache.kafka.common.Cluster;
    
    /**
     * @author huan.fu 2021/1/5 - 上午9:57
     */
    public class CustomPartitioner extends DefaultPartitioner {
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int partition = super.partition(topic, key, keyBytes, value, valueBytes, cluster);
            System.err.println("消息被分配到分区: " + partition);
            return partition;
        }
    
        @Override
        public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
            System.err.println("topic: " + topic + " 产生了一个新的批次");
            super.onNewBatch(topic, cluster, prevPartition);
        }
    }
    

    2、自定义消息生产拦截器

    主要是消息在发送到kafka服务器之前,最消息进行一个拦截。

    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 自定义生产者消息拦截器,此类不是线程安全的,需要自己控制线程安全问题
     *
     * @author huan.fu 2021/1/5 - 上午10:04
     */
    public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            System.out.println("此方法是在消息发送的业务线程中,可以在消息发送到kafka服务器时做一些处理");
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            System.out.println("此方法是在kafka 生产者的 I/O 线程中执行,需要立即返回,否则影响消息发送的速度");
            if (exception != null) {
                System.out.println("消息发送到服务器时发生了异常");
                return;
            }
            System.out.println("消息发送到了服务器 - offset:" + metadata.offset() + " partition:" + metadata.partition());
        }
    
        @Override
        public void close() {
            System.out.println("释放资源");
        }
    }
    

    3、生产者代码

    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    
    /**
     * 测试kafka发送者
     * 1、KafkaProducer线程安全的,可以在多线程中使用。
     * 2、消息发送的key和value的序列化
     * 3、自定义分区的使用
     * 4、自定义拦截器的使用
     * 5、消息发送完成后的回调使用
     *
     * @author huan.fu 2021/1/4 - 上午10:11
     */
    public class KafkaProducerDemo {
    
        public static void main(String[] args) {
            Properties properties = new Properties();
            // kafka服务器的地址,端口,可以不用全写,写几个既可
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
            // kafka 消息key的序列化方式
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // kafka 消息value的序列化方式
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // 0: 消息只要发送出去了,就认为是发送成功
            // 1: 消息被 leader 分区保存后,就认为发送成功
            // all: 消息被 leader + isr 都保存成功后,才认为发送成功
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            // 设置消息重试的次数
            properties.put(ProducerConfig.RETRIES_CONFIG, 0);
            // 消息发送失败后,重试的间隔,即等待多少毫秒后在重试
            properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
            // kafka 的消息默认是批量发送,即等批次消息达到一定大小(单位字节)的时候发送,此参数控制批次消息的大小
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            // batch.size 设置kafka消息批次达到一定的字节时才发送,那万一一直没有达到,此时就由linger.ms控制,即等待多少毫秒之后发送,等于0表示立即发送
            properties.put(ProducerConfig.LINGER_MS_CONFIG, 100);
            // 表示生产者可以用来缓存等待发送到服务器上消息的可用内存大小。
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            // 控制一次请求的大小,防止出现一个非常大的请求,默认是一兆
            properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
            // 控制客户端等待服务端响应的最大时间,此配置的值应该要大于 replica.lag.time.max(服务端移除isr的一个超时配置) 的值。
            properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
            // 配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()会阻塞多长时间。这些方法可能因为缓冲区已满或元数据不可用而被阻塞。
            properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
            // 配置kafka自定义的分区器,我们自己实现的分区器,需要实现 org.apache.kafka.clients.producer.Partitioner 接口
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.huan.kafka.api.CustomPartitioner");
            // 配置kafka自定义的拦截器,可以配置多个,多个以英文的逗号分割开
            properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.huan.kafka.api.CustomProducerInterceptor");
    
            // KafkaProducer 是线程安全的,可以多个线程使用用一个 KafkaProducer
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("topic-a", "value - (" + i + 1 + ")");
                kafkaProducer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.err.println("发送数据到kafka中,发生了异常.");
                            exception.printStackTrace();
                            return;
                        }
                        System.out.println("topic: " + metadata.topic() + " offset: " + metadata.offset() + " partition: "
                                + metadata.partition());
                    }
                });
            }
    
            System.out.println("消息发送完成");
            kafkaProducer.close();
        }
    }
    

    3、消息消费者实现

    1、自定义消息消费拦截器

    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.util.Map;
    
    /**
     * @author huan.fu 2021/1/5 - 下午2:48
     */
    public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            System.out.println("从kafka拉取到了数据,可以在此修改记录数据");
            return records;
        }
    
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            System.out.println("提交偏移量成功时(即已经提交到了broker上),调用此函数.");
        }
    
        @Override
        public void close() {
            System.out.println("close");
        }
    }
    
    

    2、消息消费者实现

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 测试kafka消费者
     * 1、消息接收的key和value的序列化
     * 2、指定消费者组
     * 3、自动提交 offset (生产环境可以使用手动提交offset)
     * 4、重置消费者的偏移量,此配置生效的条件
     * 5、自定义消息消费拦截器
     * 6、每次从服务器获取多少数据
     *
     * @author huan.fu 2021/1/5 - 上午10:29
     */
    public class KafkaConsumerDemo {
        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
            // kafka服务器的地址,端口,可以不用全写,写几个既可
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
            // kafka 消息key的序列化方式
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // kafka 消息value的序列化方式
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // 设置消费者组,多个消费者只要组一样,就认为在一个组中,那个topic中的某一个分区,只能被这个组中的某个消费组消费,消费者组和消费者组之间互不影响
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test-001");
            // 自动提交 offset
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            // 自动提交 offset 的间隔,即隔多长时间提交offset
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
            // 重置消费者的偏移量,此配置生效的条件:
            //      1、当前消费者的偏移量不在kafka服务器上(比如是一个新的消费者组)
            //      2、当前消费者的offset在kafka服务器上不存在(比如: 当前消费者消费的偏移量到了10,但是此时消费者宕机了很长一段时间,而服务器上数据默认保存7天,那么此时10之后的某些偏移量可能被删除了)
            // earliest: 从最早的偏移量开始消费
            // latest: 从最新的偏移量开始消费
            // none: 如果没有在消费者组中找到先前的偏移量,则向消费者抛出异常
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // 配置kafka自定义的拦截器,可以配置多个,多个以英文的逗号分割开
            properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.huan.kafka.api.CustomConsumerInterceptor");
            // 设置两次 poll 方法之间的最大的延时,如果超过了最大的延时,则kafka认为该consumer消费能力弱,会将该分区给别的消费者消费
            properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
            // 设置一次 pool 最多获取多少条记录
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
            // 拉取消息时,每个分区返回的最大的消息的字节数
            properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,1048576);
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            // 指定consumer消费的主题
            consumer.subscribe(Collections.singletonList("topic-a"));
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("接收到了一个数据. partition:" + consumerRecord.partition() + " offset:"
                            + consumerRecord.offset() + " 消息的值:" + consumerRecord.value());
                    TimeUnit.SECONDS.sleep(3);
                }
            }
        }
    }
    

    四、完整代码

    https://gitee.com/huan1993/rabbitmq/tree/master/kafka-api

    相关文章

      网友评论

          本文标题:kafka生产者和消费者api的简单使用

          本文链接:https://www.haomeiwen.com/subject/pszcaktx.html