Writing the clients in Java
Create a Spring Boot project as a Gradle build.
The dependencies are as follows:
plugins {
    id 'org.springframework.boot' version '2.1.3.RELEASE'
    id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.ghq.kafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
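The examples below send to and read from a topic named topic-demo; the consumer logs at the end show it with four partitions (topic-demo-0 through topic-demo-3). If the topic does not exist yet, it can be created from the shell or programmatically. A minimal sketch using Kafka's AdminClient, assuming the same broker address and a replication factor of 1 (TopicCreate is a hypothetical helper class, not part of the original project):

package com.ghq.kafka.admin;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreate {

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.52.135:9092");
        // AdminClient is AutoCloseable, so try-with-resources closes it for us
        try (AdminClient admin = AdminClient.create(prop)) {
            // 4 partitions, replication factor 1 -- adjust to your cluster size
            NewTopic topic = new NewTopic("topic-demo", 4, (short) 1);
            // createTopics() is asynchronous; all().get() blocks until the broker confirms
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}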
The producer client is as follows (note the comments):
package com.ghq.kafka.server;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFastStart {

    public static final String brokerList = "192.168.52.135:9092";
    public static final String topic = "topic-demo";

    public static Properties initProperties() {
        Properties prop = new Properties();
        /*
         * ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: the serializer class for keys
         */
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        /*
         * ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: the serializer class for values
         */
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        /*
         * ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: the addresses the producer uses to
         * connect to the Kafka cluster, in the form host1:port1,host2:port2,host3:port3.
         * One or more addresses may be given, separated by commas; the default is "".
         * It is not necessary to list every broker in the cluster, because the producer
         * discovers the remaining brokers from the ones given here. Still, it is advisable
         * to list at least two brokers, so the producer can reach the cluster even if one
         * of them goes down.
         */
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        /*
         * ProducerConfig.CLIENT_ID_CONFIG (client.id): the client id of this KafkaProducer.
         * The default is "". If the client does not set one, Kafka generates an id of the
         * form producer-1, producer-2, and so on.
         */
        prop.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return prop;
    }

    public static void main(String[] args) {
        //0. configure the client
        Properties prop = initProperties();
        //1. create the Kafka producer client with those properties
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        /*
         * 1.1 alternatively, create the client passing the key and value serializer
         * instances directly:
         */
        //KafkaProducer<String, String> producer = new KafkaProducer<>(prop, new StringSerializer(), new StringSerializer());
        //2. create the record to send
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello,World");
        //3. send the record once per second
        while (true) {
            producer.send(record);
            System.out.println(".....................");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //4. release resources (unreachable here because of the endless loop)
        //producer.close();
    }
}
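The send(record) above is fire-and-forget: the result is never inspected. Where delivery needs to be confirmed, send() also accepts a callback that runs once the broker acknowledges (or rejects) the record. A minimal sketch, reusing initProperties() from the class above; the class name ProducerCallbackDemo is made up for illustration:

package com.ghq.kafka.server;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerCallbackDemo {

    public static void main(String[] args) {
        KafkaProducer<String, String> producer =
                new KafkaProducer<>(ProducerFastStart.initProperties());
        ProducerRecord<String, String> record =
                new ProducerRecord<>(ProducerFastStart.topic, "Hello,World");
        // the callback is invoked when the broker acknowledges (or rejects) the record
        producer.send(record, (RecordMetadata metadata, Exception exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("sent to " + metadata.topic() + "-"
                        + metadata.partition() + " @ offset " + metadata.offset());
            }
        });
        producer.close(); // flushes in-flight records before shutting down
    }
}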
The consumer client is as follows:
package com.ghq.kafka.client;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastClient {

    public static final String brokerList = "192.168.52.135:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static Properties initProperties() {
        //0. configure the client
        Properties prop = new Properties();
        /*
         * The consumer's key deserializer; it must match the producer's key serializer.
         */
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        /*
         * The consumer's value deserializer; it must match the producer's value serializer.
         */
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        /*
         * 0.1 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: the addresses the consumer uses to
         * connect to the Kafka cluster, in the form host1:port1,host2:port2,host3:port3.
         * One or more addresses may be given, separated by commas; the default is "".
         * It is not necessary to list every broker in the cluster, because the consumer
         * discovers the remaining brokers from the ones given here. Still, it is advisable
         * to list at least two brokers, so the consumer can reach the cluster even if one
         * of them goes down.
         */
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        /*
         * 0.2 ConsumerConfig.GROUP_ID_CONFIG: the name of the consumer group; the default
         * is "". It is usually set to a value with business meaning.
         */
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        /*
         * ConsumerConfig.CLIENT_ID_CONFIG (client.id): the client id of this KafkaConsumer.
         * The default is "". If the client does not set one, Kafka generates an id of the
         * form consumer-1, consumer-2, and so on.
         */
        prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-id-demo");
        return prop;
    }

    public static void main(String[] args) {
        //0. configure the client
        Properties prop = initProperties();
        //1. create a consumer client instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        //2. subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        //3. poll for records in a loop
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("--------------->:" + record.value());
            }
        }
    }
}
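One thing the demo glosses over: the endless loop never closes the consumer, so on shutdown the group only rebalances after the session timeout expires. A minimal sketch of a graceful shutdown, assuming the same initProperties(); KafkaConsumer.wakeup() is the one thread-safe method on the consumer and makes a blocking poll() throw WakeupException (the class name is made up for illustration):

package com.ghq.kafka.client;

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerGracefulShutdown {

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(ConsumerFastClient.initProperties());
        final Thread mainThread = Thread.currentThread();
        // On Ctrl+C: abort the blocking poll(), then wait for main to finish closing.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        try {
            consumer.subscribe(Collections.singletonList(ConsumerFastClient.topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("--------------->:" + record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to close()
        } finally {
            consumer.close(); // leaves the group cleanly and commits offsets under auto-commit
        }
    }
}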
The run produces the following output.
Producer console output (the demo prints a line of dots after each send):
.....................
.....................
.....................
.....................
.....
Consumer console output (with DEBUG logging from the Kafka client enabled):
--------------->:Hello,World
20:18:15.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:15.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:15.642 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=25) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:15.643 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
20:18:15.844 [kafka-coordinator-heartbeat-thread | group.demo] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=client-id-demo, groupId=group.demo] Sending Heartbeat request to coordinator master:9092 (id: 2147483647 rack: null)
20:18:15.850 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=client-id-demo, groupId=group.demo] Received successful Heartbeat response
20:18:16.127 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 20 for partition topic-demo-1 returned fetch data (error=NONE, highWaterMark=21, lastStableOffset = 21, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:16.127 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=26) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:16.128 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-1), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2)) to broker slave2:9092 (id: 2 rack: null)
--------------->:Hello,World
20:18:16.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 20 to node slave2:9092 (id: 2 rack: null)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:16.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:16.643 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=27) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:16.643 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
20:18:17.132 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 20 for partition topic-demo-0 returned fetch data (error=NONE, highWaterMark=21, lastStableOffset = 21, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.133 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.134 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=28) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:17.134 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-0), toForget=(), implied=(topic-demo-3, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
--------------->:Hello,World
20:18:17.640 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 0 response partition(s), 4 implied partition(s)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.641 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:17.642 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=29) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:17.642 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
20:18:18.138 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Node 2 sent an incremental fetch response for session 1758705842 with 1 response partition(s), 3 implied partition(s)
20:18:18.138 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Fetch READ_UNCOMMITTED at offset 22 for partition topic-demo-2 returned fetch data (error=NONE, highWaterMark=23, lastStableOffset = 23, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=79)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-3 at offset 22 to node slave2:9092 (id: 2 rack: null)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-1 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-0 at offset 21 to node slave2:9092 (id: 2 rack: null)
20:18:18.139 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Added READ_UNCOMMITTED fetch request for partition topic-demo-2 at offset 23 to node slave2:9092 (id: 2 rack: null)
20:18:18.140 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=client-id-demo, groupId=group.demo] Built incremental fetch (sessionId=1758705842, epoch=30) for node 2. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 4 partition(s)
20:18:18.140 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=client-id-demo, groupId=group.demo] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-demo-2), toForget=(), implied=(topic-demo-3, topic-demo-0, topic-demo-1)) to broker slave2:9092 (id: 2 rack: null)
--------------->:Hello,World
That completes the walkthrough.