美文网首页
kafka-2.7.0(三)spring boot 集成 kaf

kafka-2.7.0(三)spring boot 集成 kaf

作者: _大叔_ | 来源:发表于2021-02-26 09:25 被阅读0次

    依赖

        <dependencies>
            <!-- 2.3.7 -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-autoconfigure</artifactId>
            </dependency>
        </dependencies>
    

    配置

    spring:
      kafka:
        ###########【kafka集群】###########
        # 192.168.81.62:9092,192.168.81.62:9092
        bootstrap-servers: 47.101.179.240:9092
        ###########【初始化生产者配置】###########
        producer:
          # 重试次数,producer 两次重试之间会停顿一段时间,以防止频繁地重试对系统带来冲击。这段时间是可以配置的,由参数 retry.backoff.ms 指定,默认是 100 毫秒
          # 重试可能造成消息的重复发送,为了应对这一风险, Kafka 要求用户在 consumer 端必须执行去重处理
          # 重试可能造成消息的乱序,producer 提供了 max.in.flight.requets.per.connection参数 一旦用户将此参数设置成 1, producer 将确保某一时刻只能发送一个请求
          retries: 2
          # acks=0 : producer 不等kafka是否接收成功,立即开始其他工作 -> 提高吞吐量,单会丢失数据
          # acks=1 : producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果 producer ,而无须等待 ISR 中其他副本写入该消息。能保证吞吐量,也能保证一定的持久性。
          # acks=all/-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待 ISR 中所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给producer。吞吐量极低,但不会丢失数据。
          acks: 1
          # 参数默认值是 16384B=16KB。producer 会将发往同一分区的多条消息封装进一个 batch 中。当 batch 满了的时候, producer 会发送 batch
          # 中的所有消息。不过, producer 并不总是 batch 满了才发送消息,很有可能当 batch 还有很多空闲空间时 producer 就发送该 batch,
          # producer 不管是否能够填满, producer 都会为该 batch 分配固定大小的内存。
          batch-size: 131072
          properties:
            # 控制消息发送延时行为,这个参数也就是影响 batch 不满也会被发送的原因,实际上这也是一种权衡,即吞吐量与延时之间的权衡
            linger.ms: 200
            # produce 发送请求给broker后,broker需要在规定的时间范围内将处理理结果返还给 produce。这段时间便是由该参数控制的,默认是 30s。
            # 默认的30s对于一般的情况而言是足够的,但如果 producer 发送的负载很大,超时的情况就很容易碰到,此时就应该适当调整该参数值。
            request.timeout.ms: 30
            # 控制 producer 发送请求的大小,1048576B=1M,该请求包括消息体和请求头的整体大小
            max.request.size: 1048576
            # 为防止 topic 同分区下的消息乱序问题。这个参数的实际效果其实限制了 producer 在单个 broker 连接上能够发送的未响应请求的数量
            # 设置成1 ,producer 在某个 broker 发送响应之前将无法再给该 broker 发送 PRODUCE 请求
            max.in.flight.requests.per.connection: 1
          # buffer-memory 生产端缓冲区大小,默认值 33554432B=32MB。
          # producer 启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由另一个专属线程负责从缓冲区中读取消息执行真正的发送。这部分内存空间
          # 的大小即是由 buffer.memory 参数指定的。若 producer 向缓冲区写消息的速度超过了专属 io 线程发送消息的速度,那么必然造成该缓冲区空间
          # 的不断增大。此时 producer 会停止手头的工作等待 io 线程追上来,若一段时间之后 io 线程还是无法追上 producer 的进度,那么 producer
          # 就会抛出异常并期望用户介入进行处理。若 producer 程序要给很多分区发送消息,那么就需要仔细地设置这个参数以防止过小的内存缓冲区降低了
          # producer 程序整体的吞吐量。
          buffer-memory: 33554432
          # Kafka提供的序列化和反序列化类
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 压缩,默认 none(gzip\snappy\lz4),能降低网络IO提高吞吐量,但也会增加 producer 端机器的 CPU 开销。
          # 如果 broker 端的压缩参数设 置得与 producer 不同, broker 端在写入消息时也会额外使用 CPU 资源对消息进行对应的解压缩-重压缩操作。
          compression-type: lz4
        ###########【初始化消费者配置】###########
        consumer:
          # 默认的消费组ID
          groupId: wx_public_number
          properties:
            # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
            session.timeout.ms: 10000
            # 消费请求超时时间
            request.timeout.ms: 3000
            # 消费到的数据处理时长不宜超过max.poll.interval.ms,否则会触发rebalance,也可能导致 offset 提交失败
            max.poll.interval.ms: 10000
            # 最低拉取 1KB 的数据
            fetch.min.bytes: 1024
            # 不足 1KB 让等待 2s 再去拉取
            fetch.max.wait.ms: 2000
          # 是否自动提交 consumer offset,批量的时候要改为 false
          enable-auto-commit: true
          # 自动提交的时间间隔
          auto-commit-interval: 200
          # 假设你首次运行一个 consumer group 并且指定从头消费。显然该 group 会从头消费所有数据,因为此时该 group 还没有任何位移信息。
          # 一旦该 group 成功提交位移后,你重启了 group ,依然指定从头消费。此时你会发现该 group 并不会真的从头消费,因为 Kafka 己经保存了该 group
          # 位移信息,因此它会无视 auto.offset.reset 的设置。
          # latest(默认值)指定从最新处位移开始消费
          # earliest :指定从最早的位移开始消费,注意这里最早的位移不一定就是0
          # none:只要有一个分区不存在已提交的offset,就抛出异常;
          auto-offset-reset: earliest
          # Kafka提供的序列化和反序列化类
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # 批量消费每次最多消费多少条消息
          max-poll-records: 10000
          # Fetch请求发给 broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。毫秒数
          fetch-max-wait: 1000
        # 消费端监听的topic不存在时,项目启动会报错(关掉)
        listener:
          # 消费者并发启动个数(对应分区个数)每个listener方法
          concurrency: 4
          # manual listener负责ack,但是背后也是批量上去
          # manual_immediate listner负责ack,每调用一次,就立即commit
          # count_time ackTime或ackCount哪个条件先满足,就commit
          # count 累积达到ackCount次的ack去commit
          # record 每处理一条commit一次
          # batch 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
          # time 每次间隔ackTime的时间去commit
          # ack-mode: manual
          missing-topics-fatal: false
          poll-timeout: 3000
    

    groupId 一般为项目名称最好,消费者ID一般为业务或功能名称,topic的设计最好是 项目_业务_功能_标识ID

    生产与消费

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.KafkaListeners;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.Acknowledgment;
    
    import javax.annotation.Resource;
    import java.util.List;
    
    @SpringBootApplication
    public class KafkaApplication implements CommandLineRunner {
    
        @Resource
        KafkaTemplate<String, String> kafkaTemplate;
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    
        @Override
        public void run(String... args){
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "11111");
            kafkaTemplate.send(producerRecord);
        }
    
        /**
         * 创建 topic
         * @author big uncle
         * @date 2021/2/20 10:49
         * @param
         * @return org.apache.kafka.clients.admin.NewTopic
         **/
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic1")
                    .partitions(4)
                    .replicas(1)
                    .build();
        }
    
        /**
         * 消费 手动ACK 需要修改  enable-auto-commit: false
        **/
        @KafkaListener(id = "myId1", topics = "topic1",groupId="wx_public_number")
        public void listen1(String in, Acknowledgment ack) {
            System.out.println("aa单消费:"+in);
            ack.acknowledge();
        }
        /**
         * 消费自动ACK 需要 enable-auto-commit: true
        **/
        @KafkaListener(id = "myId2", topics = "topic1",groupId="wx_public_number")
        public void listen2(String in) {
            System.out.println("bb单消费:"+in);
        }
    
    }
    
    

    topic的消息会被发送到不同的消费组,不同的消费组创建一个消费者会实现广播消费模式,单播的话只需要删掉多余的组就好。

    批量消费

    package com.giant.kafka.config;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;
    
    /**
     * kafka 批量配置
     * @Author big uncle
     * @Date 2021/7/7 10:01
    **/
    @Configuration
    public class KafkaBatchConf {
    
        @Resource
        KafkaProperties properties;
    
    
        /**
         *  消费者批量工程
         *  手动提交
         */
        @Bean
        public KafkaListenerContainerFactory<?> batchFactoryAck() {
            return batchFactoryTemplate(factory -> {
                Map<String, Object> props = consumeProps();
                // 手动提交
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
                factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
                factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            });
        }
    
        /**
         *  消费者批量工程
         *  自动提交
         */
        @Bean
        public KafkaListenerContainerFactory<?> batchFactory() {
            Map<String, Object> props = consumeProps();
            // 最低拉取拉取数据的大小
            props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,  properties.getConsumer().getProperties().get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG));
            // 不够最低拉取等待一定的时间再去拉取
            props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,   properties.getConsumer().getProperties().get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG));
            return batchFactoryTemplate(factory -> factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)));
        }
    
        /**
         *  消费者配置信息
         */
        private Map<String,Object> consumeProps() {
            Map<String, Object> props = new HashMap<>(20);
            // kafka server 地址
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
            // 组id
            props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getConsumer().getGroupId());
            // 偏移量
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getConsumer().getAutoOffsetReset());
            // 最大拉取条数,该参数被fetch.max.wait.ms、fetch.min.bytes、max.partition.fetch.bytes 所影响
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, properties.getConsumer().getMaxPollRecords());
            // 会话超时时间
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, properties.getConsumer().getProperties().get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG));
            // 请求超时时间
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, properties.getConsumer().getProperties().get(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
            // key序列化
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, properties.getConsumer().getKeyDeserializer());
            // 值序列化
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, properties.getConsumer().getValueDeserializer());
            // 自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
            // 自动提交时间间隔
            Duration autoCommitInterval = properties.getConsumer().getAutoCommitInterval();
            if(!ObjectUtil.isEmpty(autoCommitInterval)){
                props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, Long.valueOf(autoCommitInterval.toMillis()).intValue());
            }
            // poll 间隔时间
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,  properties.getConsumer().getProperties().get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG));
            return props;
        }
    
    
        /**
         * 批量消费模板
        **/
        private ConcurrentKafkaListenerContainerFactory batchFactoryTemplate(Consumer<ConcurrentKafkaListenerContainerFactory> consumer){
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            consumer.accept(factory);
            factory.setBatchListener(Boolean.TRUE);
            // 并发线程数
            factory.setConcurrency(properties.getListener().getConcurrency());
            return factory;
        }
    
    }
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.KafkaListeners;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.Acknowledgment;
    
    import javax.annotation.Resource;
    import java.util.List;
    
    @SpringBootApplication
    public class KafkaApplication implements CommandLineRunner {
    
        @Resource
        KafkaTemplate<String, String> kafkaTemplate;
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    
        @Override
        public void run(String... args){
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "11111");
            kafkaTemplate.send(producerRecord);
        }
    
        /**
         * 创建 topic
         * @author big uncle
         * @date 2021/2/20 10:49
         * @param
         * @return org.apache.kafka.clients.admin.NewTopic
         **/
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic1")
                    .partitions(4)
                    .replicas(1)
                    .build();
        }
    
        /**
         * 批量自动确认ACK
        **/
        @KafkaListener(id = "consumer_1", topics = "topic1",groupId="wx_public_number",containerFactory = "batchFactory")
        public void consumer_1(List<String> in) {
            log.debug("消费数据 {}",in.size());
        }
    
        /**
         * 批量手动确认ACK
        **/
        @KafkaListener(id = "consumer_2", topics = "topic1",groupId="wx_public_number",containerFactory = "batchFactoryAck")
        public void consumer_2(List<String> in,Acknowledgment ack) {
            log.debug("消费数据 {}",in.size());
            ack.acknowledge();
        }
    }
    

    批量消费和单条消费不影响相互使用。

    按不同分区消费

    @KafkaListener(id = "consumer_2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "1" }) },groupId="wx_public_number")
        public void consumer_2(String str) {
            log.debug("消费数据 {}",str);
        }
    

    key 和 partition 的区别
    如果一个有效的partition属性数值被指定,那么在发送记录时partition属性数值就会被应用。如果没有partition属性数值被指定,而一个key属性被声明的话,一个partition会通过key的hash而被选中。如果既没有key也没有partition属性数值被声明,那么一个partition将会被分配以轮询的方式。

    相关文章

      网友评论

          本文标题:kafka-2.7.0(三)spring boot 集成 kaf

          本文链接:https://www.haomeiwen.com/subject/fxfmfltx.html