美文网首页
6. kafka拦截器

6. kafka拦截器

作者: 阿飞的博客 | 来源:发表于2018-06-30 21:08 被阅读154次
  • 拦截器定义
    拦截器参数命名为:interceptor.classes。官方文档解析如下:

A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.

即拦截发送到kafka服务器之前的消息,且在序列化&反序列化之前调用,序列化&反序列化又在分区策略之前调用,这个调用顺序在[kafka生产者&消费者]已经分析过了。拦截器允许修改key和value,同时拦截器可以指定多个,多个拦截器形成拦截器链,且有先后顺序,假定按照如下方式配置2个拦截器,那么会先调用AProducerInterceptor,再调用BProducerInterceptor,且调用BProducerInterceptor时的ProducerRecord是经过AProducerInterceptor修改过的ProducerRecord(如果在AProducerInterceptor中修改过ProducerRecord的话):

props.put("interceptor.classes", "com.afei.kafka.interceptor.AProducerInterceptor,com.afei.kafka.interceptor.BProducerInterceptor");
  • ProducerInterceptor接口定义源码解读
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
     * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
     * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
     * 即这个方法调用在KafkaProducer.send()之后,但是在key&value序列化之前,以及分配分区之前。
     * <p>
     * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
     * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
     * 这个方法中允许修改ProducerRecord即发送的消息,接下来的分区分配,根据修改后的key(如果会在onSend()中修改key的话)进行计算.
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * 不管消息发送成功还是发送过程抛出异常,这个方法都会执行
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * This is called when interceptor is closed
     */
    public void close();
}
  • 自定义拦截器实现
/**
 * @author afei
 * @version 1.0.0
 * @since 2018年06月27日
 */
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    /**
     * 统计发送成功数
     */
    private static AtomicLong sendSuccess = new AtomicLong(0);
    /**
     * 统计发送失败数
     */
    private static AtomicLong sendFailure = new AtomicLong(0);

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

        this.outputSendStat();

        // 改写ProducerRecord, 将key置为null, 分区全部交给kafka去决定
        return new ProducerRecord<>(record.topic(),
                record.partition(), record.timestamp(), null, record.value(),
                record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 如果没有异常表示发送成功, 那么发送成功数+1, 否则发送失败数+1
        if (exception!=null){
            sendFailure.getAndIncrement();
        }else{
            sendSuccess.getAndIncrement();
        }
    }

    /**
     * 打印出发送的成功&失败次数的统计信息
     */
    private void outputSendStat(){
        long successCount = sendSuccess.get();
        long  failedCount = sendFailure.get();
        System.out.println("success count: "+successCount+", failed count:"+failedCount);
    }

    @Override
    public void close() {
        this.outputSendStat();
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

相关文章

  • 6. kafka拦截器

    拦截器定义拦截器参数命名为:interceptor.classes。官方文档解析如下: A list of cla...

  • 尚硅谷大数据技术之Kafka

    第5章 Kafka producer拦截器(interceptor) 5****.****1 拦截器原理 Prod...

  • flume 到 kafka 精准一起性消费

    一、Kafka精准一次性消费 1.1 编写拦截器 拦截器 1.2 在flume2配置文件中使用拦截器

  • 五、Kafka producer拦截器(interceptor)

    5.1 拦截器原理 Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于...

  • kafka_06_生产者拦截器

    Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Pro...

  • Kafka Producer 拦截器

    Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Pro...

  • Kafka生产者源码解析,学习总结

    目录 简单使用示例 kafka生产者总体架构 配置模块 拦截器模块 序列化模块 分区模块 RecordAccumu...

  • Kafka-拦截器

    一、拦截器定义 基本思想是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操...

  • kafka 基础知识

    kafka不仅仅是消息中间件,还是个流式计算框架。 要点 简单介绍 基本概念 分区 避免消息丢失 拦截器 reba...

  • kafka拦截器实现队列插队效果

    前言 突然出现一个任务需要对kafka处理的数据进行插队操作(内心小崩溃。。。),研究了一下,还是可以使用拦截器进...

网友评论

      本文标题:6. kafka拦截器

      本文链接:https://www.haomeiwen.com/subject/nndxuftx.html