本文将从使用 Kafka 的 Java 客户端生产者入手,剖析 Producer API 是如何向 Kafka 集群发送消息的。
基本使用
Hello Kafka
以使用 Maven 为例,引入 kafka-clients 的依赖。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version</version>
</dependency>
一个简单的生产端代码,使用 KafkaProducer 发送一条 “Hello, Kafka !”
。
public class KafkaProducerDemo {
private static final String BROKER_LIST = "localhost:9092";
private static final String TOPIC = "topic-demo";
public static void main(String[] args) {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello, Kafka !");
try {
producer.send(record);
Thread.sleep(500L);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
/**
* 初始化参数
*
* @return
*/
public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_LIST);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
}
我们注意到,被 KafkaProducer 发送的消息是一个 ProducerRecord 类型的对象。这个 ProducerRecord 类型使用泛型的方式定义其发送消息的 Key 和 Value。这里为了简单演示,都采用字符串类型,向 topic-demo 主题发送一个内容为 “Hello, Kafka !”
的消息。
我们发现在初始化参数的方法 initConfig() 方法中,我们使用了很多字符串类型的参数,而在实际开发过程中难免有可能在进行字符串拼写的时候由于疏忽拼错单词等错误,而这些错误往往在编辑阶段又是不能被及时发现,Kafka 的 kafka-clients 中为我们提供了常量来表示,于是初始化参数的代码修改如下:
public static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return props;
}
而这里 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 配置的 Broker 列表不一定配置所有 Broker 的地址,生产者会从给定的 Broker 里查找到其他 Broker 的信息。不过建议至少要设置两个以上的 Broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
我们还注意到,KafkaProducer 源码中还有一个构造函数,如果在初始化参数方法 initConfig()
方法中没有设置 key.serializer
和 value.serializer
,那么也可以通过 KafkaProducer 的构造函数传入:
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
KafkaProducer 是线程安全的,我们可以在多线程共享单个 KafkaProducer。官方描述:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
创建 ProducerRecord
public ProducerRecord(String topic, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
- 若指定了 partition,则发送至指定的 partition。
- 若没指定 partition,但指定了 key,则根据 key 和分区规则指定 partition。
- 若既没有指定 partition 也没有指定 key,则 round-robin 模式发送到每个 partition。
- 若既指定了 partition 又指定了 key,则根据 partition 参数发送到指定 partition,key 不起作用。
发送模式
发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。
发后即忘
不关心发送的消息是否到达,对返回结果不作任何处理。
本质上是一种异步发送,性能最高,但可靠性最差。
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka4 !");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
同步
在执行 send()
方法返回 Future 对象,并调用了 get()
方法来阻塞等待 Kafka 的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka !");
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
} catch (Exception e) {
e.printStackTrace();
}
异步回调
在 send()
方法里指定一个 Callback 回调函数:
网友评论