
Kafka Consumer High-Level API

Author: felix_feng | Published 2016-08-17 11:03

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Offsets are recorded in ZooKeeper, keyed by group.id; the mapping between
 * partitions and consumers is maintained by Kafka.
 *
 * @author 崔磊
 * @date 2015-11-04 11:44:15
 */
public class MyHighLevelConsumer {

    /**
     * ID of the group this consumer belongs to
     */
    private String groupid;

    /**
     * ID of this consumer
     */
    private String consumerid;

    /**
     * Number of threads to start per topic
     */
    private int threadPerTopic;

    public MyHighLevelConsumer(String groupid, String consumerid, int threadPerTopic) {
        super();
        this.groupid = groupid;
        this.consumerid = consumerid;
        this.threadPerTopic = threadPerTopic;
    }

    public void consume() {
        Properties props = new Properties();
        props.put("group.id", groupid);
        props.put("consumer.id", consumerid);
        props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "2000");
        // props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

        // How many threads to start per topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);

        // Get the streams
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

        // Start one thread per stream to consume its messages
        for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {
            new MyStreamThread(stream).start();
        }
    }

    /**
     * Worker thread inside each consumer
     *
     * @author cuilei05
     */
    private class MyStreamThread extends Thread {

        private KafkaStream<byte[], byte[]> stream;

        public MyStreamThread(KafkaStream<byte[], byte[]> stream) {
            super();
            this.stream = stream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();

            // Process messages one by one
            while (streamIterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> message = streamIterator.next();
                String topic = message.topic();
                int partition = message.partition();
                long offset = message.offset();
                String key = new String(message.key());
                String msg = new String(message.message());
                // Handle the message here; this example simply prints it.
                // If processing fails, the information above can be written to a log
                // or sent out as an SMS/e-mail alert for later handling.
                System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName()
                        + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "
                        + key + " , mess : " + msg);
            }
        }
    }

    public static void main(String[] args) {
        String groupid = "myconsumergroup";
        MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, "myconsumer1", 3);
        MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, "myconsumer2", 3);
        consumer1.consume();
        consumer2.consume();
    }
}
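
The listing references a KafkaProperties class for the ZooKeeper connection string and the topic name but does not show it. A minimal sketch is given below, assuming it is simply a constants holder; the class and constant names match the ones used above, while the values are placeholders that would need to match your environment.

/**
 * Hypothetical constants holder assumed by the example above.
 * Adjust the values to your own ZooKeeper quorum and topic name.
 */
public class KafkaProperties {

    public static final String ZK_CONNECT = "localhost:2181"; // placeholder ZooKeeper address
    public static final String TOPIC = "mytopic";             // placeholder topic name

}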
