kafka_06_Producer Interceptors

Author: 平头哥2 | Published 2019-03-25 22:15

    Interceptors were introduced in the Kafka 0.10.x releases. There are two kinds: interceptors on the producer side and interceptors on the consumer side. This article covers only ProducerInterceptor.

    Producer interceptors

    The source code of org.apache.kafka.clients.producer.ProducerInterceptor<K, V> is as follows:

    package org.apache.kafka.clients.producer;
    
    import org.apache.kafka.common.Configurable;
    
    /**
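     * A pluggable interface that lets you intercept (and possibly modify) the records produced by the
     * producer before they are published to the Kafka cluster.
     * <p>
     * This class receives the producer configuration through configure(), including the clientId assigned
     * by KafkaProducer when none is specified in the producer config.
     * <p>
     * Implementations must be aware that they share the producer config namespace with other interceptors
     * and serializers, and must make sure there are no conflicts.
     * <p>
     * Exceptions thrown by ProducerInterceptor methods are caught and logged, but not propagated. As a result,
     * if an interceptor is configured with the wrong key and value type parameters, the producer does not
     * throw an exception; it only logs the error.
     * <p>
     * ProducerInterceptor callbacks may be called from multiple threads, so implementations must be thread-safe.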
     * 
     */
    public interface ProducerInterceptor<K, V> extends Configurable {
        /**
         * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
         * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
         * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
         * <p>
         * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
         * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
         * not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
         * same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
         * as expected.
         * <p>
         * Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
         * Most often, it should be the same topic/partition from 'record'.
         * <p>
         * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
         * <p>
         * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
         * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
         * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
         * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
         * the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
         * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
         * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
         * is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
         * or otherwise the client.
         *
         * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
         * @return producer record to send to topic/partition
         */
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    
        /**
         * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
         * it gets sent to the server.
         * <p>
         * This method is generally called just before the user callback is called, and in additional cases when <code>KafkaProducer.send()</code>
         * throws an exception.
         * <p>
         * Any exception thrown by this method will be ignored by the caller.
         * <p>
         * This method will generally execute in the background I/O thread, so the implementation should be reasonably fast.
         * Otherwise, sending of messages from other threads could be delayed.
         *
         * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
         *                 If an error occurred, metadata will contain only valid topic and maybe
         *                 partition. If partition is not given in ProducerRecord and an error occurs
         *                 before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.
         *                 The metadata may be null if the client passed null record to
         *                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
         * @param exception The exception thrown during processing of this record. Null if no error occurred.
         */
        public void onAcknowledgement(RecordMetadata metadata, Exception exception);
        // Called when the interceptor is closed.
        public void close();
    }
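
    The chaining rule in the onSend() Javadoc above (each interceptor receives the output of the previous one, and a failing interceptor is skipped) can be illustrated with a small stand-alone model. This is only a sketch under simplifying assumptions, not Kafka's internal implementation: records are plain Strings, each interceptor is a UnaryOperator<String>, and the class name InterceptorChainSketch is made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified, stand-alone model of the onSend() chaining rule.
// NOT Kafka's internal code: records are plain Strings and every
// interceptor is a UnaryOperator<String> (assumptions for illustration).
class InterceptorChainSketch {

    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();

    InterceptorChainSketch add(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    // Passes the record through every interceptor in configuration order.
    // If one throws, the exception is logged and the next interceptor gets
    // the record returned by the last interceptor that succeeded.
    String onSend(String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (Exception e) {
                System.err.println("interceptor failed, record left unchanged: " + e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        InterceptorChainSketch chain = new InterceptorChainSketch()
                .add(value -> "prefix01-" + value)
                .add(value -> { throw new IllegalStateException("boom"); })
                .add(value -> value + "-suffix");
        // The failing second interceptor is skipped; the third one still
        // sees the output of the first.
        System.out.println(chain.onSend("Hello,World"));
    }
}
```

    In this model the third interceptor silently works on data the second one never touched, which is the kind of side effect the Javadoc has in mind when it discourages pipelines of interceptors that depend on each other's output.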
    
    

    Developing a custom interceptor

    package com.ghq.kafka.server;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
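    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
    
        // onAcknowledgement may be called from multiple threads, so the counters are atomic.
        private final AtomicLong sendSuccess = new AtomicLong();
        private final AtomicLong sendFailure = new AtomicLong();
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // Return a new record whose value carries the prefix; all other fields are copied as-is.
            String modifiedValue = "prefix01-" + record.value();
            return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                    record.key(), modifiedValue, record.headers());
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // Count acknowledged and failed sends.
            if (exception == null) {
                sendSuccess.incrementAndGet();
            } else {
                sendFailure.incrementAndGet();
            }
        }
    
        @Override
        public void close() {
            // Print the success ratio; skip it when nothing was sent to avoid printing NaN.
            long success = sendSuccess.get();
            long total = success + sendFailure.get();
            if (total > 0) {
                System.out.println(1.0 * success / total);
            }
        }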
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    

    Using the custom interceptor

    prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());
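
    Since the onSend() Javadoc says interceptors are called in the order given by INTERCEPTOR_CLASSES_CONFIG, several interceptors can be registered as a comma-separated list. The sketch below builds such a value with the JDK only; it uses the literal key "interceptor.classes" (the string behind ProducerConfig.INTERCEPTOR_CLASSES_CONFIG) so that it compiles without kafka-clients. The helper withInterceptors and the second class name, ProducerInterceptorSuffix, are hypothetical and not part of the original article.

```java
import java.util.Properties;

// Sketch: registering more than one producer interceptor.
// "interceptor.classes" is the key behind ProducerConfig.INTERCEPTOR_CLASSES_CONFIG;
// the literal is used here so the snippet compiles without kafka-clients.
class InterceptorConfigSketch {

    // Hypothetical helper: joins class names into the comma-separated form.
    static Properties withInterceptors(String... classNames) {
        Properties props = new Properties();
        // Interceptors run in the order in which they are listed.
        props.put("interceptor.classes", String.join(",", classNames));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withInterceptors(
                "com.ghq.kafka.server.ProducerInterceptorPrefix",
                // Hypothetical second interceptor, not defined in this article.
                "com.ghq.kafka.server.ProducerInterceptorSuffix");
        System.out.println(props.getProperty("interceptor.classes"));
    }
}
```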
    

    Consumer console output:

    --------------->:prefix01-Hello,World
    --------------->:prefix01-Hello,World
    

    Article link: https://www.haomeiwen.com/subject/qmuivqtx.html