Kafka producer connection pool pattern

Author: 会飞的蜗牛66666 | Published 2019-08-16 17:45

    package com.ky.produce;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Thread-pool Kafka producer.
     *
     * @author xwj
     */
    public class ProduceThreadPool {

        private static Properties properties = new Properties();

        private static KafkaProducer<String, String> producer;

        private static ThreadPoolExecutor service;

        private static TimeUnit timeUnit = TimeUnit.SECONDS;

        private static BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();

        static {
            int corePoolSize = 40;
            // Producer configuration: brokers, serializers, compression, batching, acks
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.11.110:6667,10.1.11.111:6667,10.1.11.112:6667");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072);
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            producer = new KafkaProducer<>(properties);
            // Thread pool that hands send tasks to the shared producer
            int maximumPoolSize = 100;
            long keepAliveTime = 60;
            service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue);
            System.out.println("init ok....");
        }

        // Send data: wrap the record in a task and submit it to the thread pool
        public static void sendData(String topic, String msg) {
            try {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, String.valueOf(System.currentTimeMillis()), msg);
                service.submit(new ProducerThread(producer, record));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
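
One thing the class above never does is shut down: the executor and the KafkaProducer are never closed, so records still sitting in the producer's buffer can be lost when the JVM exits. Below is a minimal shutdown sketch that could be added to ProduceThreadPool; the method name close() and the 30-second timeout are assumptions, not part of the original post.

    // Hypothetical addition to ProduceThreadPool (not in the original post): graceful shutdown
    public static void close() {
        try {
            service.shutdown();                              // stop accepting new send tasks
            service.awaitTermination(30, TimeUnit.SECONDS);  // assumed timeout: wait for queued tasks to reach the producer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.flush();   // push any records still buffered by the producer
            producer.close();   // release the sender thread and network resources
        }
    }

Sharing a single KafkaProducer across all worker threads is deliberate and safe: KafkaProducer is thread-safe, and the pool only parallelizes the cheap hand-off of records into the producer's internal buffer.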

    package com.ky.produce;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Producer task: sends a single record on a pool thread.
     *
     * @author xwj
     */
    public class ProducerThread implements Runnable {

        private Logger log = LoggerFactory.getLogger(ProducerThread.class);

        private KafkaProducer<String, String> producer;
        private ProducerRecord<String, String> record;

        ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
            this.producer = producer;
            this.record = record;
        }

        @Override
        public void run() {
            // Asynchronous send; the callback fires once the broker responds or the send fails
            producer.send(record, (metadata, e) -> {
                if (null != e) {
                    e.printStackTrace();
                }
                if (null != metadata) {
                    log.info("Message sent successfully: " + String.format("offset: %s, partition: %s, topic: %s, timestamp: %s",
                            metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
                }
            });
        }

    }
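
For completeness, here is a usage sketch of the two classes above. The class name ProduceDemo, the topic demo-topic, and the message payloads are made up for illustration and do not appear in the original post.

    package com.ky.produce;

    /**
     * Usage sketch: push a few messages through the pooled producer.
     */
    public class ProduceDemo {

        public static void main(String[] args) {
            // ProduceThreadPool's static initializer builds the shared producer and thread pool on first use
            for (int i = 0; i < 10; i++) {
                ProduceThreadPool.sendData("demo-topic", "message-" + i);
            }
            // In a real program, flush/close the producer before exiting so buffered records are not lost
        }
    }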


Permalink: https://www.haomeiwen.com/subject/efiqsctx.html