美文网首页Kafka
kafka_06_生产者拦截器

kafka_06_生产者拦截器

作者: 平头哥2 | 来源:发表于2019-03-25 22:15 被阅读0次

Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器,这里只说明ProducerInterceptor。

Producer拦截器

org.apache.kafka.clients.producer.ProducerInterceptor<K, V>源代码如下:

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

/**
 * 这是一个可插拔的接口,允许你拦截(可能改变)producer生产的消息(records) ,在他们被published到kafak集群之前 
 * 这个类可以通过 configure() 方法获取producer的配置信息,包括通过KafkaProducer分配的即使在配置信息 里面没有指定的clientId
 * 该接口的实现类需要被告知:和其他的 interceptors and serializers 共享producer的配置信息,为了保证他们之间没有冲突 
 * ProducerInterceptor 方法抛出的异常信息将会被捕获,日志记录,但是不会传播。因此,如果用户配置了interceptor使用了错误的key 和 value 参数,producer不会抛出异常,仅仅记录错误的日志
 * ProducerInterceptor回调函数可能被多线程调用,因此接口的实现要保证线程安全
 * 
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
     * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
     * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
     * <p>
     * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
     * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
     * not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
     * same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
     * as expected.
     * <p>
     * Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
     * Most often, it should be the same topic/partition from 'record'.
     * <p>
     * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
     * <p>
     * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
     * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
     * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
     * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
     * the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
     * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
     * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
     * is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
     * or otherwise the client.
     *
     * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
     * @return producer record to send to topic/partition
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
     * it gets sent to the server.
     * <p>
     * This method is generally called just before the user callback is called, and in additional cases when <code>KafkaProducer.send()</code>
     * throws an exception.
     * <p>
     * Any exception thrown by this method will be ignored by the caller.
     * <p>
     * This method will generally execute in the background I/O thread, so the implementation should be reasonably fast.
     * Otherwise, sending of messages from other threads could be delayed.
     *
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
     *                 If an error occurred, metadata will contain only valid topic and maybe
     *                 partition. If partition is not given in ProducerRecord and an error occurs
     *                 before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.
     *                 The metadata may be null if the client passed null record to
     *                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    //拦截器关闭的时候调用
    public void close();
}

自定义拦截器开发

package com.ghq.kafka.server;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {

    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String modifiedValue = "prefix01-" + record.value();
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
        double ratio = 1.0 * sendSuccess / (sendFailure + sendSuccess);
        System.out.println(ratio);

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

自定义拦截器使用

prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());

consumer控制台输出结果:

--------------->:prefix01-Hello,World
--------------->:prefix01-Hello,World

相关文章

  • kafka_06_生产者拦截器

    Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Pro...

  • Kafka分区策略

    1. 生产者分区选择配策略 生产者在将消息发送到某个Topic ,需要经过拦截器、序列化器和分区器(Partiti...

  • (9)拦截器、序列化器、分区器处理顺序

    生产者客户端:主线程和 Sender 线程 1、主线程:由 KafkaProducer 创建消息,通过拦截器、序列...

  • Kafka入门

    Kafka生产者消费者,拦截器,分区器,序列化以及反序列化器 直接上代码pom文件 发送消息的实体化数据 生产者,...

  • Kafka生产者源码解析,学习总结

    目录 简单使用示例 kafka生产者总体架构 配置模块 拦截器模块 序列化模块 分区模块 RecordAccumu...

  • Kafka中API简单调用

    一、pom引入依赖 二、创建拦截器 二、创建生产者 三、创建消费者 四、测试 正常生产、正常消费、正常拦截

  • kafka编程应用

    Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么? 序列化器:生产者需要用序列化器(Ser...

  • Spring 实现多个拦截器

    拦截器文件 API拦截器 ADMIN拦截器

  • OkHttp之拦截器(二)

    本篇文章主要介绍OkHttp的默认拦截器 重试拦截器 桥接拦截器 缓存拦截器 连接拦截器 访问服务器拦截器 通过拦...

  • Spring15-拦截器

    定义拦截器 定义拦截器需要实现HandlerInterceptor 配置拦截器 注意:spring mvc的拦截器...

网友评论

    本文标题:kafka_06_生产者拦截器

    本文链接:https://www.haomeiwen.com/subject/qmuivqtx.html