美文网首页
05 java生成kafka中的生产者消费者

05 java生成kafka中的生产者消费者

作者: 张力的程序园 | 来源:发表于2020-06-23 17:48 被阅读0次

    本节将演示使用java完成kafka的生产者消费者测试。

    1、前提约束

    • 已完成kafka服务搭建并启动
      https://www.jianshu.com/p/1a7b9970d073
      假设该kafka服务所在机子ip为192.168.100.141,且已关闭防火墙
    • 本地hosts中已完成自定义域名和linux中ip的对应关系

    2、操作步骤

    • 创建一个maven工程,加入以下依赖
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.9.0.0</version>
            </dependency>
    
    • 在src/main/java文件夹中创建Producer.java
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class Producer {
    
    
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.100.141:9092");
            props.put("group.id", "1");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
    
            producer.send(new ProducerRecord<String, String>("test", "zhangli"));
            producer.close();
        }
    }
    
    • 在src/main/java文件夹中创建Consumer.java
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class Consumer {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.100.141:9092");
            props.put("group.id", "1");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach((ConsumerRecord<String, String> record) -> {
                    System.out.println(" value ====" + record.value() + " topic ===" + record.topic());
                });
            }
        }
    }
    
    • 测试
      先启动Consumer,再启动Producer,即可完成生产者消费者的模拟。

    相关文章

      网友评论

          本文标题:05 java生成kafka中的生产者消费者

          本文链接:https://www.haomeiwen.com/subject/nieqfktx.html