Kafka生产者客户端入门
[TOC]
demo
/**
* 生产者
*
* @author Jenson
*/
public class ProducerFastStart {
private static final String BROKER_LIST = "localhost:9092";
private static final String TOPIC = "topic-demo";
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 生产者拦截器
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
return properties;
}
public static void main(String[] args) {
Properties properties = initConfig();
// 配置生产者客户端参数,并创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// 构造所需要发送的消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "hello kafka! again!");
// 发送消息
producer.send(record, new Callback() {
/**
* onCompletion()方法的两个参数是互斥的,
* 消息发送成功时,metadata 不为 null 而exception为null;
* 消息发送异常时,metadata为null而exception不为null。
*
* @param recordMetadata
* @param e
*/
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("topic : " + recordMetadata.topic() +
" ,hasOffset : " + recordMetadata.hasOffset() +
" ,offset : " + recordMetadata.offset() +
" ,partition : " + recordMetadata.partition());
}
}
});
// 关闭生产者客户端
producer.close();
}
}
/**
* 客户化生产者拦截器
*
* @author Jenson
*/
public class MyProducerInterceptor implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
ProducerRecord record = new ProducerRecord(producerRecord.topic(),
producerRecord.partition(),
producerRecord.timestamp(),
producerRecord.key(),
"prefix-" + producerRecord.value(),
producerRecord.headers());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
System.out.println("----> 发送成功");
}
}
@Override
public void close() {
System.out.println("----> 结束,回收资源");
}
@Override
public void configure(Map<String, ?> map) {
}
}
生产者对象
org.apache.kafka.clients.producer.KafkaProducer
三个必填参数:
bootstrap.servers:连接Kafka集群所需要的broker地址清单,可以为多个地址,逗号分开
key.serializer和value.serializer:broker端接收的消息必须以字节数组(byte[])的形式存在
其他参数:
client.id : KafkaProducer对应的客户端id,默认值为“”,如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,“producer-”与数字的拼接。
配置参数常量类:org.apache.kafka.clients.producer.ProducerConfig
KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。
发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)
消息发送
-
生产者消息发送 ProducerRecord.send() ProducerRecord.send(value, callback)
-
发送消息时报错 对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。 properties.put(ProducerConfig.RETRIES_CONFIG,10); //设置重试次数
-
**消息发送回调 **
// 发送消息 producer.send(record, new Callback() { // ... 详见demo });
producer.send(record,callback1); producer.send(record,callback2);
callback1在callback2前调用,回调函数的调用也可以保证分区有序。
-
producer.close(); // 关闭生产者客户端 close()方法会阻塞等待之前所有的发送请求完成后再关闭,回收所占用的资源
序列化
可以自定义序列化,实现 org.apache.kafka.common.serialization.Serializer 接口
分区器
-
如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
-
Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口
-
如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key为null,那么计算得到的分区号仅为可用分区中的任意一个
-
自定义分区可以实现Partitioner接口,通过配置参数partitioner.class来显式指定这个分区器
生产者拦截器
-
生产者拦截器既可以用来在消息发送前做一些准备工作
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作
-
也可以用来在发送回调逻辑前做一些定制化的需求,
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行
-
生产者拦截器,自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口
在KafkaProducer 的配置参数interceptor.classes中指定这个拦截器
-
可以配置多个自定义拦截器,用逗号分隔
网友评论