美文网首页
【kafka 源码】 kafka 生产者初始化过程

【kafka 源码】 kafka 生产者初始化过程

作者: logan_wu | 来源:发表于2019-07-11 17:13 被阅读0次

Kafka 生产者初始化时,首先会初始化生产者相关配置 ProducerConfig 。然后,根据配置,初始化分区器,序列化器,拦截器,消息累加器并启动 Sender 线程。

配置

谈到 Kafka 生产者初始化之前,先要看一下生产者配置 ProducerConfig 。该类定义了生产者的配置,必填参数需要在定义生产者时配置,非必填参数已定义了默认值,如果需要也可在定义生产者时配置修改。

public class ProducerConfig extends AbstractConfig {
    
    /**
     * 定义生产者相关配置
     */
    private static final ConfigDef CONFIG;
    
    static {
        // 使用 builder 建造者模式,定义生产者配置
        CONFIG = new ConfigDef()
            .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(),
                    // 约束,不能为空
                    new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
            .define(...);
    }
}

ConfigDef 定义配置时,会将参数封装为 ConfigKey 对象,并将 name 作为 key 该对象作为 value 存储。同时提供转换方法。

public class ConfigDef {
    
    private final Map<String, ConfigKey> configKeys;
    
    public ConfigDef define(String name, ...) {
        // 参数封装为 ConfigKey 对象
        return define(new ConfigKey(name, ...));
    }
    
    public ConfigDef define(ConfigKey key) {
        configKeys.put(key.name, key);
    }
    
    public Map<String, Object> parse(Map<?, ?> props) {
        // 转换所有已知 key
        Map<String, Object> values = new HashMap<>();
        for (ConfigKey key : configKeys.values()) {
            values.put(key.name, parseValue(key, props.get(key.name), props.containsKey(key.name)));
        }
        return values;
    }
}

编写生产者时会定义一些必须的配置 bootstrap.servers , key.serializervalue.serializer 以及一些其他的非必填参数如 acks 。在 ProducerConfig 初始化时,会与上面的 configkeys 相匹配并放入新 map 中。

public class AbstractConfig {
    
    /**
     * 转换后值集
     */
    private final Map<String, Object> values;
    
    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, ...) {
        
        // 匹配 key ,覆盖 value。如果没有匹配到 key , 用默认 value
        this.value = definition.parse(this.originals);
    }
}

实例化

谈完了配置,再来谈一下 Kafka 如何实例化类。在 AbstractConfig 类中提供实例化类的方法,通过生产者配置的值,实例化为对应的对象和集合。

public class AbstractConfig {
    
    /**
     * 转换后值集
     */
    private final Map<String, Object> values;
    
    /**
     * 自定义配置
     */
    private final Map<String, ?> originals;
    
    public <T> T getConfiguredInstance(String key, Class<T> t) {
        // 通过 key 从生产者配置 values 中获取 value
        Class<?> c = getClass(key);
        Object o = Utils.newInstance(c);
        
        // 继承 Configurable 的类的对象,传入自定义配置 originals
        if (o instanceof Configurable) {
            ((Configurable) o).configure(originals());
        }
        return t.cast(o);
    }
    
    public <T> List<T> getConfiguredInstances(String key, Class<t> t) {
        List<T> objects = new ArrayList<>();
        
        // 通过 key 从生产者配置 values 中获取 value
        List<String> classNames = getList(key);
        for (Object klass : className) {
            Object o = Utils.newInstance(klass);
            
            // 继承 Configurable 的类的对象,传入自定义配置 originals
            if (o instanceof Configurable) {
                ((Configurable) o).configure(originals());
            }
            objects.add(t.cast(o));
        }
        
        return objects;
    }
}

初始化

明白了生产者的相关配置以及类的实例化过程,现在就可以看明白 kafka 生产者初始化过程了。

public class KafkaProducer<K, V> implements Producer<K, V> {
    
    KafkaProducer(Map<String, Object> configs, ...) {
        
        // 实例化生产者配置
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, ...));
        
        // 实例化分区器
        this.partitioner = config.getConfiguredInstance("partitioner.class", Partitoner.class);
        
        // 实例化序列化器
        this.keySerializer = config.getConfiguredInstance("key.serializer", Serializer.class);
        this.valueSerializer = config.getConfiguredInstance("value.serializer", Serializer.class);
        
        // 实例化拦截器
        List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
        this.interceptors = new ProducerInterceptors<>(interceptorList);
        
        // 实例化消息累加器
        this.accumulator = new RecordAccumulator(...);
        
        // 启动 Sender 线程
        this.sender = newSender(...);
        this.ioThread = new KafkaThread(..., this.sender, ...);
        this.ioThread.start();
    }
    
}

至此, kafka 初始化过程完成。

注: 篇幅所限,代码片段只列举了必要部分,且稍有修改。具体代码请参考 kafka 源码。

相关文章

  • 【kafka 源码】 kafka 生产者初始化过程

    Kafka 生产者初始化时,首先会初始化生产者相关配置 ProducerConfig 。然后,根据配置,初始化分区...

  • Kafka 生产者概述

    生产者:往消息队列里推送消息的应用 发送消息的过程 Kafka 生产者发送消息的过程: Kafka 会将发送消息包...

  • Kafka源码分析-Content Table

    Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3 Kafka源码分析...

  • 2022-01-19

    ```/** * kafka produce 单例模式只初始化一个生产者 */publicclassKafkaPr...

  • KafkaProducer

    Kafka源码阅读(一):Kafka Producer整体架构概述及源码分析 zqhxuyuan Kafka源码分...

  • 【kafka 源码】kafka 生产者

    前文讲述了 kafka 实例化过程。实例化完成后,就可以发送消息体。消息体经过拦截器,序列化器,分区器到达消息累加...

  • 【Kafka】Kafka 常用命令

    本篇结构: kafka topic 管理脚本 kafka 生产者控制台 kafka 消费者控制台 kafka 消费...

  • kafka源码导入idea

    最近开始学习kafka源码,在将kafka源码导入idea的过程中遇到各种问题,故做此记录。 下载源码从githu...

  • kafka0.8

    1、Kafka分为:生产者(producer),消费者(consumer) 2、生产者提交消息,给Kafka集群,...

  • 使用pykafka库测试kafka-180504-[github

    使用pykafka库测试kafka 生产者生产数据 在kafka目录下执行./kafka-console-cons...

网友评论

      本文标题:【kafka 源码】 kafka 生产者初始化过程

      本文链接:https://www.haomeiwen.com/subject/bmgdkctx.html