```/**
* kafka produce 单例模式只初始化一个生产者
*/publicclassKafkaProducer{ privatestaticKafkaTemplate kafkaTemplate =newKafkaTemplate<>(producerFactory());; publicstaticvoidsend(Stringtopic,Stringkey,Stringdata){ ListenableFuture> future = kafkaTemplate.send(topic, key, data); future.addCallback(newCallBackSuccess(),newFailCallBack(topic, key, data)); } publicstaticvoidsend(Stringtopic,Stringdata){ ListenableFuture> future = kafkaTemplate.send(topic, data); future.addCallback(newCallBackSuccess(),newFailCallBack(topic,"",data)); } privatestaticvoidsend(Stringtopic, Integer parti, Long time,Objectkey,Stringvalue){ kafkaTemplate.send(topic,parti,time,key,value); }/**
*
* Description:获取配置
* Date: 2017年7月11日
* @author shaqf
*/privatestaticMap producerConfigs() {Map props = Maps.newHashMap();Stringlist = Properties.appProps.getValue("kafka.broker"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, list); props.put(ProducerConfig.RETRIES_CONFIG,0); props.put(ProducerConfig.BATCH_SIZE_CONFIG,4096); props.put(ProducerConfig.LINGER_MS_CONFIG,1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,40960); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);returnprops; }/** 获取工厂 */privatestaticProducerFactory producerFactory() {returnnewDefaultKafkaProducerFactory<>(producerConfigs()); }/**
* 发送消息后的成功回调
*/staticclassCallBackSuccessimplementsSuccessCallback{ @Override publicvoidonSuccess(Objecto) { System.out.println("成功"); } }/**
* 发送消息后的失败回调
*/staticclassFailCallBackimplementsFailureCallback{Stringtopic;Stringkey;Stringdata; FailCallBack(Stringtopic,Stringkey,Stringdata){this.data = data;this.key = key;this.topic = topic; } @Override publicvoidonFailure(Throwable throwable) { System.out.println("失败 topid:"+topic+",key:"+key+",data:"+data); throwable.printStackTrace(); } } publicstaticvoidmain(String[] args) throws Exception{ KafkaTemplate hh = kafkaTemplate; System.out.print(hh);for(int i=0; i<500;i++){ ListenableFuture> r = hh.send("yyy7","key2",""+i); r.addCallback(newCallBackSuccess(),newFailCallBack("","","")); hh.flush(); Thread.sleep(1000); } }}消费者
//通过注解监听topic进行消费 @Configuration
@EnableKafka public class KafkaConsumer {
finalstaticStringlist = Properties.appProps.getValue("kafka.broker");/**
*
* Description:获取配置
* Date: 2017年7月11日
* @author shaqf
*/privateMap consumerConfigs() {Map props = Maps.newHashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, list); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG,"group1"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); System.out.println("KafkaConsumer consumerConfigs "+ JsonUtil.object2Json(props));returnprops; }/** 获取工厂 */private ConsumerFactory consumerFactory() {returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}/** 获取实例 */@Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory1 =newConcurrentKafkaListenerContainerFactory<>(); factory1.setConsumerFactory(consumerFactory()); factory1.setConcurrency(2); factory1.getContainerProperties().setPollTimeout(3000); System.out.println("KafkaConsumer kafkaListenerContainerFactory factory"+ JsonUtil.object2Json(factory1));returnfactory1;}/**
* topic的消费者组1监听
* @return
*/@Beanpublic Group1Listener listener1() {returnnewGroup1Listener();}/**
* topic的消费者组2监听
* @return
*/@Beanpublic Group2Listener listener2() {returnnewGroup2Listener();}
}
消费者Group 1
消费者组1publicclassGroup1Listener{ @KafkaListener(topics = {"test-topic"})publicvoidlisten(ConsumerRecord<?, ?> record){ Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("listen1 "+ message); } } @KafkaListener(topics = {"task1"},groupId ="group1")publicvoidtask1(ConsumerRecord<?, ?> record){ System.out.println("这是"+" task1 的消费者"); System.out.println("这是group1 topic task1 KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record)); Object message = record.value(); System.out.println("group1 topic task1 "+record.topic()); System.out.println(message); System.out.println(record.key()); System.out.println(record); } @KafkaListener(topics = {"gift"},groupId ="group1")publicvoidgift(ConsumerRecord<String, String> record){ String key = record.key(); Stringvalue= record.value(); System.out.println("groupId1 kafka gift Consumer value:"+value); }}消费者组2
public class Group2Listener {
@KafkaListener(topics = {"taskCmd"})publicvoidtaskCmd(ConsumerRecord<?, ?> record){ System.out.println(" KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record)); Object message = record.value(); System.out.println(" 这是group2 topic taskCmd "+record.topic()); System.out.println(message); System.out.println(record.key()); System.out.println(record);}@KafkaListener(topics = {"task"})publicvoidtask(ConsumerRecord<?, ?> record){ System.out.println("这是group2 topic task KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record)); Object message = record.value(); System.out.println("这是group2 topic task "+record.topic()); System.out.println(message); System.out.println(record.key()); System.out.println(record);}@KafkaListener(topics = {"task1"},groupId ="group2")publicvoidtask1(ConsumerRecord<?, ?> record){ System.out.println("这是group2"+" task1 的消费者"); System.out.println("这是group2 topic task1 KafkaConsumer ---------->>>>>>>>:"+ JsonUtil.object2Json(record)); Object message = record.value(); System.out.println("group2 topic task1 "+record.topic()); System.out.println(message); System.out.println(record.key()); System.out.println(record);}@KafkaListener(topics = {"gift"},groupId ="group2")publicvoidgift(ConsumerRecord<String, String> record){ String key = record.key(); Stringvalue= record.value(); System.out.println("groupId2 kafka gift Consumer value:"+value);}
}
网友评论