美文网首页
Kafka 使用ExecutorService 进行消费

Kafka 使用ExecutorService 进行消费

作者: ZhangDHing | 来源:发表于2019-07-26 10:29 被阅读0次

    前言:

         Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。
    

    如何使用Java的ExecutorService框架来创建线程池处理大量消息?

      1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。
    
      2.创建工作线程,以执行每条消息的进一步处理。
    

    1.topic消息传递到ThreadPoolExecutorService

    /** kafka 消息处理*/
    public class KafkaProcessor {
        private final KafkaConsumer<String, String> myConsumer;
        private ExecutorService executor;
        private static final Properties KAFKA_PROPERTIES = new Properties();
    
       //基础的kafka配置~
       static {
            KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092");
            KAFKA_PROPERTIES.put("group.id", "test-consumer-group");
            KAFKA_PROPERTIES.put("enable.auto.commit", "true");
            KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000");
            KAFKA_PROPERTIES.put("session.timeout.ms", "30000");
            KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        }
    
        public KafkaProcessor() {
            this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);//初始化配置
            this.myConsumer.subscribe(Arrays.asList("test")); //订阅topic=test
        }
        public void init(int numberOfThreads) {
          //创建一个线程池 
         /**
          * public ThreadPoolExecutor(int corePoolSize,
                                      int maximumPoolSize,
                                      long keepAliveTime,
                                      TimeUnit unit,
                                      BlockingQueue<Runnable> workQueue,
                                      ThreadFactory threadFactory,
                                      RejectedExecutionHandler handler)
           *corePoolSize : 核心线程数,一旦创建将不会再释放。如果创建的线程数还没有达到指定的核心线    程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待空闲线程来执行。
    
           *maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。
    
           *keepAliveTime : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。
    
           *unit : 时间单位,TimeUnit.SECONDS等。
    
           *workQueue : 任务队列,存储暂时无法执行的任务,等待空闲线程来执行任务。
    
           *threadFactory :  线程工程,用于创建线程。
    
           *handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序
          */
    
          executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,0L,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    
        while (true) {
            ConsumerRecords<String, String> records = myConsumer.poll(100);//每隔时间段进行消息拉取
            for (final ConsumerRecord<String, String> record : records) {
            executor.submit(new KafkaRecordHandler(record)); 
            }
          }
    }
        //别忘记线程池的关闭!
        public void shutdown() {
            if (myConsumer != null) {
            myConsumer.close();
            }
            if (executor != null) {
            executor.shutdown();
            }
            try {
              if (executor != null && !executor.awaitTermination(60, TimeUnit.MILLISECONDS)) {
              executor.shutdownNow();
              }
            }catch (InterruptedException e) {
            executor.shutdownNow();
            }
    }
    }
    
    image.gif

    2.创建工作线程

    // 创建消息线程进行处理
    public class KafkaRecordHandler implements Runnable {
    
        private ConsumerRecord<String, String> record;
    
        public KafkaRecordHandler(ConsumerRecord<String, String> record) {
            this.record = record;
        }
    
        @Override
        public void run() { 
          //业务操作...
            System.out.println("value = "+record.value());
            System.out.println("Thread id = "+ Thread.currentThread().getId());
        }
    }
    
    image.gif

    3.Using ?

    //消费测试
    public class ConsumerTest {
        public static void main(String[] args) {
          KafkaProcessor processor = new KafkaProcessor();
          try {
              processor.init(5);//指定相应的线程数!
          }catch (Exception exp) {
              processor.shutdown();
          }
        }
    }
    
    image.gif

    4.总结

    可能并不适合所有方案,按需定制方案。


    相关文章

      网友评论

          本文标题:Kafka 使用ExecutorService 进行消费

          本文链接:https://www.haomeiwen.com/subject/iecrrctx.html