package com.ky.produce;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Thread-pool backed Kafka producer.
 *
 * <p>Owns one shared {@link KafkaProducer} and one {@link ThreadPoolExecutor}, both created in
 * the static initializer when this class is first loaded. {@link #sendData(String, String)}
 * wraps every message in a {@link ProducerThread} task and hands it to the pool.
 *
 * <p>A JVM shutdown hook calls {@link #shutdown()} so that queued tasks and records still
 * batched inside the producer are flushed before the process exits.
 *
 * @author xwj
 */
public class ProduceThreadPool {

    /** Number of worker threads kept in the pool. */
    private static final int CORE_POOL_SIZE = 40;

    /**
     * Upper bound handed to the executor. Because the work queue below is unbounded, the
     * executor never creates more than {@link #CORE_POOL_SIZE} threads (see the
     * {@code ThreadPoolExecutor} queuing rules); the value is kept to preserve the original
     * configuration.
     */
    private static final int MAXIMUM_POOL_SIZE = 100;

    /** Idle time after which threads above the core size may be reclaimed. */
    private static final long KEEP_ALIVE_TIME = 60;

    /** Unit of {@link #KEEP_ALIVE_TIME}. */
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    /** How long {@link #shutdown()} waits for queued tasks before cancelling them. */
    private static final long SHUTDOWN_WAIT_SECONDS = 30;

    /** Producer shared by every task. */
    private static final KafkaProducer<String, String> PRODUCER;

    /** Pool that runs the {@link ProducerThread} tasks. */
    private static final ThreadPoolExecutor SERVICE;

    static {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "10.1.11.110:6667,10.1.11.111:6667,10.1.11.112:6667");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072);
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        PRODUCER = new KafkaProducer<>(properties);

        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        SERVICE = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, workQueue);

        // Without this, tasks still queued and records still batched are dropped on JVM exit.
        Runtime.getRuntime().addShutdownHook(
                new Thread(ProduceThreadPool::shutdown, "produce-thread-pool-shutdown"));

        System.out.println("init ok....");
    }

    /**
     * Queues a message for asynchronous delivery.
     *
     * <p>The record key is the current time in milliseconds rendered as a string. Failures while
     * building or queuing the record (for example after {@link #shutdown()}) are reported on
     * standard error and not propagated, so this call is best effort.
     *
     * @param topic destination topic
     * @param msg   record value
     */
    public static void sendData(String topic, String msg) {
        try {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, String.valueOf(System.currentTimeMillis()), msg);
            // execute() instead of submit(): submit() would park any exception thrown by the
            // task inside a Future that nobody reads, hiding the failure completely.
            SERVICE.execute(new ProducerThread(PRODUCER, record));
        } catch (Exception e) {
            // Deliberately best effort: the caller is never interrupted by a send failure.
            System.err.println("Failed to queue message for topic " + topic);
            e.printStackTrace();
        }
    }

    /**
     * Stops accepting new messages, waits up to {@link #SHUTDOWN_WAIT_SECONDS} seconds for queued
     * tasks to run, then closes the producer. Calling it more than once is harmless.
     */
    public static synchronized void shutdown() {
        if (SERVICE.isShutdown()) {
            return;
        }
        boolean interrupted = false;
        SERVICE.shutdown();
        try {
            if (!SERVICE.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                SERVICE.shutdownNow();
            }
        } catch (InterruptedException e) {
            interrupted = true;
            SERVICE.shutdownNow();
        }
        try {
            PRODUCER.close();
        } finally {
            // Restore the flag only after close(), which must not start in interrupted state.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
package com.ky.produce;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Task that sends a single record through a supplied {@link KafkaProducer}.
 *
 * <p>{@link #run()} only hands the record to the producer; the outcome is reported later through
 * the send callback, which logs either the record's offset, partition, topic and timestamp or
 * the failure.
 *
 * @author xwj
 */
public class ProducerThread implements Runnable {

    /** One logger for the class; looking one up per task is unnecessary work. */
    private static final Logger log = LoggerFactory.getLogger(ProducerThread.class);

    private final KafkaProducer<String, String> producer;
    private final ProducerRecord<String, String> record;

    /**
     * @param producer producer used to send the record
     * @param record   record to send
     */
    ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    /** Sends the record asynchronously and logs the outcome. */
    @Override
    public void run() {
        try {
            producer.send(record, (metadata, e) -> {
                if (null != e) {
                    // Newer clients pass a placeholder metadata object on failure as well, so
                    // the error case must exclude the success log below.
                    log.error("Failed to send record to topic " + record.topic(), e);
                } else if (null != metadata) {
                    log.info("消息发送成功 : offset: {}, partition:{}, topic:{} timestamp:{}",
                            metadata.offset(), metadata.partition(), metadata.topic(),
                            metadata.timestamp());
                }
            });
        } catch (RuntimeException e) {
            // send() can fail before the callback is registered (e.g. serialization error or a
            // closed producer); log it here so the failure is not lost inside the executor.
            log.error("Failed to hand record to producer for topic " + record.topic(), e);
        }
    }
}
// Reader comments (web-page residue; not part of the program)