美文网首页
6. kafka拦截器

6. kafka拦截器

作者: 阿飞的博客 | 来源:发表于2018-06-30 21:08 被阅读154次
    • 拦截器定义
      拦截器参数命名为:interceptor.classes。官方文档解析如下:

    A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.

    即拦截发送到kafka服务器之前的消息,且在序列化&反序列化之前调用,序列化&反序列化又在分区策略之前调用,这个调用顺序在[kafka生产者&消费者]已经分析过了。拦截器允许修改key和value,同时拦截器可以指定多个,多个拦截器形成拦截器链,且有先后顺序,假定按照如下方式配置2个拦截器,那么会先调用AProducerInterceptor,再调用BProducerInterceptor,且调用BProducerInterceptor时的ProducerRecord是经过AProducerInterceptor修改过的ProducerRecord(如果在AProducerInterceptor中修改过ProducerRecord的话):

    props.put("interceptor.classes", "com.afei.kafka.interceptor.AProducerInterceptor,com.afei.kafka.interceptor.BProducerInterceptor");
    
    • ProducerInterceptor接口定义源码解读
    public interface ProducerInterceptor<K, V> extends Configurable {
        /**
         * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
         * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
         * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
         * 即这个方法调用在KafkaProducer.send()之后,但是在key&value序列化之前,以及分配分区之前。
         * <p>
         * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
         * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
         * 这个方法中允许修改ProducerRecord即发送的消息,接下来的分区分配,根据修改后的key(如果会在onSend()中修改key的话)进行计算.
         */
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    
        /**
         * 不管消息发送成功还是发送过程抛出异常,这个方法都会执行
         */
        public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    
        /**
         * This is called when interceptor is closed
         */
        public void close();
    }
    
    • 自定义拦截器实现
    /**
     * @author afei
     * @version 1.0.0
     * @since 2018年06月27日
     */
    public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
    
        /**
         * 统计发送成功数
         */
        private static AtomicLong sendSuccess = new AtomicLong(0);
        /**
         * 统计发送失败数
         */
        private static AtomicLong sendFailure = new AtomicLong(0);
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    
            this.outputSendStat();
    
            // 改写ProducerRecord, 将key置为null, 分区全部交给kafka去决定
            return new ProducerRecord<>(record.topic(),
                    record.partition(), record.timestamp(), null, record.value(),
                    record.headers());
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // 如果没有异常表示发送成功, 那么发送成功数+1, 否则发送失败数+1
            if (exception!=null){
                sendFailure.getAndIncrement();
            }else{
                sendSuccess.getAndIncrement();
            }
        }
    
        /**
         * 打印出发送的成功&失败次数的统计信息
         */
        private void outputSendStat(){
            long successCount = sendSuccess.get();
            long  failedCount = sendFailure.get();
            System.out.println("success count: "+successCount+", failed count:"+failedCount);
        }
    
        @Override
        public void close() {
            this.outputSendStat();
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    

    相关文章

      网友评论

          本文标题:6. kafka拦截器

          本文链接:https://www.haomeiwen.com/subject/nndxuftx.html