美文网首页消息队列Kafka
kafka多线程、定时、按时间段消费

kafka多线程、定时、按时间段消费

作者: 风里有神通 | 来源:发表于2019-05-21 17:20 被阅读166次

    最近做大数据相关的工作,用到了kafka。因为工期较赶,消费工程设计得比较简单:没有集群,也没有分布式,就是一台单机的小钢炮,一天要处理将近100万条数据,还是有点生猛的,不过暂时也能hold住。主要做法是把kafka前一天生产出来的数据,集中到凌晨以后的流量低峰期去处理。要是放到白天或者流量高峰期实时处理,对数据库、服务器来说压力不小,其他业务可能直接就被拖死。这样一来,消费者就需要按开始时间和结束时间来处理数据。offsetsForTimes()这个方法按时间戳查找偏移量时,返回的是时间戳大于或等于你所提供时间点的第一条消息的偏移量。简单说,你想找 2019-05-21 00:00:00 至 2019-05-21 23:59:59 这一天产生的消息,那kafka在查找结束时间 2019-05-21 23:59:59 的时候,如果这个时间点上恰好没有消息,它就会取该时间点之后最近的一条消息,于是就可能找到 2019-05-22 00:00:03 这个时间点的消息,显然不符合我们只消费前一天数据的要求。所以,我们得朝更早的时间方向回溯,我这里是按照1秒的时间间隔逐步向前回溯查找,直至找到不晚于结束时间的消息为止。

    在网上找了很多kafka按某个时间戳消费的资料,但它们都只是从某个时间点开始消费,并不是按时间段消费,都不是特别理想。后来自己想了想办法,根据kafka官方提供的consumer API,粗略地实现了这一功能,再加上多线程之后,处理速度相当快!处理消息数据时,先计算出前一天的消息量,然后在多线程处理消息的过程中采用计数的方式来停止消费,即100万条消息全部消费完毕时,便关闭客户端连接。当然这其中肯定还存在很多其他问题,有待完善补充:如何确保这100万条消息全部被完整地处理掉?如果处理期间程序出了异常、消费中断了怎么办?异常消息如何处理、补偿,重复消费如何控制,等等。
    现在把核心的demo代码贴出来,供参考。

    线程池的配置

    TaskExecutorConfig

    import org.apache.log4j.Logger;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import java.lang.reflect.Method;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * Async configuration: supplies the thread pool Spring uses for {@code @Async}
     * execution and the handler for exceptions that escape async methods.
     *
     * <p>Pool sizing is read from the injected {@link TaskThreadPoolConfig}. The pool is
     * built on first use and then reused, so calling {@link #getAsyncExecutor()}
     * repeatedly does not spawn additional pools.
     *
     * @author yangpin
     */
    @Configuration
    @EnableAsync
    public class TaskExecutorConfig implements AsyncConfigurer {

        private static final Logger logger = Logger.getLogger(TaskExecutorConfig.class);

        @Autowired
        private TaskThreadPoolConfig config;

        /** Lazily created pool; guarded by the monitor of this instance. */
        private ThreadPoolTaskExecutor executor;

        /**
         * Returns the shared executor, creating and initializing it on the first call.
         *
         * <p>Previously every call built and initialized a new pool that was never shut
         * down, so each caller that asked for the executor leaked a full set of threads.
         *
         * @return the single {@link ThreadPoolTaskExecutor} owned by this configuration
         */
        @Override
        public synchronized Executor getAsyncExecutor() {
            if (executor == null) {
                ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
                pool.setCorePoolSize(config.getCorePoolSize());
                pool.setMaxPoolSize(config.getMaxPoolSize());
                pool.setQueueCapacity(config.getQueueCapacity());
                pool.setKeepAliveSeconds(config.getKeepAliveSeconds());
                pool.setThreadNamePrefix("mq-executor-");
                // When the pool and its queue are full, run the task on the submitting thread.
                pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                pool.initialize();
                executor = pool;
            }
            return executor;
        }

        /**
         * Returns a handler that logs the uncaught exception (with stack trace) and the
         * name of the async method that threw it.
         *
         * @return the logging exception handler
         */
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return new AsyncUncaughtExceptionHandler() {
                @Override
                public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
                    logger.error("==========================" + arg0.getMessage() + "=======================", arg0);
                    logger.error("exception method:" + arg1.getName());
                }
            };
        }

    }
    
    

    自定义配置类

    TaskThreadPoolConfig

    import org.springframework.boot.context.properties.ConfigurationProperties;
    /**
     * Thread pool sizing values bound from properties under the
     * {@code spring.task.pool} prefix.
     *
     * @author yangpin
     */
    @ConfigurationProperties(prefix = "spring.task.pool")
    public class TaskThreadPoolConfig {

        /** Number of core threads. */
        private int corePoolSize;

        /** Upper bound on the number of threads. */
        private int maxPoolSize;

        /** Keep-alive time, in seconds. */
        private int keepAliveSeconds;

        /** Capacity of the task queue. */
        private int queueCapacity;

        /** @return the configured core pool size */
        public int getCorePoolSize() {
            return this.corePoolSize;
        }

        /** @param corePoolSize the core pool size to use */
        public void setCorePoolSize(final int corePoolSize) {
            this.corePoolSize = corePoolSize;
        }

        /** @return the configured maximum pool size */
        public int getMaxPoolSize() {
            return this.maxPoolSize;
        }

        /** @param maxPoolSize the maximum pool size to use */
        public void setMaxPoolSize(final int maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
        }

        /** @return the configured keep-alive time in seconds */
        public int getKeepAliveSeconds() {
            return this.keepAliveSeconds;
        }

        /** @param keepAliveSeconds the keep-alive time in seconds */
        public void setKeepAliveSeconds(final int keepAliveSeconds) {
            this.keepAliveSeconds = keepAliveSeconds;
        }

        /** @return the configured queue capacity */
        public int getQueueCapacity() {
            return this.queueCapacity;
        }

        /** @param queueCapacity the queue capacity to use */
        public void setQueueCapacity(final int queueCapacity) {
            this.queueCapacity = queueCapacity;
        }

    }
    
    
    
    

    消费端配置

    
    import com.xyy.bi.configure.mq.MqConfigProperties;
    import com.xyy.bi.service.SourceDataService;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
    * @author yangpin
    * @Description mq配置
    * @Date 2019/5/13 10:16
    * @Param
    * @return
    **/
    @Configuration
    @EnableKafka
    public class MqConfig {
    
        private static final Logger logger =LoggerFactory.getLogger(MqConfig.class);
    
        @Autowired
        MqConfigProperties mqConfigProperties;
    
        @Autowired
        SourceDataService sourceDataService;
    
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String bootstrapServers;
    
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private Boolean autoCommit;
    
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
    
        @Value("${spring.kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
    
        @Value("${spring.kafka.consumer.key-deserializer}")
        private String keyDeserializer;
    
        @Value("${spring.kafka.consumer.value-deserializer}")
        private String valueDeserializer;
    
        @Value("${spring.kafka.ocnsumer.session-timeout}")
         private String sessionTimeout;
    
        @Value("${spring.kafka.consumer.auto-commit-interval}")
        private String autoCommitInterval;
    
        @Value("${spring.kafka.consumer.max-poll-records}")
        private String maxPollRecords;
    
        @Value("${spring.kafka.consumer.max-poll-interval}")
         private String maxPollInterval;
    
        @Value("${spring.kafka.listener.concurrency}")
         private Integer concurrency;
    
        @Value("${kafka.app.topic.test1}")
         private String test1Topic;
    
        @Value("${kafka.app.topic.test2}")
         private String test2Topic;
    
        @Value("${kafka.app.topic.test3}")
         private String test3Topic;
    
        @Bean
        public KafkaListenerContainerFactory kafkaListenerContainerFactory(){
            ConcurrentKafkaListenerContainerFactory factory =new ConcurrentKafkaListenerContainerFactory();
            factory.setConcurrency(concurrency);
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setPollTimeout(300000);
            return factory;
        }
    
        public MapconsumerConfigs() {
            Mapprops =new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,autoCommitInterval);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,maxPollInterval);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,sessionTimeout);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
            return props;
        }
    
        @Bean
        public ConsumerFactory consumerFactory(){
            DefaultKafkaConsumerFactory consumerFactory =new             
            DefaultKafkaConsumerFactory(consumerConfigs());
            return consumerFactory;
        }
    
        public static Logger getLogger() { return logger;  }
        public String getAutoOffsetReset() { return autoOffsetReset; }
        public String getTest1Topic() { return test1Topic; }
        public String getTest2Topic() { return test2Topic; }
        public String getTest3Topic() { return test3Topic; }
    }
    
    

    test1消费者

    
    import com.xyy.bi.service.SourceDataService;
    import kafka.utils.ShutdownableThread;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.time.Duration;
    
    /**
    * @author yangpin
    * @Description mq数据处理
    * @Date 2019/5/13 10:17
    * @Param
    * @return
    **/
    public class MqTest1Consumer extends ShutdownableThread {
    
        private static final Logger logger =LoggerFactory.getLogger(MqTest1Consumer.class);
        
        private final KafkaConsumerconsumer;
        private final long endOffset ;
        private final long startOffset ;
        private long counts  ;
        private final MqConfig configs;
        SourceDataService sourceDataService;
    
        public MqTest1Consumer(MqConfig configs,SourceDataService sourceDataService, KafkaConsumer consumer,long startOffset,long endOffset) {
            super("test1-consumer", false);
            this.configs = configs;
            this.consumer = consumer;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
            this.sourceDataService = sourceDataService;
    //        consumer = new KafkaConsumer<>(configs.consumerConfigs());
    
        }
    
        @Override
        public void doWork() {
            try {
                  //consumer.assign(topicPartitions);
                  ConsumerRecordsrecords =consumer.poll(Duration.ofSeconds(configs.mqConfigProperties.getFrequency()));
                        
                   if (records ==null ||records.count() ==0 ){
                     consumer.close();
                     shutdown();
                    }
    
                  for (final ConsumerRecordrecord :records) {
                        if (record.offset() <=endOffset){
                             counts++;
                        
                          //此处为你的消息数据业务处理
    
    
                             logger.info("总计需要处理条数: " + (endOffset-startOffset) +" ,test1第: "+counts+ " 条 , test1结束offset = " + 
                             endOffset + " , test1当前offset = " + record.offset());
                             consumer.commitSync();
                         }else {
                            break;
                         }
                   }
                 if ((endOffset -startOffset) == counts){
                     consumer.close();
                     shutdown();
                 }
              }catch (Exception e){
                logger.error("mq消息队列处理异常!" + e.getMessage());
                e.printStackTrace();
              }
         }
    
        @Override
        public boolean isInterruptible() {return false;}
    
    

    test2消费者

    import com.xyy.bi.service.SourceDataService;
    import kafka.utils.ShutdownableThread;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.time.Duration;
    
    public class MqTest2Consumer  extends ShutdownableThread {
    
        private static final Logger logger =LoggerFactory.getLogger(MqTest2Consumer.class);
    
        private final KafkaConsumerconsumer;
        private final long endOffset ;
        private final long startOffset ;
        private final MqConfig configs;
        SourceDataService sourceDataService;
        private long counts  ;
    
        public MqTest2Consumer(MqConfig configs, SourceDataService sourceDataService, KafkaConsumer consumer,long startOffset,long endOffset) {
            super("test2-consumer", false);
            this.configs = configs;
            this.consumer = consumer;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
            this.sourceDataService = sourceDataService;
            //consumer = new KafkaConsumer<>(configs.consumerConfigs());
        }
    
        @Override
        public void doWork() {
          try {
                //consumer.assign(topicPartitions);
                ConsumerRecordsrecords =consumer.poll(Duration.ofSeconds(configs.mqConfigProperties.getFrequency()));
         
                if (records ==null ||records.count() ==0 ){
                     consumer.close();
                     shutdown();
                }
              
    
                for (final ConsumerRecordrecord :records  ) {
                     if (record.offset() <=endOffset){
    
                         //此处为你的消息数据业务处理
    
                          counts++;
                          logger.info("总计需要处理条数: " + (endOffset-startOffset) +" ,test2第: "+counts+ " 条 , test2结束offset = " +         
                                          endOffset + " , test2当前offset = " + record.offset());
                          consumer.commitSync();
                     }else {
                         break;
                     }
                  }
                if ((endOffset -startOffset) ==counts){
                    consumer.close();
                    shutdown();
                 }
            }catch (Exception e){
                  logger.error("mq消息队列处理异常!" + e.getMessage());
                  e.printStackTrace();
            }
        }
    
        @Override
        public boolean isInterruptible() { return false;  }
    
    

    核心任务处理类

    
    import com.xyy.bi.service.SourceDataService;
    import com.xyy.bi.thread.TaskExecutorConfig;
    import com.xyy.bi.utils.DateUtil;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Component;
    
    import java.text.SimpleDateFormat;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicLong;
    
    
    /**
     * Scheduled job that starts one consumer worker for each of the test1 and test2
     * topics.
     *
     * <p>In time-window mode (offset flag on, reset policy {@code latest}) each consumer
     * is positioned at the first record of the previous day and given the offset of the
     * last record of that day. In plain mode (offset flag off, reset policy
     * {@code earliest}) the consumers are only assigned their partitions. Any other
     * combination is treated as a configuration error and terminates the JVM.
     *
     * <p>NOTE(review): the start and end offsets handed to a worker are single values,
     * so on a topic with several partitions the partition processed last wins.
     */
    @Component
    @Configuration
    @EnableScheduling
    public class MqTask {

        private static final Logger logger = LoggerFactory.getLogger(MqTask.class);

        @Autowired
        TaskExecutorConfig taskExecutorConfig;

        @Autowired
        MqConfig mqConfig;

        @Autowired
        SourceDataService sourceDataService;

        /**
         * Creates the two consumers, positions them according to the configured mode,
         * and submits the workers to the executor.
         *
         * <p>Any exception during setup is logged and the JVM exits with status -1.
         *
         * <p>NOTE(review): the consumers are typed with {@code Integer} keys while the
         * sample configuration uses a string key deserializer; keys are not read here.
         *
         * @author yangpin
         */
        // Every day at 00:00:00.
        @Scheduled(cron = "0 0 00 * * ?")
        private void MqTask() {
            try {
                logger.info("mq消息队列消费线程初始化开始!......");
                ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) taskExecutorConfig.getAsyncExecutor();
                KafkaConsumer<Integer, String> test1Consumer = new KafkaConsumer<>(mqConfig.consumerConfigs());
                KafkaConsumer<Integer, String> test2Consumer = new KafkaConsumer<>(mqConfig.consumerConfigs());

                List<TopicPartition> test1TopicPartitions = toTopicPartitions(test1Consumer.partitionsFor(mqConfig.getTest1Topic()));
                List<TopicPartition> test2TopicPartitions = toTopicPartitions(test2Consumer.partitionsFor(mqConfig.getTest2Topic()));

                final AtomicLong test1StartOffset = new AtomicLong(0L);
                final AtomicLong test1EndOffset = new AtomicLong(0L);
                final AtomicLong test2StartOffset = new AtomicLong(0L);
                final AtomicLong test2EndOffset = new AtomicLong(0L);

                // Null-safe reads: a missing flag or policy falls into the error branch.
                final boolean offsetOn = Boolean.TRUE.equals(mqConfig.mqConfigProperties.getOffset());
                final boolean offsetOff = Boolean.FALSE.equals(mqConfig.mqConfigProperties.getOffset());
                final String resetPolicy = mqConfig.getAutoOffsetReset();

                if (offsetOn && "latest".equals(resetPolicy)) {
                    // Time-window consumption is enabled.
                    logger.info("偏移消费开启!......");

                    Calendar calendar = Calendar.getInstance();
                    calendar.add(Calendar.DATE, -1);
                    SimpleDateFormat sd = new SimpleDateFormat(DateUtil.DEFALT_DATE_FORMAT);
                    SimpleDateFormat df = new SimpleDateFormat(DateUtil.DATE_FORMATE_YYYYMMDDHHMMSS);
                    String day = sd.format(calendar.getTime());

                    logger.info("当前时间:   " + DateUtil.getDate(DateUtil.DATE_FORMATE_YYYYMMDDHHMMSS) + "");
                    logger.info("偏移消费时间段:" + day + " 00:00:00" + " 至 " + day + " 23:59:59");

                    // Read both bounds once so every partition searches the same window.
                    final long windowStart = DateUtil.getLastDayStartTimeStamp(0);
                    final long windowEnd = DateUtil.getLastDayStartTimeStamp(1);

                    Map<TopicPartition, Long> test1StartTimestampsToSearch = timestampsFor(test1TopicPartitions, windowStart);
                    Map<TopicPartition, Long> test1EndTimestampsToSearch = timestampsFor(test1TopicPartitions, windowEnd);
                    Map<TopicPartition, Long> test2StartTimestampsToSearch = timestampsFor(test2TopicPartitions, windowStart);
                    Map<TopicPartition, Long> test2EndTimestampsToSearch = timestampsFor(test2TopicPartitions, windowEnd);

                    test1Consumer.assign(test1TopicPartitions);
                    test2Consumer.assign(test2TopicPartitions);

                    // Per partition: earliest offset whose timestamp is at or after the requested time.
                    Map<TopicPartition, OffsetAndTimestamp> test1StartTimeMap = test1Consumer.offsetsForTimes(test1StartTimestampsToSearch);
                    Map<TopicPartition, OffsetAndTimestamp> test1EndTimeMap = test1Consumer.offsetsForTimes(test1EndTimestampsToSearch);
                    Map<TopicPartition, OffsetAndTimestamp> test2StartTimeMap = test2Consumer.offsetsForTimes(test2StartTimestampsToSearch);
                    Map<TopicPartition, OffsetAndTimestamp> test2EndTimeMap = test2Consumer.offsetsForTimes(test2EndTimestampsToSearch);

                    logger.info("开始设置各分区初始偏移量!......");
                    offsetHandle(test1StartTimeMap, test1EndTimeMap, test1StartOffset, test1EndOffset, test1EndTimestampsToSearch, test1Consumer, df);
                    offsetHandle(test2StartTimeMap, test2EndTimeMap, test2StartOffset, test2EndOffset, test2EndTimestampsToSearch, test2Consumer, df);
                    logger.info("设置各分区初始偏移量完毕!......");

                } else if ("earliest".equals(resetPolicy) && offsetOff) {
                    logger.info("isSetOffsetTime = " + mqConfig.mqConfigProperties.getOffset() + "消费策略 = " + mqConfig.getAutoOffsetReset());
                    test1Consumer.assign(test1TopicPartitions);
                    test2Consumer.assign(test2TopicPartitions);
                } else {
                    logger.error("mq消息参数配置有误,请检查配置文件!");
                    System.exit(-1);
                }

                executor.execute(new MqTest1Consumer(mqConfig, sourceDataService, test1Consumer, test1StartOffset.get(), test1EndOffset.get()));
                executor.execute(new MqTest2Consumer(mqConfig, sourceDataService, test2Consumer, test2StartOffset.get(), test2EndOffset.get()));
                logger.info("mq消息队列消费线程初始化完成!......");
            } catch (Exception e) {
                logger.error("mq消息队列消费线程初始化失败!......" + e.getMessage(), e);
                System.exit(-1);
            }
        }

        /** Converts partition metadata into the {@link TopicPartition} handles used for assignment. */
        private static List<TopicPartition> toTopicPartitions(List<PartitionInfo> infos) {
            List<TopicPartition> partitions = new ArrayList<>(infos.size());
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            return partitions;
        }

        /** Builds a lookup map that asks for the same timestamp on every given partition. */
        private static Map<TopicPartition, Long> timestampsFor(List<TopicPartition> partitions, long timestamp) {
            Map<TopicPartition, Long> search = new HashMap<>();
            for (TopicPartition partition : partitions) {
                search.put(partition, timestamp);
            }
            return search;
        }

        /**
         * Positions the consumer at the start of the window and records the window's
         * first and last offsets.
         *
         * <p>The end lookup returns the earliest record at or after the end time, so
         * every record before that hit is strictly earlier than the end time. The last
         * in-window offset therefore follows directly:
         * <ul>
         *   <li>no hit: the window runs to the current end of the log, i.e. log end offset - 1;</li>
         *   <li>hit later than the end time: hit offset - 1;</li>
         *   <li>hit exactly on the end time: the hit itself.</li>
         * </ul>
         * This replaces the earlier one-second back-off loop, which could spin forever
         * when no earlier record existed, dereferenced null lookup results, and could
         * pick up a hit that belonged to a different partition.
         *
         * <p>Partitions without a start hit are left untouched.
         *
         * @param startTimeMap          start-time lookup result per partition (values may be null)
         * @param endTimeMap            end-time lookup result per partition (values may be null)
         * @param startOffset           receives the first offset of the window
         * @param endOffset             receives the last offset of the window (inclusive)
         * @param endTimestampsToSearch end time requested for each partition
         * @param consumer              consumer to query and seek
         * @param df                    formatter for the timestamps written to the log
         * @author yangpin
         */
        private void offsetHandle(Map<TopicPartition, OffsetAndTimestamp> startTimeMap,
                                  Map<TopicPartition, OffsetAndTimestamp> endTimeMap,
                                  final AtomicLong startOffset,
                                  final AtomicLong endOffset,
                                  Map<TopicPartition, Long> endTimestampsToSearch,
                                  KafkaConsumer<Integer, String> consumer,
                                  SimpleDateFormat df) {

            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : startTimeMap.entrySet()) {
                final TopicPartition topicPartition = entry.getKey();
                final OffsetAndTimestamp startHit = entry.getValue();
                if (startHit == null) {
                    continue;
                }

                final Long requestedEnd = endTimestampsToSearch.get(topicPartition);
                final long lastDayEndTime = requestedEnd != null
                        ? requestedEnd.longValue()
                        : DateUtil.getLastDayStartTimeStamp(1);

                final OffsetAndTimestamp endHit = endTimeMap.get(topicPartition);
                final long lastOffset;
                final long endTimestamp;
                if (endHit == null) {
                    Long logEnd = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition);
                    // The end offset is the offset of the next record to be written.
                    lastOffset = (logEnd != null ? logEnd.longValue() : startHit.offset()) - 1;
                    endTimestamp = lastDayEndTime;
                } else if (endHit.timestamp() > lastDayEndTime) {
                    lastOffset = endHit.offset() - 1;
                    endTimestamp = lastDayEndTime;
                } else {
                    lastOffset = endHit.offset();
                    endTimestamp = endHit.timestamp();
                }
                endOffset.set(lastOffset);

                final long startOffsetTmp = startHit.offset();
                logger.info("consumer : " + " topic = " + topicPartition.topic() + " , partition = " +
                        topicPartition.partition() + " , period of time = " + df.format(new Date(startHit.timestamp())) + " - " + df.format(new Date(endTimestamp))
                        + " , period of offset = " + startOffsetTmp + " - " + endOffset.get() + " ,共计: " + (endOffset.get() - startOffsetTmp));

                // Position the consumer at the first record of the window.
                startOffset.set(startOffsetTmp);
                consumer.seek(topicPartition, startOffsetTmp);
            }
        }

    }
    
    

    pom文件主要用到的包

        <!-- Excerpt only: the parent POM, the version properties and the two Kafka-related dependencies. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.0.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <!-- Versions referenced by the two dependencies below. -->
            <spring-kafka.version>2.2.5.RELEASE</spring-kafka.version>
            <kafka.version>2.2.0</kafka.version>
        </properties>
    
        <!-- Spring integration for Kafka (the org.springframework.kafka.* classes used by the consumer configuration). -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <!-- Kafka core artifact built for Scala 2.12; presumably the source of kafka.utils.ShutdownableThread, which the consumer workers extend. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
         </dependency>
    

    application.properties主要配置

    
    #mq configuration
    # whether offsets are committed automatically
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.group-id=test-group
    spring.kafka.consumer.bootstrap-servers=localhost:9092
    # consumption policy (earliest|latest|none)
    spring.kafka.consumer.auto-offset-reset=latest
    spring.kafka.consumer.auto-commit-interval=5000
    spring.kafka.consumer.max-poll-records=1000
    spring.kafka.consumer.max-poll-interval=300000
    # correctly spelled key; the misspelled "ocnsumer" key below is kept because it is the key the consumer configuration class reads
    spring.kafka.consumer.session-timeout=150000
    spring.kafka.ocnsumer.session-timeout=150000
    spring.kafka.listener.concurrency=5
    
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #topic
    kafka.app.topic.test1=test1
    kafka.app.topic.test2=test2
    # the consumer configuration class also injects kafka.app.topic.test3 without a default value
    kafka.app.topic.test3=test3
    
    kafka.consumer.frequency=20
    kafka.consumer.offset=true
    kafka.consumer.offsetTime=2
    
    spring.task.pool.corePoolSize=30
    spring.task.pool.maxPoolSize=30
    spring.task.pool.keepAliveSeconds=70
    spring.task.pool.queueCapacity=25
    
    

    相关文章

      网友评论

        本文标题:kafka多线程、定时、按时间段消费

        本文链接:https://www.haomeiwen.com/subject/hhajzqtx.html