美文网首页
Java多consumer消费kafka

Java多consumer消费kafka

作者: ivms8200 | 来源:发表于2022-09-18 17:28 被阅读0次

    本项目基于springboot2

    引入依赖

      <properties>
        <hutool-version>5.7.12</hutool-version>
        <lombok-version>1.18.20</lombok-version>
        <slf4j-api-version>1.7.10</slf4j-api-version>
        <logback-classic-version>1.0.13</logback-classic-version>
      </properties>
    <dependency>
      <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>${hutool-version}</version>
      </dependency>
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok-version}</version>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j-api-version}</version>
      </dependency>
      <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback-classic-version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
      </dependency>
    

    配置文件

    创建subscribe/kafka.json,producer为预留参数

    {
      "producer": {
        "init": {
          "bootstrap.servers": "50.1.172.137:9092",
          "acks": "all",
          "retries": 0,
          "batch.size": 16384,
          "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
          "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
        },
        "topic": {
          "topicname1": "topic1",
          "topicname1": "topic2"
        }
      },
      "consumer": {
        "init": {
          "bootstrap.servers": "50.1.172.137:9092",
          "group.id": "daps10017dev",
          "enable.auto.commit": "false",
          "auto.commit.interval.ms": "1000",
          "session.timeout.ms": "30000",
          "max.poll.records": "100",
          "auto.offset.reset": "latest",
          "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
        },
        "topic": {
          "face": "SNAP_IMAGE_INFO_TOPIC",
          "dapsFace": "daps_face_snap_topic"
        },
        "thread": 10
      }
    }
    

    代码实现

    KafkaConfig

    import cn.hutool.core.io.FileUtil;
    import cn.hutool.json.JSONObject;
    import cn.hutool.json.JSONUtil;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.io.File;
    
    @Configuration
    public class KafkaConfig {
    
        @Bean
        public JSONObject kafkaConf() {
            //kafka配置
            String kafkaStr = FileUtil.readUtf8String(
                    Thread.currentThread().getContextClassLoader().getResource("").getPath()
                            + File.separator + "subscribe/kafka.json");
            return JSONUtil.parseObj(kafkaStr);
        }
    }
    

    ThreadPoolConfig

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Configuration
    @EnableAsync
    @Slf4j
    public class ThreadPoolConfig {
        @Bean
        public ThreadPoolTaskExecutor subscribeThreadPool() {
            log.info("start subscribeThreadPool");
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //配置核心线程数
            executor.setCorePoolSize(10);
            //配置最大线程数
            executor.setMaxPoolSize(12);
            //配置队列大小
            executor.setQueueCapacity(0);
            //配置线程池中的线程的名称前缀
            executor.setThreadNamePrefix("async-subscribe-");
    
            // rejection-policy:当pool已经达到max size的时候,如何处理新任务
            // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            //执行初始化
            executor.initialize();
            return executor;
        }
    

    KafkaConfigPathConstants

    public interface KafkaConfigPathConstants {
        /**
         * 消费者线程
         */
        String CONSUMER_THREAD = "consumer.thread";
        /**
         * topic
         */
        String CONSUMER_TOPIC = "consumer.topic";
        /**
         * 初始化参数
         */
        String CONSUMER_INIT = "consumer.init";
    }
    

    KafkaConsumerFactory

    import cn.hutool.json.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Properties;
    
    @Slf4j
    public class KafkaConsumerFactory {
    
        public static KafkaConsumer<String, String> getSingletonConsumer(JSONObject kafkaConf) {
            //初始化参数
            JSONObject init = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_INIT, JSONObject.class);
            Properties props = new Properties();
            init.entrySet().stream().forEach(set -> props.put(set.getKey(), set.getValue()));
            return new KafkaConsumer<>(props);
        }
    
    }
    

    KafkaCache

    import java.util.concurrent.CopyOnWriteArraySet;
    
    public class KafkaCache {
    
        /**
         * 是否需要kafka守护线程启动kafka消费线程
         */
        public static final AtomicBoolean kafkaConsumerDaemonNeed = new AtomicBoolean(true);
    
        /**
         * 线程缓存,用于恢复线程运行
         */
        public static final CopyOnWriteArraySet<Thread> kafkaConsumerThread = new CopyOnWriteArraySet<>();
        /**
         * kafka消费者线程,用于停止消费者
         */
        public static final CopyOnWriteArraySet<KafkaConsumerThread> kafkaConsumerThread2 = new CopyOnWriteArraySet<>();
    }
    

    KafkaManager

    import cn.hutool.json.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Component;
    import javax.annotation.Resource;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Component
    @EnableScheduling   // 1.开启定时任务
    @EnableAsync        // 2.开启多线程
    @Slf4j
    public class KafkaManager {
    
        @Autowired
        private ThreadPoolTaskExecutor subscribeThreadPool;
    
        @Resource
        private JSONObject kafkaConf;
        
        private AtomicInteger currentFaceConsumer = new AtomicInteger();
    
        private Integer consumerCount = Integer.MAX_VALUE;
    
        @Async
        @Scheduled(cron = "${subscribe.kafka.daemon.cron}")
        public void kafkaStart() {
            if (!KafkaCache.kafkaConsumerDaemonNeed.get()) {
                return;
            }
            if (currentFaceConsumer.get() >= consumerCount) {
                return;
            }
            try {
                //这里要检查下配置了topic没有。没有配置的话,不启动线程.
                Map topicMap = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_TOPIC, Map.class);
                if (topicMap.size() == 0) {
                    return;
                }
                consumerCount = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_THREAD, Integer.class);
                //任务加入线程池
                while (consumerCount > currentFaceConsumer.get()) {
                    subscribeThreadPool.execute(new KafkaConsumerThread(kafkaConf));
                    currentFaceConsumer.incrementAndGet();
                }
            } catch (Exception e) {
                log.error("kafka消费者启动失败", e);
            }
    
        }
    
        @Async
        @Scheduled(cron = "${subscribe.kafka.thread.log.cron}")
        public void threadLog() {
            log.info("检查任务运行状态--运行标志={}", KafkaCache.kafkaConsumerDaemonNeed.get());
            log.debug("检查任务运行状态--线程存活数量={}", subscribeThreadPool.getActiveCount());
            log.debug("检查任务运行状态--线程池大小={}", subscribeThreadPool.getPoolSize());
            log.debug("检查任务运行状态--线程队列剩余长度={}", subscribeThreadPool.getThreadPoolExecutor().getQueue().remainingCapacity());
            log.debug("检查任务运行状态--线程队列使用长度={}", subscribeThreadPool.getThreadPoolExecutor().getQueue().size());
        }
    
    }
    

    KafkaConsumerThread

    import cn.hutool.json.JSONObject;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.InterruptException;
    import org.apache.kafka.common.errors.WakeupException;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.stream.Collectors;
    
    @Slf4j
    @NoArgsConstructor
    public class KafkaConsumerThread implements Runnable {
    
        private JSONObject kafkaConf;
    
        private KafkaConsumer<String, String> consumer;
    
        /**
         * 消费者开启状态
         */
        private AtomicBoolean opened = new AtomicBoolean(true);
    
        public KafkaConsumerThread(JSONObject kafkaConf) {
            this.kafkaConf = kafkaConf;
        }
    
        @Override
        public void run() {
            if (kafkaConf == null) {
                return;
            }
            consumer = initConsumer();
            KafkaCache.kafkaConsumerThread.add(Thread.currentThread());
            KafkaCache.kafkaConsumerThread2.add(this);
            consumerMsg();
        }
    
        /**
         * 初始化消费者
         *
         * @return
         */
        private KafkaConsumer<String, String> initConsumer() {
            KafkaConsumer<String, String> consumer = KafkaConsumerFactory.getSingletonConsumer(kafkaConf);
            Map<String, String> topic = kafkaConf.getByPath(KafkaConfigPathConstants.CONSUMER_TOPIC, Map.class);
            List<String> topicList = topic.entrySet().stream()
                    .map(o -> o.getValue())
                    .collect(Collectors.toList());
            consumer.subscribe(topicList);
            return consumer;
        }
    
        /**
         * 关闭消费
         */
        public void shutdownConsumer() {
            KafkaCache.kafkaConsumerDaemonNeed.set(false);
            //退出消费循环
            opened.set(false);
            // wakeup 可以安全地从外部线程来中断活动操作
            consumer.wakeup();
        }
    
        /**
         * 消费数据
         */
        private void consumerMsg() {
            ConsumerRecords<String, String> msgList;
            try {
                while (opened.get()) {
                    //try catch放wile里面保证一直消费
                    try {
                        msgList = consumer.poll(1000);
                        if (null == msgList || msgList.count() == 0) {
                            continue;
                        }
                        log.debug("消费到 {} 条数据", msgList.count());
                        for (ConsumerRecord<String, String> record : msgList) {
                            String topic = record.topic();
                            String value = record.value();
                            if (StringUtils.isBlank(value)) {
                                continue;
                            }
                            log.info("消费kafka数据。topic={};value={}", topic, value);
                            //todo
                            log.info("消费kafka数据。topic={} 数据处理完成", topic);
                        }
                        consumer.commitAsync();
                    } catch (WakeupException e) {
                    } catch (Exception e) {
                        if (e instanceof InterruptException) {
                            return;
                        }
                        log.error("消费数据报错", e);
                    }
                }
            } finally {
                //最外层finally在退出时处理consumer提交、关闭
                try {
                    consumer.commitSync();
                    consumer.close();
                } catch (Exception e) {
                } finally {
                    log.info("已关闭消费者");
                }
            }
        }
    }
    

    ShutdownConfig

    import com.hikvision.daps.modules.subscribe.kafka.KafkaCache;
    import com.hikvision.daps.modules.subscribe.kafka.KafkaConsumerThread;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.Ordered;
    
    @Configuration
    @Slf4j
    public class ShutdownConfig implements ApplicationRunner, Ordered {
    
        @Override
        public void run(ApplicationArguments args) {
            try {
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    // 在关闭钩子中执行收尾工作
                    // 注意事项:
                    // 1.在这里执行的动作不能耗时太久
                    // 2.不能在这里再执行注册,移除关闭钩子的操作
                    // 3 不能在这里调用System.exit()
                    int delayTime = 15;
                    log.info("关机前工作开始");
                    try {
                        //关闭缓存中的消费者
                        KafkaCache.kafkaConsumerThread2.forEach(KafkaConsumerThread::shutdownConsumer);
                        // 主线程继续执行,以便可以关闭consumer,提交偏移量
                        KafkaCache.kafkaConsumerThread.forEach(c -> {
                            try {
                                c.join();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                    long start = System.currentTimeMillis();
                    while (true) {
                        //关闭要时间,先睡
                        try {
                            Thread.sleep(1000L);
                        } catch (Exception e) {
                        }
                        long now = System.currentTimeMillis();
                        if (now - start >= delayTime * 1000L) {
                            log.info("关机前工作超过{}秒,将强行关机", delayTime);
                            break;
                        }
    //                KafkaCache.kafkaConsumerSet.forEach(c ->);
    //                if (TaskCache.doingFacePlateNoEventCount.get() <= 0 && TaskCache.taskPushFaceConsumer.size() <= 0) {
    //                    log.info("缓存的队列处理完毕");
    //                    break;
    //                }
                    }
                    log.info("关机前工作结束");
                }));
            } catch (Exception e) {
                log.info("关机前工作出错", e);
            }
        }
    
        @Override
        public int getOrder() {
            return Integer.MAX_VALUE;
        }
    }
    

    相关文章

      网友评论

          本文标题:Java多consumer消费kafka

          本文链接:https://www.haomeiwen.com/subject/pevportx.html