美文网首页
【kafka 源码】 kafka 生产者初始化过程

【kafka 源码】 kafka 生产者初始化过程

作者: logan_wu | 来源:发表于2019-07-11 17:13 被阅读0次

    Kafka 生产者初始化时,首先会初始化生产者相关配置 ProducerConfig 。然后,根据配置,初始化分区器,序列化器,拦截器,消息累加器并启动 Sender 线程。

    配置

    谈到 Kafka 生产者初始化之前,先要看一下生产者配置 ProducerConfig 。该类定义了生产者的配置,必填参数需要在定义生产者时配置,非必填参数已定义了默认值,如果需要也可在定义生产者时配置修改。

    public class ProducerConfig extends AbstractConfig {
        
        /**
         * 定义生产者相关配置
         */
        private static final ConfigDef CONFIG;
        
        static {
            // 使用 builder 建造者模式,定义生产者配置
            CONFIG = new ConfigDef()
                .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(),
                        // 约束,不能为空
                        new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                .define(...);
        }
    }
    

    ConfigDef 定义配置时,会将参数封装为 ConfigKey 对象,并将 name 作为 key 该对象作为 value 存储。同时提供转换方法。

    public class ConfigDef {
        
        private final Map<String, ConfigKey> configKeys;
        
        public ConfigDef define(String name, ...) {
            // 参数封装为 ConfigKey 对象
            return define(new ConfigKey(name, ...));
        }
        
        public ConfigDef define(ConfigKey key) {
            configKeys.put(key.name, key);
        }
        
        public Map<String, Object> parse(Map<?, ?> props) {
            // 转换所有已知 key
            Map<String, Object> values = new HashMap<>();
            for (ConfigKey key : configKeys.values()) {
                values.put(key.name, parseValue(key, props.get(key.name), props.containsKey(key.name)));
            }
            return values;
        }
    }
    

    编写生产者时会定义一些必须的配置 bootstrap.servers , key.serializervalue.serializer 以及一些其他的非必填参数如 acks 。在 ProducerConfig 初始化时,会与上面的 configkeys 相匹配并放入新 map 中。

    public class AbstractConfig {
        
        /**
         * 转换后值集
         */
        private final Map<String, Object> values;
        
        public AbstractConfig(ConfigDef definition, Map<?, ?> originals, ...) {
            
            // 匹配 key ,覆盖 value。如果没有匹配到 key , 用默认 value
            this.value = definition.parse(this.originals);
        }
    }
    

    实例化

    谈完了配置,再来谈一下 Kafka 如何实例化类。在 AbstractConfig 类中提供实例化类的方法,通过生产者配置的值,实例化为对应的对象和集合。

    public class AbstractConfig {
        
        /**
         * 转换后值集
         */
        private final Map<String, Object> values;
        
        /**
         * 自定义配置
         */
        private final Map<String, ?> originals;
        
        public <T> T getConfiguredInstance(String key, Class<T> t) {
            // 通过 key 从生产者配置 values 中获取 value
            Class<?> c = getClass(key);
            Object o = Utils.newInstance(c);
            
            // 继承 Configurable 的类的对象,传入自定义配置 originals
            if (o instanceof Configurable) {
                ((Configurable) o).configure(originals());
            }
            return t.cast(o);
        }
        
        public <T> List<T> getConfiguredInstances(String key, Class<t> t) {
            List<T> objects = new ArrayList<>();
            
            // 通过 key 从生产者配置 values 中获取 value
            List<String> classNames = getList(key);
            for (Object klass : className) {
                Object o = Utils.newInstance(klass);
                
                // 继承 Configurable 的类的对象,传入自定义配置 originals
                if (o instanceof Configurable) {
                    ((Configurable) o).configure(originals());
                }
                objects.add(t.cast(o));
            }
            
            return objects;
        }
    }
    

    初始化

    明白了生产者的相关配置以及类的实例化过程,现在就可以看明白 kafka 生产者初始化过程了。

    public class KafkaProducer<K, V> implements Producer<K, V> {
        
        KafkaProducer(Map<String, Object> configs, ...) {
            
            // 实例化生产者配置
            ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, ...));
            
            // 实例化分区器
            this.partitioner = config.getConfiguredInstance("partitioner.class", Partitoner.class);
            
            // 实例化序列化器
            this.keySerializer = config.getConfiguredInstance("key.serializer", Serializer.class);
            this.valueSerializer = config.getConfiguredInstance("value.serializer", Serializer.class);
            
            // 实例化拦截器
            List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
            this.interceptors = new ProducerInterceptors<>(interceptorList);
            
            // 实例化消息累加器
            this.accumulator = new RecordAccumulator(...);
            
            // 启动 Sender 线程
            this.sender = newSender(...);
            this.ioThread = new KafkaThread(..., this.sender, ...);
            this.ioThread.start();
        }
        
    }
    

    至此, kafka 初始化过程完成。

    注: 篇幅所限,代码片段只列举了必要部分,且稍有修改。具体代码请参考 kafka 源码。

    相关文章

      网友评论

          本文标题:【kafka 源码】 kafka 生产者初始化过程

          本文链接:https://www.haomeiwen.com/subject/bmgdkctx.html