美文网首页
java操作kafka生产消费

java操作kafka生产消费

作者: 先生_吕 | 来源:发表于2017-10-15 17:56 被阅读71次

    前言

    Kafka 版本更新之后,其 Java 客户端 API 也发生了变化:具体是从 kafka_2.11-0.9.0 之后的版本(不包括 0.9.0)开始。新版 API 的操作更为简洁方便,下面是新版本的生产者与消费者实现方式;旧版本的实现方式请浏览:
    http://www.jianshu.com/p/d30419c8ffd4

    生产者实例
    import kafka.serializer.StringEncoder;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.clients.producer.Producer;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;
    
    /**
     * @author lvfang
     * @create 2017-10-15 11:17
     * @desc
     **/
    public class KafkaProduce extends Thread {
        private String topic;//主题
    
        private String src;//数据源
    
        public KafkaProduce(String topic,String src){
            super();
            this.topic = topic;
            this.src = src;
        }
    
        //创建生产者
        private Producer<Integer, String> createProducer(){
            Properties properties = new Properties();
    
            //zookeeper单节点
            properties.put("zookeeper.connect","192.168.90.240:2181");
            properties.put("serializer.class", StringEncoder.class.getName());
            // 声明kafka集群的 broker
    
            //kafka单节点
            properties.put("metadata.broker.list", "192.168.90.240:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.90.240:9092");
    
            return new KafkaProducer<Integer, String>(properties);
        }
    
        public void run() {
            BufferedReader br = null;
            try {
    
                br = new BufferedReader(new FileReader(src));
                // 创建生产者
                Producer producer = createProducer();
    
                String line = null;
                // 循环发送消息到kafka
                while ((line = br.readLine()) != null) {
                    System.out.println("生产数据为:"+line);
                    producer.send(new ProducerRecord(topic,line + "\n"));
    
                    // 发送消息的时间间隔
                    Thread.sleep(3000);//TimeUnit.SECONDS.sleep(333);
                }
            } catch (Exception e) {
            } finally {
                try {
                    if (br != null) br.close();
                } catch (IOException e) {}
            }
        }
    
        public static void main(String[] args) {
            // 使用kafka集群中创建好的主题 test
            new KafkaProduce("htb_position_test","D:/testdata/htb_position_test_data.txt").start();
        }
    }
    
    
    2017-10-15_175407.png
    消费者实例
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    
    /**
     * Demo Kafka consumer: subscribes to a single topic through the new Java consumer
     * API ({@code org.apache.kafka.clients.consumer}) and prints every record it receives.
     *
     * <p>Each instance is a thread; {@link #run()} polls until the thread is
     * interrupted or the client throws, and always closes the consumer on exit.
     *
     * @author lvfang
     * @create 2017-10-15 11:17
     **/
    public class KafkaCusumer extends Thread  {
        /** Address of the Kafka broker (port 9092), not the ZooKeeper port 2181. */
        private static final String BOOTSTRAP_SERVERS = "192.168.90.240:9092";

        /** Maximum time a single poll blocks waiting for records, in milliseconds. */
        private static final long POLL_TIMEOUT_MS = 100L;

        /** Topic this consumer subscribes to. */
        private final String topic;

        /** Not referenced anywhere in this class; kept because it is a public member. */
        public final String SRC = "D:/testdata/testData.txt";

        /**
         * @param topic name of the Kafka topic to consume from
         */
        public KafkaCusumer(String topic){
            super();
            this.topic = topic;
        }

        /**
         * Builds a consumer for String keys and String values.
         *
         * <p>The new consumer connects to the brokers directly, so
         * {@code bootstrap.servers} must be a broker address. The legacy
         * {@code zookeeper.connect} and {@code zookeeper.session.timeout.ms} entries
         * belong to the old consumer and are no longer set.
         *
         * @return a newly created consumer; the caller is responsible for closing it
         */
        private Consumer<String, String> createConsumer(){
            Properties properties = new Properties();
            // Consumer group id: consumers sharing an id split the topic's partitions
            // between them, while different groups each receive every record.
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            return new KafkaConsumer<String, String>(properties);
        }

        /**
         * Subscribes to {@code topic} and prints each polled batch and each record
         * (offset, key, value) until the thread is interrupted.
         */
        @Override
        public void run() {
            Consumer<String, String> consumer = createConsumer();
            try {
                consumer.subscribe(Arrays.asList(topic));
                System.out.println("消费者对象:"  + consumer);

                // Checking the interrupt flag gives the otherwise endless loop an exit.
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                    System.out.println(records);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.key()+record.value());
                        // The original format string had no specifiers, so the
                        // offset/key/value arguments were silently ignored.
                        System.out.printf("接收到: offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                }
            } finally {
                // Release the client's resources on any exit path.
                consumer.close();
            }
        }

        public static void main(String[] args) {
            // Consume from the existing topic "htb_position_test".
            new KafkaCusumer("htb_position_test").start();
        }
    }
    
    
    2017-10-15_175519.png

    相关文章

      网友评论

          本文标题:java操作kafka生产消费

          本文链接:https://www.haomeiwen.com/subject/aepcuxtx.html