Apache Kafka发布了Java版本的Producer,如果读者想要使用其他语言向Kafka生产数据,可以去Kafka官网对应的库,但要注意的是除了Java版本的意外,其他版本都是非官方支持的,笔者这里就使用Java来做演示了。
Java想要引入kafka客户端,maven你懂得
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
gradle如下:
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
引入的过程中读者或许还需要引入slf4j,应当注意一下。
当我们成功引入kafka-client后,就可以撸代码了,下面先演示一个最简单的消费者:
package cn.gaowenhao;
/*
-----------------------------------------------------
Author : 高文豪
Github : https://github.com/gaowenhao
Blog : https://gaowenhao.cn
-----------------------------------------------------
*/
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class FirstConsumer {
public static void main(String[] args) {
// 创建配置文件
Properties props = new Properties();
// 指定broker
props.put("bootstrap.servers", "localhost:9092");
// 指定消费者组
props.put("group.id", "mygroup");
// 指定序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者对象
KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
// 订阅topic,这个方法需要的参数是Collection对象
consumer.subscribe(Collections.singletonList("mytopic"));
// 开启无限循环
while (true) {
// poll数据, 指定一个超时时间
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
// 遍历拿到的数据集合
for (var record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
当我们运行起上面这段代码,那么我们的consumer就挂起运行了,这时候consumer会从topic中不断地取出最早进入的数据然后打印,下面我们撸一个producer,给他产生一个数据
package cn.gaowenhao;
/*
-----------------------------------------------------
Author : 高文豪
Github : https://github.com/gaowenhao
Blog : https://gaowenhao.cn
-----------------------------------------------------
*/
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class FirstProducer {
public static void main(String[] args) {
// 设置配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 指定序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建producer
KafkaProducer producer = new KafkaProducer<String, String>(props);
// 给mytopic,发送一条消息
producer.send(new ProducerRecord("mytopic", "Hey --By Java"));
// 关闭producer, 读者应该知道几乎所有的close里面都有个flush
producer.close();
}
}
如此我们可以运行上面的代码,这样读者consumer端就会收到类似这样一条消息:
offset = 2, key = null, value = Hey --By Java
读者的offset可能和笔者的不一样,但这里的offset是有序的,如果读者的offset也是2再发一条就是3。
key这个值是可以在producer端指定的,在我们ProducerRecord的时候可以加上一个key值,如果没加consumer这边将会显示null。
value则是ProducerRecord的第三个参数。
更多内容欢迎来笔者的个人博客 https://gaowenhao.cn
网友评论