美文网首页
Kafka系列4-Kafka API

Kafka系列4-Kafka API

作者: 只是甲 | 来源:发表于2021-11-24 10:16 被阅读0次

    一. Producer API

    消息发送流程
      Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了
    两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。 main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到 Kafka broker。

    image.png

    pom文件配置
    2.2.1 为kafka的版本

    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.2.1</version>
        </dependency>
    

    1.2 异步发送普通生产者

    代码:
    BATCH_SIZE_CONFIG = "batch.size":消息为batch.size大小,生产者才发送消息
    LINGER_MS_CONFIG = "linger.ms":如果消息大小迟迟不为batch.size大小,则等待linger.ms时间后直接发送

    package com.bigdata.study.kafka;
    
    /**
     * @author 只是甲
     * @date   2021-10-29
     * @remark kafka生产者 - 异步 -  不带回调函数的API
     */
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.producer.*;
    
    public class CustomProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
    
            // 设置集群配置
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
            // ack机制
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            // 重试次数
            props.put(ProducerConfig.RETRIES_CONFIG, 1);
            // 批次大小:消息大小为16384才发送消息
            props.put("batch.size", 16384);
            // 等待时间:如果消息大小迟迟不为batch.size大小,则等待linger.ms时间后直接发送
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            // ReadAccumulator缓冲区大小
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            // 序列化
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // 构造Producer
            Producer<String,String> producer=new KafkaProducer<>(props);
    
            // 生产消费
            for (int i = 0; i < 100; i++){
                producer.send(new ProducerRecord<String, String>("kafka_test1", "test_" + Integer.toString(i), "test_" + Integer.toString(i)));
            }
    
            producer.close();
    
        }
    }
    

    shell中查看消费者:

    /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server hp2:9092 --topic kafka_test1
    
    image.png

    1.2 异步发送带回调函数的生产者

    回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别为RecordMetaData和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

    消息发送失败会启动重试机制,但需要在回调函数中手动重试

    代码:

    package com.bigdata.study.kafka;
    
    /**
     * @author 只是甲
     * @date   2021-10-29
     * @remark  kafka生产者 - 异步 -  带回调函数的API
     */
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.producer.*;
    
    public class CallBackProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
    
            // 设置集群配置
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
            // ack机制
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            // 重试次数
            props.put(ProducerConfig.RETRIES_CONFIG, 1);
            // 批次大小:消息大小为16384才发送消息
            props.put("batch.size", 16384);
            // 等待时间:如果消息大小迟迟不为batch.size大小,则等待linger.ms时间后直接发送
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            // ReadAccumulator缓冲区大小
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            // 序列化
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // 构造Producer
            Producer<String,String> producer=new KafkaProducer<>(props);
    
            for (int i = 0; i < 100; i++){
                producer.send(new ProducerRecord<String, String>("kafka_test1", "test-" + Integer.toString(i),"test-" +  Integer.toString(i)), new Callback() {
                    //回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if ( e == null ) {
                            System.out.println("success->" + recordMetadata.offset());
                        } else {
                            e.printStackTrace();
                        }
                    }
                });
            }
    
            producer.close();
    
        }
    }
    

    1.3 生产者分区策略测试

    image.png
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {};
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {};
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {};
    public ProducerRecord(String topic, Integer partition, K key, V value) {};
    public ProducerRecord(String topic, K key, V value) {};
    public ProducerRecord(String topic, V value) {};
    

    上面ProducerRecord中的partition参数即为指定的分区(分区是有编号的,这是指定分区中的某一个,实际应该为一个分区编号)。

    这里要注意,如果指定特定分区的话,消息是会发送到这个编号的特定分区,但是注意如果你的Topic分区只有默认的1个,而你却要发送到分区1号,此时发送会失败!因为你只有1个分区,即0号分区。所以在构建的topic的时候需要注意。

    默认分区构造

    // 构造消息体,这里加上具体的分区,其中的2是特定的分区编号
    producer.send(new ProducerRecord<>("aaroncao",2, "test-" + i, "test-" + i), new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                System.out.println(recordMetadata.partition() + "-" + recordMetadata.offset());
            } else {
                e.printStackTrace();
            }
        }
    });
    
    

    二. API消费者

    2.1 简单消费者

    Kafka提供了自动提交offset的功能enable.auto.commit=true;

    代码:

    package com.bigdata.study.kafka;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * @author 只是甲
     * @date   2021-10-29
     * @remark  kafka消费者
     */
    
    public class CustomConsumer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
    
            // 设置集群配置
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
    
            // 设置消费者组
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group1");
            // 设置offset的自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            // 设置offset自动化提交的间隔时间
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            // 生产者是序列化,消费者则为反序列化
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            // offset重置,需要设置自动重置为earliest
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            // 这里需要订阅具体的topic
            consumer.subscribe(Collections.singletonList("kafka_test1"));
    
            // 一直处于监听状态中
            while (true) {
                // 因为消费者是通过pull获取消息消费的,这里设置间隔100ms
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                // 对获取到的结果遍历
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(),consumerRecord.key(),consumerRecord.value());
                }
                // 同步提交,会一直阻塞直到提交成功,这里可以设置超时时间,如果阻塞超过超时时间则释放
                //consumer.commitSync();
    
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.out.println("Commit failed, offset = " + offsets);
                        }
                    }
                });
    
            }
    
        }
    }
    

    测试记录:

    image.png

    2.2 消费者重置offset

    Consumer消费数据时的可靠性很容易保证,因为数据在Kafka中是持久化的,不用担心数据丢失问题。但由于Consumer在消费过程中可能遭遇断电或者宕机等故障,Consumer恢复之后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费的offset位置,以便故障恢复后可以继续消费。

    offset的维护是Consumer消费数据必须考虑的问题。

    // offset重置,需要设置自动重置为earliest
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    

    将消费者组的id变换一下即可,否则由于一条消息只能够被一个消费者组中的消费者消费一次,此时不会重新消费之前的消息,即使设置了offset重置也没有作用。

    注意
    这里的auto.offset.reset="earliest"的作用等同于在linux控制台,消费者监听的时候添加的--from-beginning命令。

    auto.offset.reset取值

    1. earliest:重置offset到最早的位置
    2. latest:重置offset到最新的位置,默认值
    3. none:如果在消费者组中找不到前一个offset则抛出异常
    4. anything else:抛出异常给消费者

    2.3 消费者保存offset读取问题

    enable.auto.commit=true即自动提交offset。默认是自动提交的。

    2.4 消费者手动提交offset

    自动提交offset十分便利,但是由于其实基于时间提交的,开发人员难以把握offset提交的时机,因此kafka提供了手动提交offset的API。

    手动提交offset的方法主要有两种:

    1. commitSync:同步提交
    2. commitAsync:异步提交

    相同点: 两种方式的提交都会将本次poll拉取的一批数据的最高的偏移量提交。

    不同点: commitSync阻塞当前线程,持续到提交成功,失败会自动重试(由于不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,有可能提交失败。

    代码:
    同步提交

    package com.bigdata.study.kafka;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * @author 只是甲
     * @date   2021-10-29
     * @remark  kafka消费者 - 同步提交
     */
    
    public class SyncCommitOffset {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
    
            // 设置集群配置
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
    
            // 设置消费者组
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group2");
            // 设置offset的自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            // 设置offset自动化提交的间隔时间
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            // 生产者是序列化,消费者则为反序列化
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            // offset重置,需要设置自动重置为earliest
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            // 这里需要订阅具体的topic
            consumer.subscribe(Collections.singletonList("kafka_test1"));
    
            // 一直处于监听状态中
            while (true) {
                // 因为消费者是通过pull获取消息消费的,这里设置间隔100ms
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                // 对获取到的结果遍历
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(),consumerRecord.key(),consumerRecord.value());
                }
    
                // 同步提交,会一直阻塞直到提交成功,这里可以设置超时时间,如果阻塞超过超时时间则释放
                consumer.commitSync();
    
            }
    
        }
    }
    

    异步提交

    consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.out.println("Commit failed, offset = " + offsets);
                        }
                    }
                });
    

    2.5 数据漏消费和重复消费分析

    无论是同步提交还是异步提交offset,都可能会造成数据的漏消费或者重复消费,先提交offset后消费,有可能造成数据的漏消费,而先消费再提交offset,有可能会造成数据的重复消费。

    2.6 自定义存储offset

    Kafka0.9版本之前,offset存储在zookeeper中,0.9版本及之后的版本,默认将offset存储在Kafka的一个内置的topic中,除此之外,Kafka还可以选择自定义存储offset数据。offse的维护相当繁琐,因为需要考虑到消费者的rebalance过程:

    当有新的消费者加入消费者组、已有的消费者退出消费者组或者订阅的主体分区发生了变化,会触发分区的重新分配操作,重新分配的过程称为Rebalance。
    

    消费者发生Rebalace之后,每个消费者消费的分区就会发生变化,因此消费者需要先获取到重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。(High Water高水位)

    代码:

    package com.bigdata.study.kafka;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @author 只是甲
     * @date   2021-10-29
     * @remark  Kafka自定义offset提交
     */
    
    public class CustomOffsetCommit {
        private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hp2:9092,hp3:9092,hp4:9092");
            // 设置消费者组
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group4");
            // 设置offset的自动提交为false
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 这里的意思是订阅的时候同时定义Consumer重分配的监听器接口
            consumer.subscribe(Collections.singletonList("kafka_test1"), new ConsumerRebalanceListener() {
                // rebalance发生之前调用
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    commitOffset(currentOffset);
                }
    
                // rebalance发生之后调用
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    currentOffset.clear();
                    for (TopicPartition partition : partitions) {
                        // 定位到最新的offset位置
                        consumer.seek(partition, getOffset(partition));
                    }
                }
            });
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
                    // 记录下当前的offset
                    currentOffset.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
                }
            }
        }
    
        // 获取某分区最新的offset
        private static long getOffset(TopicPartition topicPartition) {
            return 0;
        }
    
        // 提交该消费者所有分区的offset
        private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    
        }
    }
    

    即自己记录下需要提交的offset,利用Rebalance分区监听器监听rebalance事件,一旦发生rebalance,先将offset提交,分区之后则找到最新的offset位置继续消费即可

    三. 自定义拦截器

    拦截器原理
    Producer拦截器interceptor是在Kafka0.10版本引入的,主要用于Clients端的定制化控制逻辑。对于Producer而言,interceptor使得用户在消息发送之前以及Producer回调逻辑之前有机会对消息做一些定制化需求,比如修改消息的展示样式等,同时Producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain,Interceptor实现的接口为ProducerInterceptor,主要有四个方法:

    1. configure(Map<String, ?> configs):获取配置信息和初始化数据时调用

    2. onSend(ProducerRecord record):该方法封装在KafkaProducer.send()方法中,运行在用户主线程中,Producer确保在消息被序列化之前及计算分区前调用该方法,并且通常都是在Producer回调逻辑出发之前。

    3. onAcknowledgement(RecordMetadata metadata, Exception exception):onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。

    4. close():关闭inteceptor,主要用于执行资源清理工作。

    Inteceptor可能被运行到多个线程中,在具体使用时需要自行确保线程安全,另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并紧紧是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。

    自定义加入时间戳拦截器
    TimeInterceptor

    package com.bigdata.study.kafka;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * @author 只是甲
     * @date   2021-10-29
     * @remark  Kafka  消息拦截器
     */
    
    public class TimeInterceptor implements ProducerInterceptor<String, String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                    "TimeInterceptor:" + System.currentTimeMillis() + "," + record.value());
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
        
    }
    
    

    CounterInterceptor

    package com.bigdata.study.kafka;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * @author 只是甲
     * @date   2021-10-29
     * @remark  Kafka  消息拦截器
     */
    
    public class CounterInterceptor implements ProducerInterceptor<String, String> {
        private int errorCounter = 0;
        private int successCounter = 0;
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                successCounter++;
            } else {
                errorCounter++;
            }
        }
    
        @Override
        public void close() {
            // 输出结果,结束输出
            System.out.println("Sent successful:" + successCounter);
            System.out.println("Sent failed:" + errorCounter);
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    
    

    在CustomProducer中加入拦截器

    // 加入拦截器
    List<Object> interceptors = new ArrayList<>();
    interceptors.add(TimeInterceptor.class);
    interceptors.add(CounterInterceptor.class);
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    

    测试记录:

    image.png
    /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server hp2:9092 --topic kafka_test1
    
    image.png

    参考:

    1. https://blog.csdn.net/cao1315020626/article/details/112590786

    相关文章

      网友评论

          本文标题:Kafka系列4-Kafka API

          本文链接:https://www.haomeiwen.com/subject/vcqialtx.html