Interceptors were introduced in Kafka 0.10.x.x and come in two flavors: producer-side interceptors and consumer-side interceptors. This article covers only ProducerInterceptor.
Producer interceptor
The source code of org.apache.kafka.clients.producer.ProducerInterceptor<K, V> is shown below:
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
/**
 * A plugin interface that allows you to intercept (and possibly mutate) the records produced by the producer
 * before they are published to the Kafka cluster.
 * This class can obtain the producer configuration via the configure() method, including the clientId assigned by
 * KafkaProducer even when it is not specified in the configuration.
 * Implementations need to be aware that the producer configuration is shared with other interceptors and serializers,
 * so they must make sure there are no conflicts between them.
 * Exceptions thrown by ProducerInterceptor methods are caught and logged, but not propagated further. As a result,
 * if a user configures an interceptor with the wrong key and value types, the producer does not throw an exception,
 * it only logs the error.
 * ProducerInterceptor callbacks may be called from multiple threads, so implementations must be thread-safe.
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
     * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
     * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
     * <p>
     * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
     * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
     * not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
     * same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
     * as expected.
     * <p>
     * Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
     * Most often, it should be the same topic/partition from 'record'.
     * <p>
     * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
     * <p>
     * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
     * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
     * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
     * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
     * the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
     * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
     * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
     * is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
     * or otherwise the client.
     *
     * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
     * @return producer record to send to topic/partition
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
     * it gets sent to the server.
     * <p>
     * This method is generally called just before the user callback is called, and in additional cases when <code>KafkaProducer.send()</code>
     * throws an exception.
     * <p>
     * Any exception thrown by this method will be ignored by the caller.
     * <p>
     * This method will generally execute in the background I/O thread, so the implementation should be reasonably fast.
     * Otherwise, sending of messages from other threads could be delayed.
     *
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
     *                 If an error occurred, metadata will contain only valid topic and maybe
     *                 partition. If partition is not given in ProducerRecord and an error occurs
     *                 before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.
     *                 The metadata may be null if the client passed null record to
     *                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // Called when the interceptor is closed
    public void close();
}
Developing a custom interceptor
package com.ghq.kafka.server;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {

    // Counters updated in onAcknowledgement(), which runs in the producer's background I/O thread
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Prepend "prefix01-" to the value; everything else is copied from the original record
        String modifiedValue = "prefix01-" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Count successful and failed sends
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
        // Print the success ratio when the producer (and therefore the interceptor) is closed
        double ratio = 1.0 * sendSuccess / (sendFailure + sendSuccess);
        System.out.println(ratio);
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No extra configuration needed
    }
}
Using the custom interceptor
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
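For context, here is a minimal producer that wires the interceptor in. The class name ProducerInterceptorDemo, the broker address localhost:9092, and the topic demo-topic are assumptions chosen to match the console output shown below, not part of the original article:

package com.ghq.kafka.server;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerInterceptorDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the interceptor; several interceptors can be listed (comma-separated)
        // and onSend() is invoked in the configured order
        prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            producer.send(new ProducerRecord<>("demo-topic", "Hello,World")); // assumed topic name
        }
        // Closing the producer also closes the interceptor, which prints the success ratio
    }
}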
Consumer console output:
--------------->:prefix01-Hello,World
--------------->:prefix01-Hello,World
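The consumer that produced this output is not shown in the article. A minimal sketch that would print lines in this format, assuming a 2.x or newer client, the same broker address and topic as above, a made-up group id demo-group, and the "--------------->:" prefix added when printing, might look like:

package com.ghq.kafka.server;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerPrintDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));     // assumed topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Values arrive with the "prefix01-" prefix added by the producer interceptor
                    System.out.println("--------------->:" + record.value());
                }
            }
        }
    }
}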