Kafka Java API Development: A Basic Example

Author: 学术界末流打工人 | Published 2020-02-04

Configuration

First, add the Kafka dependency to the pom.xml of your Maven project:

<properties>
  <kafka.version>0.9.0.0</kafka.version>
</properties>

<!-- Kafka dependency -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>${kafka.version}</version>
</dependency>

Producer Development

Developing the KafkaProperties class

Define the static constants:

public class KafkaProperties {

    public static final String ZK = "192.168.1.113:2181";

    public static final String TOPIC = "hello_topic";

    public static final String BROKER_LIST = "192.168.1.113:9092";

}

Developing the KafkaProducer class

Create the required fields:

private String topic;

// import kafka.javaapi.producer.Producer;
private Producer<Integer, String> producer;

Create the constructor:

public KafkaProducer(String topic) {
  this.topic = topic;

  // Build the Properties object that ProducerConfig expects
  Properties properties = new Properties();

  // Specify the broker list
  properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
  // Specify the serializer class; see the Kafka source for other encoders
  properties.put("serializer.class", "kafka.serializer.StringEncoder");
  properties.put("request.required.acks", "1");

  // import kafka.producer.ProducerConfig;
  producer = new Producer<Integer, String>(new ProducerConfig(properties));
 }
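Kafka 0.9 also ships the newer producer client in org.apache.kafka.clients.producer (pulled in transitively by the kafka_2.11 artifact); this tutorial sticks with the older Scala-based API above. For reference, a minimal sketch of the same send using the new client (the class name NewApiProducerSketch and the StringSerializer choice are my assumptions, not from the original):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.BROKER_LIST); // replaces metadata.broker.list
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "1"); // replaces request.required.acks

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(KafkaProperties.TOPIC, "hello from the new API"));
        producer.close();
    }
}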

Set up the worker thread

// Make KafkaProducer extend Thread
public class KafkaProducer extends Thread{

  @Override
  public void run() {

    int messageNo = 1;

    while (true) {
      String message = "message_" + messageNo;
      // import kafka.producer.KeyedMessage;
      producer.send(new KeyedMessage<Integer, String>(topic, message));
      System.out.println("Sent: " + message);

      messageNo++;

      try {
        Thread.sleep(2000);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

  }

}
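The run() loop above never exits. For a bounded test you could instead send a fixed number of messages and then free the producer's connections; a minimal sketch using the same fields (the method name sendBatch is mine):

// Send a fixed number of messages, then release the producer's resources.
public void sendBatch(int count) {
    for (int i = 1; i <= count; i++) {
        producer.send(new KeyedMessage<Integer, String>(topic, "message_" + i));
    }
    producer.close(); // closes the connections to the brokers
}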

Consumer Development

Developing the KafkaConsumer class

Define the topic field and the constructor:

private String topic;

public KafkaConsumer(String topic){
  this.topic = topic;
}

Develop the createConnector method:

private ConsumerConnector createConnector(){
  Properties properties = new Properties();
  // Define the ZooKeeper connection
  properties.put("zookeeper.connect", KafkaProperties.ZK);
  // Define the consumer group id
  properties.put("group.id", KafkaProperties.GROUP_ID);
  // import kafka.consumer.ConsumerConfig;
  return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
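Only zookeeper.connect and group.id are required here. The old high-level consumer accepts additional tuning properties; a few commonly set ones, with illustrative values that are my own rather than from the original:

// Optional tuning (illustrative values):
properties.put("zookeeper.session.timeout.ms", "4000"); // ZooKeeper session timeout
properties.put("auto.commit.interval.ms", "1000");      // how often consumed offsets are committed to ZooKeeper
properties.put("auto.offset.reset", "smallest");        // with no committed offset, start from the earliest message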

Set up the worker thread

public class KafkaConsumer extends Thread{
  @Override
  public void run(){
    // import kafka.javaapi.consumer.ConsumerConnector;
    ConsumerConnector consumer = createConnector();

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);

    // Key: topic name
    // Value: the List<KafkaStream<byte[],byte[]>> of streams for that topic
    Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
    // Take the single stream we requested for this topic
    // import kafka.consumer.KafkaStream;
    KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);
    // import kafka.consumer.ConsumerIterator;
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

    while (iterator.hasNext()) {
      String message = new String(iterator.next().message());
      System.out.println("rec:" + message);
    }
  }
}
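The value 1 in topicCountMap is the number of streams requested for the topic, which is why we can simply take index 0. With a topic that has several partitions you could request more streams and consume each on its own thread; a sketch (the two-stream count and the thread pool are my assumptions, and hello_topic would need at least two partitions to keep both threads busy):

// Consume one topic with two streams, each processed on its own thread.
topicCountMap.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
ExecutorService pool = Executors.newFixedThreadPool(2); // import java.util.concurrent.ExecutorService / Executors
for (final KafkaStream<byte[], byte[]> s : streams.get(topic)) {
    pool.submit(new Runnable() {
        public void run() {
            ConsumerIterator<byte[], byte[]> it = s.iterator();
            while (it.hasNext()) {
                System.out.println("rec:" + new String(it.next().message()));
            }
        }
    });
}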

Complete Code

KafkaProperties.java

package com.imooc.spark.kafka;

/**
 * Common Kafka configuration constants
 */
public class KafkaProperties {

    public static final String ZK = "192.168.1.113:2181";

    public static final String TOPIC = "hello_topic";

    public static final String BROKER_LIST = "192.168.1.113:9092";

    public static final String GROUP_ID = "test_group1";
}

KafkaProducer.java

package com.imooc.spark.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Kafka producer
 */
public class KafkaProducer extends Thread{

    private String topic;

    // import kafka.javaapi.producer.Producer;
    private Producer<Integer, String> producer;

    public KafkaProducer(String topic) {
        this.topic = topic;

        // Build the Properties object that ProducerConfig expects
        Properties properties = new Properties();

        // Specify the broker list
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        // Specify the serializer class; see the Kafka source for other encoders
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");

        // import kafka.producer.ProducerConfig;
        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }


    @Override
    public void run() {

        int messageNo = 1;

        while (true) {
            String message = "message_" + messageNo;
            // import kafka.producer.KeyedMessage;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("Sent: " + message);

            messageNo++;

            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

KafkaConsumer.java

package com.imooc.spark.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer extends Thread{
    private String topic;

    public KafkaConsumer(String topic){
        this.topic = topic;
    }

    private ConsumerConnector createConnector(){
        Properties properties = new Properties();
        // Define the ZooKeeper connection
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        // Define the consumer group id
        properties.put("group.id", KafkaProperties.GROUP_ID);
        // import kafka.consumer.ConsumerConfig;
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run(){
        // import kafka.javaapi.consumer.ConsumerConnector;
        ConsumerConnector consumer = createConnector();

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);

        // Key: topic name
        // Value: the List<KafkaStream<byte[],byte[]>> of streams for that topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
        // Take the single stream we requested for this topic
        // import kafka.consumer.KafkaStream;
        KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);
        // import kafka.consumer.ConsumerIterator;
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("rec:" + message);
        }
    }
}
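Because iterator.hasNext() blocks waiting for data, run() never returns on its own. If you promote the connector to a field (a change of mine; assign it in run() instead of using a local variable), the consumer can be stopped cleanly from another thread:

private ConsumerConnector connector; // set in run(): connector = createConnector();

// Call from another thread; the connector signals the streams to end,
// hasNext() returns false, and the run() loop exits.
public void shutdown() {
    if (connector != null) {
        connector.shutdown();
    }
}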

Test class: KafkaClientApp.java

package com.imooc.spark.kafka;

/**
 * Kafka Java API test
 */
public class KafkaClientApp {

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();

        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}
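With the broker and ZooKeeper from KafkaProperties reachable and hello_topic created, running main should interleave output from both threads roughly like this (exact ordering depends on thread scheduling):

Sent: message_1
rec:message_1
Sent: message_2
rec:message_2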


