Kafka client: creating a consumer
The KafkaClient class below wraps the old ZooKeeper-based high-level consumer API in a factory method and includes a small consumer thread that prints every message it receives.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaClient {

    // Build a ZooKeeper-based high-level consumer (old kafka.consumer API).
    public static ConsumerConnector createConsumer() {
        Properties consumerProps = new Properties();
        consumerProps.put("zookeeper.connect", "ip:port");
        consumerProps.put("group.id", "group-1");
        consumerProps.put("serializer.class", "kafka.serializer.StringEncoder"); // a producer-side setting; not used by the consumer itself
        consumerProps.put("auto.offset.reset", "smallest"); // start from the earliest offset when no offset has been committed
        ConsumerConfig config = new ConsumerConfig(consumerProps);
        return Consumer.createJavaConsumerConnector(config);
    }

    public static class ConsumerTest extends Thread {
        private final String topic;
        private final ConsumerConnector consumer;

        public ConsumerTest(String topic, ConsumerConnector consumer) {
            this.consumer = consumer;
            this.topic = topic;
        }

        @Override
        public void run() {
            // Ask for a single stream (one consumer thread) for this topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0); // the single stream for this topic
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            // hasNext() blocks until a message arrives or the connector is shut down.
            while (iterator.hasNext()) {
                String message = new String(iterator.next().message());
                System.out.println(message);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConsumerConnector consumer = KafkaClient.createConsumer();
        ConsumerTest t = new ConsumerTest("topic_name", consumer);
        t.start();
        Thread.sleep(1000 * 60 * 10); // let the consumer run for 10 minutes
        t.interrupt();
        consumer.shutdown();
    }
}
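The ConsumerConnector used above belongs to the old Scala high-level consumer, which connects through ZooKeeper and was deprecated and later removed in newer Kafka releases. For reference, a minimal sketch of the same polling loop with the newer org.apache.kafka.clients.consumer.KafkaConsumer could look like the following; the bootstrap address, group id, and topic name are placeholders, not values from this project.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port"); // broker list, not ZooKeeper
        props.put("group.id", "group-1");
        props.put("auto.offset.reset", "earliest"); // replaces "smallest" in the old API
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Collections.singletonList("topic_name"));
            while (true) {
                // Poll blocks up to one second, then returns whatever records arrived.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}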
Kafka data collection
The KafkaDataCollect class below reads messages from a Kafka topic, pushes each record into a Redis list ("kafka-queue"), keeps a per-day article count, and reports the previous day's count by SMS.
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import redis.clients.jedis.Jedis;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaDataCollect {

    static Logger LOG = Logger.getLogger(KafkaDataCollect.class);

    ConsumerConnector consumer = KafkaClient.createConsumer();

    // Reference timestamp (midnight of the current day), used to track the daily update count.
    private long ruler = 0;

    KafkaDataCollect() {
        // Initialize the reference timestamp.
        ruler = getRuler();
    }

    // Returns the timestamp of today's 00:00:00.
    public long getRuler() {
        long current = System.currentTimeMillis();
        String date = new SimpleDateFormat("dd/MM/yyyy").format(new Date(current));
        date = date + " 00:00:00";
        long rulertime = 0;
        try {
            rulertime = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss").parse(date).getTime();
        } catch (ParseException e) {
            LOG.error("Failed to parse date " + date, e);
        }
        return rulertime;
    }

    public void dataStreamIn(String topic) {
        // One stream for the topic, same as in KafkaClient.ConsumerTest.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0); // the single stream for this topic
        int articleCount = 0;
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        Jedis jedis = new Jedis("ip", 6379); // Redis host placeholder
        String msgStr = ""; // pending daily summary; kept across iterations until it has been sent
        try {
            while (iterator.hasNext()) {
                // Once a full day has passed, log the count and roll the reference timestamp forward.
                if ((System.currentTimeMillis() - ruler) >= 24 * 60 * 60 * 1000) {
                    String date = new SimpleDateFormat("dd/MM/yyyy").format(new Date(ruler));
                    msgStr = date + " Kafka data stream collected " + articleCount + " articles.";
                    LOG.info(msgStr);
                    articleCount = 0;
                    ruler = ruler + 24 * 60 * 60 * 1000;
                }
                // Eight hours into the new day, send the pending summary by SMS once.
                if (!msgStr.isEmpty() && (System.currentTimeMillis() - ruler) >= 8 * 60 * 60 * 1000) {
                    SendMessage.send("178xxxxxxxx", msgStr, null, null, null);
                    msgStr = "";
                }
                String j = new String(iterator.next().message());
                Gson gson = new Gson();
                JsonFromKafkaData jsonD = gson.fromJson(j, new TypeToken<JsonFromKafkaData>() {
                }.getType());
                try {
                    LOG.info(j);
                    // Prefix the cluster id if it is not already namespaced.
                    if (jsonD.getSimId() != null && !jsonD.getSimId().contains("_")) {
                        jsonD.setSimId("clusterId_" + jsonD.getSimId());
                    }
                    jedis.lpush("kafka-queue", gson.toJson(jsonD));
                    articleCount++;
                } catch (Exception e) {
                    LOG.error("Input newData into queue.", e);
                }
            }
        } catch (Exception e) {
            SendMessage.send("178xxxxxxxx", "Kafka data collection process stopped.", null, null, null);
            LOG.error("Kafka data collection process stopped.", e);
        } finally {
            jedis.close();
        }
    }

    public static void main(String[] args) {
        KafkaDataCollect kafkaStream = new KafkaDataCollect();
        kafkaStream.dataStreamIn("topic_name");
    }
}
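dataStreamIn() deserializes each message into a JsonFromKafkaData object with Gson, but that class is not shown here. A minimal sketch that satisfies the calls used above; only simId is implied by the code, and the real class presumably carries more fields.

// Minimal sketch of the JsonFromKafkaData POJO assumed by the Gson call above.
// Only simId is referenced in dataStreamIn(); other fields of the real class are omitted.
public class JsonFromKafkaData {
    private String simId;

    public String getSimId() {
        return simId;
    }

    public void setSimId(String simId) {
        this.simId = simId;
    }
}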