美文网首页Kafka
kafka_02_kafka快速入门案例

kafka_02_kafka快速入门案例

作者: 平头哥2 | 来源:发表于2019-03-23 19:55 被阅读0次

使用Java代码编写
创建 Spring Boot 项目(使用 Gradle 构建),
依赖如下:

// Gradle build script for the Kafka quick-start demo project.
plugins {
    // Spring Boot Gradle plugin, pinned to 2.1.3.RELEASE.
    id 'org.springframework.boot' version '2.1.3.RELEASE'
    id 'java'
}

// The dependencies below are declared without versions; presumably this plugin
// supplies them from the Spring Boot BOM — TODO confirm.
apply plugin: 'io.spring.dependency-management'

group = 'com.ghq.kafka'
version = '0.0.1-SNAPSHOT'
// Compile the sources at Java 8 language level.
sourceCompatibility = '1.8'

configurations {
    // Make everything declared as an annotation processor (Lombok below)
    // also visible on the compile-only classpath.
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    // NOTE(review): the elasticsearch, redis, web and kafka-streams dependencies are not
    // referenced by the producer/consumer classes shown in this article — confirm they are needed.
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.apache.kafka:kafka-streams'
    // NOTE(review): presumably this is what brings in the org.apache.kafka.clients classes
    // used by the demo code (transitively via kafka-clients) — verify.
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

生产者客户端如下(注意看注释):

package com.ghq.kafka.server;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


/**
 * Minimal Kafka producer demo: sends the same {@code "Hello,World"} message to
 * {@link #topic} roughly once per second until the main thread is interrupted.
 */
public class ProducerFastStart {

    /** Bootstrap server list, formatted as {@code host1:port1,host2:port2,...}. */
    public static final String brokerList = "192.168.52.135:9092";

    /** Topic the demo message is published to. */
    public static final String topic = "topic-demo";

    /**
     * Builds the configuration used to create the demo producer.
     *
     * @return properties containing the key/value serializers, the bootstrap
     *         servers and the client id
     */
    public static Properties initProperties(){
        Properties prop = new Properties();

        // Serializer used for record keys.
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serializer used for record values.
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Addresses the producer uses to reach the Kafka cluster, as a comma-separated
        // host:port list. Not every broker has to be listed, because the producer
        // discovers the remaining brokers from the ones given here. Listing at least
        // two is recommended so the producer can still connect when one of them is down.
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        // Client id of this KafkaProducer. When it is not set, Kafka generates one
        // of the form producer-1, producer-2, ...
        prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
        return prop;
    }

    /**
     * Runs the demo: creates the producer, then sends one record per second.
     *
     * <p>The producer is managed by try-with-resources, so it is closed when the
     * loop ends because of an interrupt or when an exception escapes the loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 0. Build the client configuration.
        Properties prop = initProperties();

        // 1. Create the producer. The serializers come from the properties; they could
        //    alternatively be passed as instances to the three-argument constructor.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {

            // 2. Create the record to send; the same instance is re-sent on every iteration.
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello,World");

            // 3. Send the record once per second.
            while (true) {
                // send() is asynchronous: a failure is only reported through the callback
                // (or the returned Future), so report it here instead of dropping it.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send record: " + exception);
                    }
                });
                System.out.println(".....................");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Treat an interrupt as a request to stop. sleep() has cleared the
                    // interrupt flag, so the close() below is not itself interrupted.
                    break;
                }
            }
        }
        // 4. The producer has been closed above; restore the interrupt status for the caller.
        Thread.currentThread().interrupt();
    }
}

消费者客户端如下:

package com.ghq.kafka.client;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer demo: subscribes to {@link #topic} as a member of
 * {@link #groupId} and prints the value of every record it receives.
 */
public class ConsumerFastClient {

    /** Bootstrap server list, formatted as {@code host1:port1,host2:port2,...}. */
    public static final String brokerList = "192.168.52.135:9092";

    /** Topic the consumer subscribes to. */
    public static final String topic = "topic-demo";

    /** Consumer group id used by this client. */
    public static final String groupId = "group.demo";

    /**
     * Builds the configuration used to create the demo consumer.
     *
     * @return properties containing the key/value deserializers, the bootstrap
     *         servers, the group id and the client id
     */
    public static Properties initProperties(){
        Properties prop = new Properties();

        // Key deserializer; it must match the serializer used by the producer.
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer; it must match the serializer used by the producer.
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Addresses the consumer uses to reach the Kafka cluster, as a comma-separated
        // host:port list. Not every broker has to be listed, because the consumer
        // discovers the remaining brokers from the ones given here. Listing at least
        // two is recommended so the consumer can still connect when one of them is down.
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        // Name of the consumer group; usually set to a value that is meaningful
        // for the business use case.
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Client id of this KafkaConsumer. When it is not set, Kafka generates one
        // of the form consumer-1, consumer-2, ...
        prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-id-demo");
        return prop;
    }

    /**
     * Runs the demo: creates the consumer, subscribes to the topic and polls forever.
     *
     * <p>The loop has no normal exit; it ends only when an exception escapes it.
     * The consumer is managed by try-with-resources so that it is closed in that case
     * instead of being leaked.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 0. Build the client configuration.
        Properties prop = initProperties();

        // 1. Create the consumer client instance.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {

            // 2. Subscribe to the topic.
            consumer.subscribe(Collections.singletonList(topic));

            // 3. Consume in a loop; each poll waits up to one second for records.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("--------------->:" + record.value());
                }
            }
        }
    }
}

执行结果如下:

producer控制台输出如下:

.....................
.....................
.....................
.....................
.....

consumer控制台输出如下:

--------------->:Hello,World
20:18:15.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:15.642 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=25) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:15.643 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
20:18:15.844 [kafka-coordinator-heartbeat-thread | group.demo] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=client-id-demo, groupId=group.demo] Sending Heartbeat request to coordinator master:9092 (id: 2147483647 rack: null)
20:18:15.850 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=client-id-demo, groupId=group.demo] Received successful Heartbeat response
20:18:16.127 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 20 for partition topic-demo-1 returned fetch data (error=NONE, highWaterMark=21, lastStableOffset = 21, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=26) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-1), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2)) to broker slave2:9092 (id: 2 rack: null)
--------------->:Hello,World
20:18:16.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:16.643 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=27) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:16.643 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
20:18:17.132 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 20 for partition topic-demo-0 returned fetch data (error=NONE, highWaterMark=21, lastStableOffset = 21, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.134 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=28) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:17.134 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-0), toForget=(), implied=(topic-demo-3, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
--------------->:Hello,World
20:18:17.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.642 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=29) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:17.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
20:18:18.138 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
20:18:18.138 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 22 for partition topic-demo-2 returned fetch data (error=NONE, highWaterMark=23, lastStableOffset = 23, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 23 to node slave2:9092 (id: 2 rack: null)
20:18:18.140 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=30) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:18.140 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-2), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
--------------->:Hello,World

到此完成。

相关文章

网友评论

    本文标题:kafka_02_kafka快速入门案例

    本文链接:https://www.haomeiwen.com/subject/gkcevqtx.html