配置
首先在 Maven 项目的 pom.xml 中添加 Kafka 依赖(<dependency> 节点需放在 <dependencies> 节点内):
<properties>
<kafka.version>0.9.0.0</kafka.version>
</properties>
<!-- Kafka dependency; its version is taken from the kafka.version property -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
Producer 开发
开发Properties类
定义静态常量
/**
 * Shared connection settings for the Kafka examples.
 *
 * <p>Holds constants only, so the class is final and cannot be instantiated.
 */
public final class KafkaProperties {
    /** ZooKeeper connect string ({@code host:port}) used by the consumer. */
    public static final String ZK = "192.168.1.113:2181";
    /** Topic the examples write to and read from. */
    public static final String TOPIC = "hello_topic";
    /** Broker list ({@code host:port}) used by the producer. */
    public static final String BROKER_LIST = "192.168.1.113:9092";
    /** Consumer group id read by the consumer's createConnector(). */
    public static final String GROUP_ID = "test_group1";

    private KafkaProperties() {
        // constants holder, not meant to be instantiated
    }
}
开发 KafkaProducer 类
创建需要的属性成员
/** Topic every message is sent to; assigned once in the constructor. */
private final String topic;
// Requires: import kafka.javaapi.producer.Producer;
/** Producer client used to send messages; assigned once in the constructor. */
private final Producer<Integer, String> producer;
创建构造方法
/**
 * Creates a producer bound to the given topic.
 *
 * @param topic topic that messages will be sent to
 */
public KafkaProducer(String topic) {
    this.topic = topic;
    // ProducerConfig is built from a java.util.Properties instance.
    Properties props = new Properties();
    // Broker list the producer is pointed at.
    props.setProperty("metadata.broker.list", KafkaProperties.BROKER_LIST);
    // Serializer class for messages; see the Kafka sources for the available types.
    props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
    props.setProperty("request.required.acks", "1");
    // Requires: import kafka.producer.ProducerConfig;
    ProducerConfig config = new ProducerConfig(props);
    this.producer = new Producer<Integer, String>(config);
}
实现工作线程
// KafkaProducer extends Thread so that sending runs on its own thread.
public class KafkaProducer extends Thread {
    /**
     * Sends "message_1", "message_2", ... to the topic, pausing two seconds after each
     * send, until the thread is interrupted. The producer is closed on exit.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = "message_" + messageNo;
                // Requires: import kafka.producer.KeyedMessage;
                producer.send(new KeyedMessage<Integer, String>(topic, message));
                System.out.println("Sent: " + message);
                messageNo++;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition ends the thread
                    // instead of silently carrying on.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // Release the producer's resources once the loop ends.
            producer.close();
        }
    }
}
开发 Consumer 类
开发KafkaConsumer类
定义成员变量和构造方法
/** Topic to consume from; assigned once in the constructor. */
private final String topic;

/**
 * Creates a consumer for the given topic.
 *
 * @param topic topic this consumer reads from
 */
public KafkaConsumer(String topic) {
    this.topic = topic;
}
开发 createConnector 方法
/**
 * Builds a consumer connector from the ZooKeeper address and group id in KafkaProperties.
 *
 * @return the connector returned by Consumer.createJavaConsumerConnector
 */
private ConsumerConnector createConnector() {
    Properties props = new Properties();
    // ZooKeeper connect string
    props.setProperty("zookeeper.connect", KafkaProperties.ZK);
    // consumer group id
    props.setProperty("group.id", KafkaProperties.GROUP_ID);
    // Requires: import kafka.consumer.ConsumerConfig;
    ConsumerConfig config = new ConsumerConfig(props);
    return Consumer.createJavaConsumerConnector(config);
}
实现工作线程
public class KafkaConsumer extends Thread {
    /**
     * Requests one stream for the topic and prints every message received on it.
     * The connector is shut down when the loop ends or an exception escapes.
     */
    @Override
    public void run() {
        // Requires: import kafka.javaapi.consumer.ConsumerConnector;
        ConsumerConnector consumer = createConnector();
        try {
            // topic -> stream count requested for it (1 here)
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            // key: topic, value: the message streams created for that topic
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                    consumer.createMessageStreams(topicCountMap);
            // Take the first stream in the list returned for the topic.
            // Requires: import kafka.consumer.KafkaStream;
            KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
            // Requires: import kafka.consumer.ConsumerIterator;
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while (iterator.hasNext()) {
                // Decode as UTF-8 explicitly rather than with the platform default charset.
                String message = new String(iterator.next().message(),
                        java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("rec:" + message);
            }
        } finally {
            // Always release the connector.
            consumer.shutdown();
        }
    }
}
完整代码
KafkaProperties.java
package com.imooc.spark.kafka;
/**
 * Common Kafka settings shared by the producer and consumer examples.
 *
 * <p>Holds constants only, so the class is final and cannot be instantiated.
 */
public final class KafkaProperties {
    /** ZooKeeper connect string ({@code host:port}) used by the consumer. */
    public static final String ZK = "192.168.1.113:2181";
    /** Topic the examples write to and read from. */
    public static final String TOPIC = "hello_topic";
    /** Broker list ({@code host:port}) used by the producer. */
    public static final String BROKER_LIST = "192.168.1.113:9092";
    /** Consumer group id used by the consumer. */
    public static final String GROUP_ID = "test_group1";

    private KafkaProperties() {
        // constants holder, not meant to be instantiated
    }
}
KafkaProducer.java
package com.imooc.spark.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/**
 * Kafka producer that runs on its own thread and sends a numbered test message to the
 * configured topic, pausing two seconds after each send, until it is interrupted.
 */
public class KafkaProducer extends Thread {
    /** Topic every message is sent to. */
    private final String topic;
    /** Producer client created in the constructor and closed when run() exits. */
    private final Producer<Integer, String> producer;

    /**
     * Creates the producer client using the broker list from KafkaProperties.
     *
     * @param topic topic that messages will be sent to
     */
    public KafkaProducer(String topic) {
        this.topic = topic;
        // ProducerConfig is built from a java.util.Properties instance.
        Properties properties = new Properties();
        // Broker list the producer is pointed at.
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        // Serializer class for messages; see the Kafka sources for the available types.
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    /**
     * Sends "message_1", "message_2", ... until the thread is interrupted, then closes
     * the producer.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = "message_" + messageNo;
                producer.send(new KeyedMessage<Integer, String>(topic, message));
                System.out.println("Sent: " + message);
                messageNo++;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition ends the thread
                    // instead of silently carrying on.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // Release the producer's resources once the loop ends.
            producer.close();
        }
    }
}
KafkaConsumer.java
package com.imooc.spark.kafka;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Kafka consumer that runs on its own thread and prints every message it receives
 * from the configured topic.
 */
public class KafkaConsumer extends Thread {
    /** Topic to consume from. */
    private final String topic;

    /**
     * Creates a consumer for the given topic.
     *
     * @param topic topic this consumer reads from
     */
    public KafkaConsumer(String topic) {
        this.topic = topic;
    }

    /**
     * Builds a consumer connector from the ZooKeeper address and group id in KafkaProperties.
     *
     * @return the connector returned by Consumer.createJavaConsumerConnector
     */
    private ConsumerConnector createConnector() {
        Properties properties = new Properties();
        // ZooKeeper connect string
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        // consumer group id
        properties.put("group.id", KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    /**
     * Requests one stream for the topic and prints every message received on it.
     * The connector is shut down when the loop ends or an exception escapes.
     */
    @Override
    public void run() {
        ConsumerConnector consumer = createConnector();
        try {
            // topic -> stream count requested for it (1 here)
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            // key: topic, value: the message streams created for that topic
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                    consumer.createMessageStreams(topicCountMap);
            // Take the first stream in the list returned for the topic.
            KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while (iterator.hasNext()) {
                // Decode as UTF-8 explicitly rather than with the platform default charset.
                String message = new String(iterator.next().message(),
                        java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("rec:" + message);
            }
        } finally {
            // Always release the connector.
            consumer.shutdown();
        }
    }
}
测试类 KafkaClientApp.java
package com.imooc.spark.kafka;
/**
 * Entry point for trying out the Kafka Java API: starts one producer thread and one
 * consumer thread on the same topic.
 */
public class KafkaClientApp {
    public static void main(String[] args) {
        // Producer is created and started first, then the consumer.
        KafkaProducer producerThread = new KafkaProducer(KafkaProperties.TOPIC);
        producerThread.start();

        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.TOPIC);
        consumerThread.start();
    }
}
网友评论