美文网首页Kafka
kafka_02_kafka快速入门案例

kafka_02_kafka快速入门案例

作者: 平头哥2 | 来源:发表于2019-03-23 19:55 被阅读0次

    使用Java代码编写
    创建springboot项目,gradle工程
    依赖如下:

    // Gradle build script for the Kafka quick-start demo project.
    // Applies the Spring Boot plugin (2.1.3.RELEASE) and the core Java plugin.
    plugins {
        id 'org.springframework.boot' version '2.1.3.RELEASE'
        id 'java'
    }
    
    // Dependency-management plugin: the dependencies below are declared without
    // explicit versions, so their versions are presumably supplied by the
    // Spring Boot managed dependency set (verify against the Spring Boot docs).
    apply plugin: 'io.spring.dependency-management'
    
    // Project coordinates and Java source level (Java 8).
    group = 'com.ghq.kafka'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'
    
    configurations {
        // Make everything declared as an annotation processor (Lombok below)
        // also visible on the compile-only classpath.
        compileOnly {
            extendsFrom annotationProcessor
        }
    }
    
    // Resolve all artifacts from Maven Central.
    repositories {
        mavenCentral()
    }
    
    dependencies {
        // NOTE(review): the Elasticsearch and Redis starters are not referenced by
        // the producer/consumer code shown in this article; confirm whether they
        // are needed before copying this build file.
        implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
        implementation 'org.springframework.boot:spring-boot-starter-data-redis'
        implementation 'org.springframework.boot:spring-boot-starter-web'
        // Kafka libraries; the demo classes import org.apache.kafka.clients.*,
        // which is expected to arrive transitively through these artifacts.
        implementation 'org.apache.kafka:kafka-streams'
        implementation 'org.springframework.kafka:spring-kafka'
        // Lombok is needed only at compile time (and as an annotation processor).
        compileOnly 'org.projectlombok:lombok'
        runtimeOnly 'org.springframework.boot:spring-boot-devtools'
        annotationProcessor 'org.projectlombok:lombok'
        // Test-only dependencies.
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.kafka:spring-kafka-test'
    }
    
    

    生产者客户端如下(注意看注释):

    package com.ghq.kafka.server;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    
    /**
     * Minimal Kafka producer demo: sends the same string record to {@link #topic}
     * once per second until the main thread is interrupted or a send call throws,
     * then closes the producer so buffered records are flushed and resources released.
     */
    public class ProducerFastStart {

        /** Bootstrap broker address list, formatted as {@code host1:port1,host2:port2}. */
        public static final String brokerList = "192.168.52.135:9092";
        /** Topic the demo record is published to. */
        public static final String topic = "topic-demo";

        /**
         * Builds the configuration used to create the {@code KafkaProducer}.
         *
         * @return properties containing the key/value serializers, the bootstrap
         *         server list and the client id
         */
        public static Properties initProperties(){
            Properties prop = new Properties();

            // Serializer used for record keys.
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Serializer used for record values.
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Addresses of the Kafka cluster the producer connects to, in the form
            // host1:port1,host2:port2,host3:port3 (one or more, comma separated).
            // Not every broker has to be listed: the producer discovers the rest of
            // the cluster from the brokers given here. Listing at least two is
            // recommended so the producer can still connect when one of them is down.
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

            // client.id identifies this KafkaProducer instance. When it is not set,
            // the client generates one of the form producer-1, producer-2, ...
            prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
            return prop;
        }

        /**
         * Creates the producer and publishes {@code "Hello,World"} once per second.
         * The loop ends when the thread is interrupted; the producer is always
         * closed on the way out.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // 0. Build the configuration.
            Properties prop = initProperties();

            // 1. Create the producer client from the configuration.
            //    Alternative: pass serializer instances directly, e.g.
            //    new KafkaProducer<>(prop, new StringSerializer(), new StringSerializer())
            KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

            // 2. Create the record to send (no key, value only).
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello,World");

            boolean interrupted = false;
            try {
                // 3. Send the record once per second.
                while (true){
                    // send() is asynchronous; the callback surfaces delivery failures
                    // that would otherwise be dropped silently.
                    producer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println("Failed to send record to " + topic + ": " + exception);
                        }
                    });
                    System.out.println(".....................");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Stop producing; the interrupt status is restored after close()
                        // so that close() itself is not cut short by a pending interrupt.
                        interrupted = true;
                        break;
                    }
                }
            } finally {
                // 4. Release resources; close() waits for outstanding sends to complete.
                producer.close();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    

    消费者客户端如下:

    package com.ghq.kafka.client;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class ConsumerFastClient {
    
        public static final String brokerList = "192.168.52.135:9092";
        public static final String topic = "topic-demo";
        public static final String groupId = "group.demo";
    
        public static Properties initProperties(){
            //0. 配置客户端的参数
            Properties prop = new Properties();
    
            /**
             * 消费者 的key 反序列化器,必须和生产者一致
             */
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            /**
             * 消费者 的value 反序列化器,必须和生产者一致
             */
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            /**
             * 0.1 设置broker服务端ip列表
             * ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 属性:指定生产者连接的kafka集群的地址
             * 格式: host1:port1,host2:port2,host3:port3
             * 可以设置一个或者多个地址,中间以逗号分隔。此参数默认为 ""
             * 这里并不需要填写所有的kafka集群的所有broker的地址,因为消费者会从给定的broker查找到其他的broker的信息
             * 建议至少设置两个或两个以上的broker地址,当其中一个宕机的时候,生产者仍然可以连接到kafka集群
             */
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
            /**
             * 0.2 设置消费组的名称,默认为""
             * 一般该参数设置为 具有业务的值
             */
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    
            /**
             * ProducerConfig.CLIENT_ID_CONFIG=client.id属性: 设置KafkaConsumer对应的客户端id
             * 默认为""
             * 如果客户端不设置,那么kafka会自动生成一个:形式如 consumer-1,consumer-2 等
             */
            prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-id-demo");
            return prop;
        }
    
        public static void main(String[] args) {
            //0. 配置客户端的参数
            Properties prop = initProperties();
    
            //1. 创建一个消费客户端实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
    
    
            //2. 订阅主题
            consumer.subscribe(Collections.singletonList(topic));
    
            //3. 循环消费消息
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("--------------->:" + record.value());
                }
            }
    
        }
    }
    

    执行结果如下:

    producer控制台输出如下:

    .....................
    .....................
    .....................
    .....................
    .....
    

    consumer控制台输出如下:

    --------------->:Hello,World
    20:18:15.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
    20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 20 to node slave2:9092 (id: 2 rack: null)
    20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
    20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:15.642 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=25) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
    20:18:15.643 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
    20:18:15.844 [kafka-coordinator-heartbeat-thread | group.demo] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=client-id-demo, groupId=group.demo] Sending Heartbeat request to coordinator master:9092 (id: 2147483647 rack: null)
    20:18:15.850 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=client-id-demo, groupId=group.demo] Received successful Heartbeat response
    20:18:16.127 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
    20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 20 for partition topic-demo-1 returned fetch data (error=NONE, highWaterMark=21, lastStableOffset = 21, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
    20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
    20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:16.128 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=26) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
    20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-1), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2)) to broker slave2:9092 (id: 2 rack: null)
    --------------->:Hello,World
    20:18:16.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
    20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
    20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:16.643 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=27) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
    20:18:16.643 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
    20:18:17.132 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
    20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 20 for partition topic-demo-0 returned fetch data (error=NONE, highWaterMark=21, lastStableOffset = 21, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
    20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:17.134 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=28) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
    20:18:17.134 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-0), toForget=(), implied=(topic-demo-3, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
    --------------->:Hello,World
    20:18:17.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
    20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:17.642 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=29) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
    20:18:17.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
    20:18:18.138 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
    20:18:18.138 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 22 for partition topic-demo-2 returned fetch data (error=NONE, highWaterMark=23, lastStableOffset = 23, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
    20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
    20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
    20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 23 to node slave2:9092 (id: 2 rack: null)
    20:18:18.140 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=30) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
    20:18:18.140 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-2), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
    --------------->:Hello,World
    
    

    到此完成。

    相关文章

      网友评论

        本文标题:kafka_02_kafka快速入门案例

        本文链接:https://www.haomeiwen.com/subject/gkcevqtx.html