- Interceptor definition
A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.
props.put("interceptor.classes", "com.afei.kafka.interceptor.AProducerInterceptor,com.afei.kafka.interceptor.BProducerInterceptor");
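The configuration line above can be sketched as a small helper that assembles the producer properties. This is a minimal sketch, not the author's code: the class name `InterceptorConfigSketch`, the method `producerProps`, the broker address `localhost:9092`, and the choice of `StringSerializer` are assumptions made for illustration. Only the `interceptor.classes` key and the two interceptor class names come from the text. Kafka invokes the interceptors in the order in which they are listed, so the list order matters.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

final class InterceptorConfigSketch {

    /**
     * Builds producer properties whose "interceptor.classes" value is the
     * comma-separated list of the given class names, in the given order.
     */
    static Properties producerProps(String bootstrapServers, List<String> interceptorClasses) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumed serializers; any serializer matching the record types would do.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("interceptor.classes", String.join(",", interceptorClasses));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092", Arrays.asList(
                "com.afei.kafka.interceptor.AProducerInterceptor",
                "com.afei.kafka.interceptor.BProducerInterceptor"));
        System.out.println(props.getProperty("interceptor.classes"));
    }
}
```

The resulting `Properties` object would then be passed to the `KafkaProducer` constructor.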
- Reading the ProducerInterceptor interface source
public interface ProducerInterceptor<K, V> extends Configurable {

    /**
     * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
     * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
     * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
     * In other words, this method runs once KafkaProducer.send() has been invoked, but before the key and value
     * are serialized and before a partition is assigned.
     * <p>
     * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
     * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
     * not key/value from the client.
     * That is, onSend() may modify the ProducerRecord (the message being sent); if it changes the key,
     * the subsequent partition assignment is computed from the modified key.
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This method runs whether the send succeeded or an exception was thrown while sending.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * This is called when interceptor is closed
     */
    public void close();
}
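When several interceptors are configured, the record returned by one `onSend()` becomes the input of the next. The sketch below illustrates that chaining with simplified stand-in types rather than the real Kafka classes: `Msg` is a hypothetical substitute for `ProducerRecord`, and each interceptor is reduced to a `UnaryOperator<Msg>`. It also assumes the behaviour described in the Kafka Javadoc, namely that an exception thrown by one interceptor is logged and the chain continues with the last successfully produced record.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

final class InterceptorChainSketch {

    /** Hypothetical stand-in for ProducerRecord: only a key and a value. */
    static final class Msg {
        final String key;
        final String value;

        Msg(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Applies each interceptor in list order; a failing interceptor is skipped. */
    static Msg onSend(List<UnaryOperator<Msg>> interceptors, Msg original) {
        Msg current = original;
        for (UnaryOperator<Msg> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Log and carry on with the record from the last successful interceptor.
                System.err.println("interceptor failed: " + e.getMessage());
            }
        }
        return current;
    }

    /** Runs a three-interceptor chain: drop the key, fail, upper-case the value. */
    static Msg demo() {
        List<UnaryOperator<Msg>> chain = Arrays.asList(
                m -> new Msg(null, m.value),
                m -> { throw new RuntimeException("boom"); },
                m -> new Msg(m.key, m.value.toUpperCase()));
        return onSend(chain, new Msg("k", "hello"));
    }

    public static void main(String[] args) {
        Msg out = demo();
        System.out.println("key=" + out.key + ", value=" + out.value);
    }
}
```

Because the first interceptor sets the key to null before any partition is chosen, a real producer would assign the partition without a key, which is the effect the Javadoc above describes.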
- Custom interceptor implementation
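import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @author afei
 * @version 1.0.0
 * @since 2018-06-27
 */
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
    /** Number of successful sends. */
    private static final AtomicLong sendSuccess = new AtomicLong(0);
    /** Number of failed sends. */
    private static final AtomicLong sendFailure = new AtomicLong(0);

    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Rewrite the ProducerRecord: set the key to null and leave the partition choice entirely to Kafka
        return new ProducerRecord<>(record.topic(),
                record.partition(), record.timestamp(), null, record.value(), record.headers());
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // No exception means the send succeeded: increment the success count; otherwise increment the failure count
        if (exception != null) {
            sendFailure.getAndIncrement();
        } else {
            sendSuccess.getAndIncrement();
        }
    }

    /** Print the success and failure counts. */
    private void outputSendStat() {
        long successCount = sendSuccess.get();
        long failedCount = sendFailure.get();
        System.out.println("success count: " + successCount + ", failed count:" + failedCount);
    }

    public void close() {
        // Print the statistics when the interceptor is closed
        outputSendStat();
    }

    public void configure(Map<String, ?> configs) {
    }
}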
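The counters in the interceptor are `AtomicLong` values, which matters because `onAcknowledgement()` generally runs on the producer's background I/O thread, and static counters are shared by every producer instance in the JVM. The following sketch isolates that counting logic so it can be run without a Kafka cluster. The class `AckStatsSketch`, its method names, and the simulated failure rate (every tenth acknowledgement) are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class AckStatsSketch {
    private final AtomicLong success = new AtomicLong();
    private final AtomicLong failure = new AtomicLong();

    /** Mirrors the onAcknowledgement branch: a null exception counts as success. */
    void record(Exception exception) {
        if (exception == null) {
            success.incrementAndGet();
        } else {
            failure.incrementAndGet();
        }
    }

    long successCount() { return success.get(); }

    long failureCount() { return failure.get(); }

    /** Simulates acknowledgements arriving concurrently from several threads. */
    static AckStatsSketch simulate(int threads, int acksPerThread) {
        AckStatsSketch stats = new AckStatsSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < acksPerThread; i++) {
                    // Every tenth acknowledgement is treated as a failure.
                    stats.record(i % 10 == 0 ? new RuntimeException("simulated") : null);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return stats;
    }

    public static void main(String[] args) {
        AckStatsSketch stats = simulate(4, 1000);
        System.out.println("success count: " + stats.successCount()
                + ", failed count: " + stats.failureCount());
    }
}
```

With plain `long` fields instead of `AtomicLong`, concurrent increments could be lost and the two totals would no longer be guaranteed to add up to the number of acknowledgements.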