
How KafkaTemplate Wraps KafkaProducer

Author: 陆阳226 | Published on 2020-05-26 23:30

    The send methods exposed by KafkaTemplate:

    public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data)
    public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,@Nullable V data)
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record)
    public ListenableFuture<SendResult<K, V>> send(Message<?> message)
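
    For orientation, these overloads simply fill in different fields of the outgoing record: topic, optional partition, optional timestamp, key, and value; the sendDefault variants use the template's configured default topic. A small usage sketch (not from the original text), assuming a ready KafkaTemplate<String, String> named template and illustrative topic/key names:

    // topic + value
    template.send("demo-topic", "hello");
    // topic + key + value (the key drives partition selection)
    template.send("demo-topic", "order-42", "created");
    // explicit partition and timestamp as well
    template.send("demo-topic", 0, System.currentTimeMillis(), "order-42", "created");
    // sendDefault requires a default topic to be configured first
    template.setDefaultTopic("demo-topic");
    template.sendDefault("order-42", "updated");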
    

    Internally, each of these send methods builds a ProducerRecord and hands it to the doSend method; the ProducerRecord carries all of the information that will be sent to Kafka.

    public class ProducerRecord<K, V> {
    
        private final String topic;
        private final Integer partition;
        private final Headers headers;
        private final K key;
        private final V value;
        private final Long timestamp;
        // ... constructors and accessors omitted
    }
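
    When more control is needed (custom headers, explicit timestamps), a ProducerRecord can be built directly and passed to the send(ProducerRecord) overload. A small sketch (not from the original text), assuming a KafkaTemplate<String, String> named template; the topic and header names are illustrative:

    ProducerRecord<String, String> record = new ProducerRecord<>(
            "demo-topic",   // topic
            null,           // partition: null lets the partitioner decide
            null,           // timestamp: null lets the producer/broker assign one
            "order-42",     // key
            "created");     // value
    record.headers().add("traceId", "abc123".getBytes(StandardCharsets.UTF_8));

    template.send(record);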

    The doSend method obtains a KafkaProducer via getTheProducer and uses that producer to send the ProducerRecord:

    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    
        // Obtain the KafkaProducer instance
        final Producer<K, V> producer = getTheProducer(producerRecord.topic());
        this.logger.trace(() -> "Sending: " + producerRecord);
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        Object sample = null;
        if (this.micrometerEnabled && this.micrometerHolder == null) {
            this.micrometerHolder = obtainMicrometerHolder();
        }
        if (this.micrometerHolder != null) {
            sample = this.micrometerHolder.start();
        }
    
        // Call KafkaProducer.send() to actually send the record
        Future<RecordMetadata> sendFuture =
                producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
        // May be an immediate failure
        if (sendFuture.isDone()) {
            try {
                sendFuture.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted", e);
            }
            catch (ExecutionException e) {
                throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
            }
        }
        if (this.autoFlush) {
            flush();
        }
        this.logger.trace(() -> "Sent: " + producerRecord);
        return future;
    }
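
    The SettableListenableFuture returned here is completed inside buildCallback once the broker acknowledges the record (or the send fails). Callers can consume it asynchronously or block on it; a small sketch (not from the original text), assuming a KafkaTemplate<String, String> named template and an illustrative topic:

    ListenableFuture<SendResult<String, String>> future = template.send("demo-topic", "payload");

    // Asynchronous handling: the callback fires when the broker acknowledges or the send fails.
    future.addCallback(
            result -> System.out.println("sent, offset=" + result.getRecordMetadata().offset()),
            ex -> System.err.println("send failed: " + ex.getMessage()));

    // Synchronous handling: block (with a timeout) until the result or failure is available.
    try {
        SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);
        System.out.println("sent: " + result.getRecordMetadata());
    }
    catch (InterruptedException | ExecutionException | TimeoutException e) {
        // handle or rethrow as appropriate for the application
    }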
    

    The getTheProducer method: the first part creates a transactional Producer when transactions are enabled (not covered here). The non-transactional case is handled by the last two if branches: both obtain the KafkaTemplate's current producerFactory, and the producerFactory is what actually creates the KafkaProducer. The getProducerFactory(topic) method picks a producerFactory based on the topic passed in; by default it simply returns this.producerFactory, but a subclass can override it with a custom strategy (see the sketch after the method below). From there, control moves to the producerFactory's createProducer method.

    protected Producer<K, V> getTheProducer(@SuppressWarnings("unused") @Nullable String topic) {
        // When transactions are enabled, create (or reuse) a transactional Producer
        boolean transactionalProducer = this.transactional;
        if (transactionalProducer) {
            boolean inTransaction = inTransaction();
            Assert.state(this.allowNonTransactional || inTransaction,
                    "No transaction is in process; "
                        + "possible solutions: run the template operation within the scope of a "
                        + "template.executeInTransaction() operation, start a transaction with @Transactional "
                        + "before invoking the template method, "
                        + "run in a transaction started by a listener container when consuming a record");
            if (!inTransaction) {
                transactionalProducer = false;
            }
        }
        if (transactionalProducer) {
            Producer<K, V> producer = this.producers.get();
            if (producer != null) {
                return producer;
            }
            KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
                    .getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
            return holder.getProducer();
        }
        else if (this.allowNonTransactional) {
            return this.producerFactory.createNonTransactionalProducer();
        }
        // Create a non-transactional Producer
        else if (topic == null) {
            return this.producerFactory.createProducer();
        }
        else {
            return getProducerFactory(topic).createProducer();
        }
    }
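
    As noted above, the topic-to-factory strategy can be customized by overriding getProducerFactory in a KafkaTemplate subclass. A hedged sketch (this is not the framework's built-in RoutingKafkaTemplate; the class name, the ".bulk" rule, and the second factory are purely illustrative):

    public class TopicAwareKafkaTemplate extends KafkaTemplate<String, String> {

        private final ProducerFactory<String, String> bulkFactory;

        public TopicAwareKafkaTemplate(ProducerFactory<String, String> defaultFactory,
                ProducerFactory<String, String> bulkFactory) {
            super(defaultFactory);
            this.bulkFactory = bulkFactory;
        }

        @Override
        protected ProducerFactory<String, String> getProducerFactory(String topic) {
            // Illustrative rule: topics ending in ".bulk" use a differently tuned factory
            // (e.g. bigger batches, compression); everything else uses the default factory.
            return topic.endsWith(".bulk") ? this.bulkFactory : super.getProducerFactory(topic);
        }
    }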
    

    The createProducer method: DefaultKafkaProducerFactory is the only default implementation of the ProducerFactory interface, and createProducer simply delegates to doCreateProducer:

    public Producer<K, V> createProducer() {
        return createProducer(this.transactionIdPrefix);
    }
    public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
        String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
        return doCreateProducer(txIdPrefix);
    }
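
    For context, a DefaultKafkaProducerFactory is usually built once from the producer configs and handed to the KafkaTemplate, which then calls createProducer() lazily from doSend. A minimal wiring sketch (not from the original text); the broker address is illustrative:

    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
    KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);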
    

    The doCreateProducer method:

    1. The first if branch creates a transactional Producer.
    2. The second if branch creates one Producer per thread. The Kafka documentation recommends sharing a single Producer across threads, but creating one per thread is also an option (see the sketch after this method).
    3. The synchronized block creates the non-transactional Producer, guaranteeing that only one Producer instance is created even under multi-threaded access; that instance is wrapped by the CloseSafeProducer proxy class.

    private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
        if (txIdPrefix != null) {
            if (this.producerPerConsumerPartition) {
                return createTransactionalProducerForPartition(txIdPrefix);
            }
            else {
                return createTransactionalProducer(txIdPrefix);
            }
        }
        if (this.producerPerThread) {
            CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
            if (this.threadBoundProducerEpochs.get() == null) {
                this.threadBoundProducerEpochs.set(this.epoch.get());
            }
            if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
                closeThreadBoundProducer();
                tlProducer = null;
            }
            if (tlProducer == null) {
                tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                        this.physicalCloseTimeout, this.beanName);
                for (Listener<K, V> listener : this.listeners) {
                    listener.producerAdded(tlProducer.clientId, tlProducer);
                }
                this.threadBoundProducers.set(tlProducer);
                this.threadBoundProducerEpochs.set(this.epoch.get());
            }
            return tlProducer;
        }
        synchronized (this) {
            // Create a CloseSafeProducer object (only once; it is reused afterwards)
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                        this.physicalCloseTimeout, this.beanName);
                this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
            }
            return this.producer;
        }
    }
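
    A hedged sketch of the producer-per-thread option from point 2 above: setProducerPerThread and closeThreadBoundProducer are public methods of DefaultKafkaProducerFactory, and the configs here are illustrative:

    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(
            Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"),
            new StringSerializer(), new StringSerializer());
    factory.setProducerPerThread(true);

    // Each worker thread now gets its own CloseSafeProducer from doCreateProducer.
    // When a thread has finished producing, its thread-bound producer should be released:
    factory.closeThreadBoundProducer();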
    

    The createKafkaProducer method adjusts the configuration according to clientIdPrefix; the resulting client.id is the name that identifies each Producer the factory creates:

    protected Producer<K, V> createKafkaProducer() {
        Map<String, Object> newConfigs;
        if (this.clientIdPrefix == null) {
            newConfigs = new HashMap<>(this.configs);
        }
        else {
            newConfigs = new HashMap<>(this.configs);
            newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
                    this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        checkBootstrap(newConfigs);
        return createRawProducer(newConfigs);
    }
    protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
        return new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
    }
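
    In recent spring-kafka versions the clientIdPrefix is typically taken from the client.id entry of the producer configs (an assumption about the factory setup, not something shown in the snippet above); each physical KafkaProducer then gets a unique name following the prefix-plus-counter logic in createKafkaProducer:

    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Assumed to become the clientIdPrefix; producers are then named order-service-1, -2, ...
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service");

    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(
            configs, new StringSerializer(), new StringSerializer());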
    

    CloseSafeProducer proxies the KafkaProducer instance. Its constructor mainly receives the delegate KafkaProducer and the action to run when the producer is removed/closed (this::removeProducer). Its send method simply calls the delegate KafkaProducer's send method; the wrapping Callback deals with exceptions that matter for transactional Producers.

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        LOGGER.trace(() -> toString() + " send(" + record + ")");
        return this.delegate.send(record, new Callback() {
    
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception instanceof OutOfOrderSequenceException) {
                    CloseSafeProducer.this.producerFailed = exception;
                    close(CloseSafeProducer.this.closeTimeout);
                }
                callback.onCompletion(metadata, exception);
            }
    
        });
    }
    

    In CloseSafeProducer, only the send method does real work for a non-transactional Producer; close and the related methods are mainly there for transactional Producers.

    A non-transactional Producer is closed through closeDelegate, which is invoked from the destroy method. When the Spring application shuts down, destroy is called and all of the Producer's resources are released.
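
    Outside a Spring-managed lifecycle (where destroy is invoked automatically at shutdown), the cached producer can be released explicitly. A hedged sketch, assuming a DefaultKafkaProducerFactory named factory built as in the earlier sketches:

    try {
        // ... send records through a KafkaTemplate built on this factory ...
    }
    finally {
        // reset() closes the cached delegate producer(s); Spring's destroy() does the
        // equivalent when the application context shuts down.
        factory.reset();
    }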
