KafkaProducer Flow Analysis

Author: 桥头桥尾 | Published 2017-09-22 10:43

    We all know that Kafka handles messages extremely fast, with a single machine reaching a throughput (TPS) on the order of a million messages. This is largely because the producer merges many small messages into a single batch before sending them. This article analyzes the design of the KafkaProducer flow in detail from the source-code perspective.

    The code version used here is 0.10.1.

    • Constructor: public KafkaProducer(Map<String, Object> configs) {...}
    • Member variables of KafkaProducer
        // If the user has not configured "client.id", "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement() is used as the clientId
        private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    
        // Prefix used for the MBean names registered in JMX
        private static final String JMX_PREFIX = "kafka.producer";
    
        private String clientId;
    
        //The `partition` method of this class decides which partition of the topic each record is routed to
        //Users can plug in their own implementation via the "partitioner.class" config
        private final Partitioner partitioner;
    
        //Limits the size of a single message; configurable via "max.request.size"
        private final int maxRequestSize;
    
        //Total memory the producer may use to buffer messages; if there is not enough memory left when the next message asks for space, the producer waits. Configurable via "buffer.memory"
        //Used concretely by the BufferPool inside RecordAccumulator
        private final long totalMemorySize;
    
        //Metadata class used to obtain cluster information
        private final Metadata metadata;
    
        //Accumulator for messages; every send appends the record to the RecordAccumulator
        private final RecordAccumulator accumulator;
    
        //The send thread: it takes sendable batches from the accumulator and sends them
        //It runs inside ioThread and is started when the producer is instantiated
        private final Sender sender;
    
        //Metrics monitoring
        private final Metrics metrics;
    
        //The thread in which the sender runs
        private final Thread ioThread;
    
        //Compression type used for data transmission
        private final CompressionType compressionType;
    
        //Sensor that records failed sends
        private final Sensor errors;
    
        private final Time time;
    
        //Serializes the record key into the byte[] that is transmitted
        //Users may provide their own implementation via "key.serializer"
        private final Serializer<K> keySerializer;
    
        //Serializes the record value into the byte[] that is transmitted
        //Likewise user-replaceable via "value.serializer"
        private final Serializer<V> valueSerializer;
    
        //The user configuration passed in as input to the KafkaProducer implementation
        private final ProducerConfig producerConfig;
    
        //Maximum time send() may block when the buffer is full or metadata is unavailable; configurable via "max.block.ms" (0.10.1)
        private final long maxBlockTimeMs;
    
        //Maximum time to wait for a send request to complete; configurable via "request.timeout.ms" (0.10.1)
        private final int requestTimeoutMs;
    
        //List of interceptors that can pre-process each ProducerRecord before it is sent
        private final ProducerInterceptors<K, V> interceptors; 
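
    The fields above correspond one-to-one to producer configuration keys. As a concrete reference, here is a minimal construction sketch; the bootstrap address, client id, and the explicit values are illustrative assumptions (most of them simply restate the 0.10.1 defaults), not something taken from the source being analyzed:

        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;

        public class ProducerConfigExample {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");  // illustrative broker address
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // keySerializer
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // valueSerializer
                props.put("client.id", "demo-producer");  // otherwise "producer-" + PRODUCER_CLIENT_ID_SEQUENCE is generated
                props.put("buffer.memory", "33554432");   // totalMemorySize: 32 MB of accumulator buffer space
                props.put("max.request.size", "1048576"); // maxRequestSize: reject single records larger than 1 MB
                props.put("max.block.ms", "60000");       // maxBlockTimeMs: send() blocks at most 60 s on a full buffer / missing metadata
                props.put("request.timeout.ms", "30000"); // requestTimeoutMs

                Producer<String, String> producer = new KafkaProducer<>(props);
                producer.close();
            }
        }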
    
    
    • The send method: KafkaProducer.send(ProducerRecord<K, V> record, Callback callback)
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            // intercept the record, which can be potentially modified; this method does not throw exceptions
            ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
            return doSend(interceptedRecord, callback);
        }
    
        private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            TopicPartition tp = null;
            try {
                //Make sure metadata is available for record.topic() and return the cluster plus waitedOnMetadataMs (how long this call blocked)
                //Availability means: partitionsCount for the topic in the cluster metadata is != null, and either
                //1> the user did not specify a partition, or
                //2> the user specified a partition and partition < partitionsCount (partitions are numbered from 0)
                ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
                //Compute the maximum time the remaining steps are allowed to block
                long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                Cluster cluster = clusterAndWaitTime.cluster;
                //Serialize the key and value of the ProducerRecord into byte[] using the configured serializers
                byte[] serializedKey;
                try {
                    serializedKey = keySerializer.serialize(record.topic(), record.key());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in key.serializer");
                }
                byte[] serializedValue;
                try {
                    serializedValue = valueSerializer.serialize(record.topic(), record.value());
                } catch (ClassCastException cce) {
                    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                            " specified in value.serializer");
                }
                //If the user did not specify a partition (record.partition() == null),
                //the configured Partitioner decides which partition the record is assigned to
                int partition = partition(record, serializedKey, serializedValue, cluster);
                //Size of the serialized record plus the log overhead: SIZE_LENGTH (int, 4 bytes) + OFFSET_LENGTH (long, 8 bytes)
                int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
                //Validate the size of this single record: it must not exceed maxRequestSize or totalMemorySize
                ensureValidRecordSize(serializedSize);
                //Build the TopicPartition this record is sent to
                tp = new TopicPartition(record.topic(), partition);
                long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
                log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
                // producer callback will make sure to call both 'callback' and interceptor callback
                Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
                //Append the record to the accumulator
                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
                //If the batch is ready to be sent, wake up the sender thread to send it
                //The conditions for waking it up:
                //1> the Deque<RecordBatch> in RecordAccumulator.batches for this
                //   TopicPartition has size() > 1, or the current RecordBatch.isFull();
                //2> the current RecordBatch was newly created (a new batch always holds data)
                if (result.batchIsFull || result.newBatchCreated) {
                    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                    this.sender.wakeup();
                }
                return result.future;
            } catch (...) {
                // handling exceptions and record the errors;
                // for API exceptions return them in the future,
                // for other exceptions throw directly
            }
        }
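
    To connect the path above end to end, the producer from the earlier construction sketch could be used as follows. The topic name and record contents are made up for illustration, and the anonymous Callback is the same interface that doSend wraps into an InterceptorCallback (imports from org.apache.kafka.clients.producer omitted):

        ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key-1", "value-1");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // API exceptions arrive here (and via the returned Future) instead of being thrown by send()
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            }
        });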
    
    • The batching method: accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs)
        /**
         * Add a record to the accumulator, return the append result
         * <p>
         * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
         * <p>
         *
         * @param tp The topic/partition to which this record is being sent
         * @param timestamp The timestamp of the record
         * @param key The key for the record
         * @param value The value for the record
         * @param callback The user-supplied callback to execute when the request is complete
         * @param maxTimeToBlock The maximum time in milliseconds to block waiting for buffer memory to become available
         */
        public RecordAppendResult append(TopicPartition tp,
                                         long timestamp,
                                         byte[] key,
                                         byte[] value,
                                         Callback callback,
                                         long maxTimeToBlock) throws InterruptedException {
            //Track how many append operations are currently in progress
            appendsInProgress.incrementAndGet();
            try {
                // If batches already holds a message deque for this TopicPartition, return it; otherwise create one
                Deque<RecordBatch> dq = getOrCreateDeque(tp);
                synchronized (dq) {
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
                    //Try to put the record into dq: take the last RecordBatch in dq; if there is none, return null.
                    //               If it exists, try to append the record to it; if that RecordBatch has no room left, return null.
                    //                                If there is room, append the record and create a FutureRecordMetadata,
                    //==>then wrap callback + FutureRecordMetadata into a Thunk and add it to thunks, to be invoked when the response comes back
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    if (appendResult != null)
                        return appendResult;
                }
    
                //If the append above did not succeed, allocate new memory, create a Records, and append into that
                //The amount of memory requested is the maximum of batchSize and the size needed by the current record
                //When free allocates memory: 1: if size == poolableSize (i.e. batchSize), take a buffer from the Deque<ByteBuffer> free list;
                //          if that list is empty, allocate a fresh ByteBuffer of batchSize, first checking that availableMemory covers the requested size;
                //          if it does, allocate immediately, otherwise wait for memory to be released
                //                2: if size != poolableSize, follow the same strategy as in 1 once the free list is empty
                int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
                log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
                ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
                synchronized (dq) {
                    // Need to check if producer is closed again after grabbing the dequeue lock.
                    if (closed)
                        throw new IllegalStateException("Cannot send after the producer is closed.");
    
                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                    if (appendResult != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        free.deallocate(buffer);
                        return appendResult;
                    }
                    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
    
                    dq.addLast(batch);
                    incomplete.add(batch);
                    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
                }
            } finally {
                appendsInProgress.decrementAndGet();
            }
        }
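
    The allocation path above is governed mostly by three settings. A hedged configuration sketch, added to the props of the earlier example (batch.size and buffer.memory restate the 0.10.1 defaults, linger.ms is an illustrative value):

        props.put("batch.size", "16384");       // batchSize: the poolableSize that free.allocate(...) can recycle from its free list
        props.put("linger.ms", "5");            // let a not-yet-full batch wait up to 5 ms for more records before it becomes sendable
        props.put("buffer.memory", "33554432"); // upper bound on all ByteBuffers handed out by the BufferPool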
    
    
    • The Sender work loop inside ioThread
        public void run() {
    
            // Main call flow, executed in a loop
            while (running) {
                try {
                    run(time.milliseconds());
                } catch (Exception e) {
                    log.error("Uncaught error in kafka producer I/O thread: ", e);
                }
            }
    
            log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    
            // On a non-forced close, keep running while the accumulator or the client still has unsent messages, so they get sent
            while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
                try {
                    run(time.milliseconds());
                } catch (Exception e) {
                    log.error("Uncaught error in kafka producer I/O thread: ", e);
                }
            }
    
            // On a forced close, the data left in the accumulator is aborted outright
            if (forceClose) {
                // We need to fail all the incomplete batches and wake up the threads waiting on
                // the futures.
                this.accumulator.abortIncompleteBatches();
            }
            try {
                this.client.close();
            } catch (Exception e) {
                log.error("Failed to close network client", e);
            }
    
            log.debug("Shutdown of Kafka producer I/O thread has completed.");
        }
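
    Both shutdown paths above are driven from user code. As a sketch (the zero timeout is how the forced path is usually triggered; the exact call and value are assumptions about typical usage):

        // graceful close: the Sender keeps running until accumulator.hasUnsent() and inFlightRequestCount() drain
        producer.close();

        // forced close: a zero timeout makes KafkaProducer force-close the Sender, so incomplete batches are aborted
        // producer.close(0, java.util.concurrent.TimeUnit.MILLISECONDS);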
    
        /**
         * Notes on this method:
         * 1: The guaranteeMessageOrder field decides whether the order of sent messages must be guaranteed.
         *     To preserve ordering, Kafka mutes a partition when a RecordBatch for it is sent and unmutes it once the
         *     response arrives, so only then can the next batch for that partition go out.
         * 2: Resending: RecordBatch carries the fields attempts + lastAttemptMs; attempts > 0 marks a batch being retried,
         *     and it may only be resent once batch.lastAttemptMs + retryBackoffMs <= nowMs (the retry backoff has elapsed).
         * 3: this.client.ready(node, now)
         *     The connection must be established, i.e. ConnectionState.CONNECTED
         *     For requests that require authentication, the channel must already be authenticated
         *     InFlightRequests.canSendMore(node): the request queue of the node is empty,
         *         or the first request in the queue has completed and queue.size() < this.maxInFlightRequestsPerConnection
         */
        void run(long now) {
            Cluster cluster = metadata.fetch();
            //Inspect the data currently in accumulator.batches and return readyNodes + nextReadyCheckDelayMs + unknownLeaderTopics
            //readyNodes: nodes satisfying both of the following
            //  1.there is data that can be sent, i.e. any one of these holds:
            //      a.there is a full batch: deque.size() > 1 (then at least the first batch is full), or the first batch in the deque is full
            //      b.the batch has waited longer than the linger time
            //      c.the BufferPool has a non-empty queue of threads waiting for memory to be released
            //      d.a flush is pending on the accumulator, i.e. the user called KafkaProducer.flush()
            //  2.for retried data, the retry backoff time has elapsed, so it may be sent again
            //nextReadyCheckDelayMs: when the data is not yet sendable, how long to wait before the next readiness check
            //unknownLeaderTopics: TopicPartitions in batches whose leader cannot be found in the cluster and whose deque is non-empty (there is data to send)
            RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    
            // if there are any partitions whose leaders are not known yet, force metadata update
            // add such topics to the metadata set and request a metadata update
            if (!result.unknownLeaderTopics.isEmpty()) {
                for (String topic : result.unknownLeaderTopics)
                    this.metadata.add(topic);
                this.metadata.requestUpdate();
            }
    
            // Remove from readyNodes any node to which data cannot be sent right now
            // notReadyTimeout bounds the maximum blocking time of this.client.poll(pollTimeout, now) below
            Iterator<Node> iter = result.readyNodes.iterator();
            long notReadyTimeout = Long.MAX_VALUE;
            while (iter.hasNext()) {
                Node node = iter.next();
                if (!this.client.ready(node, now)) {
                    iter.remove();
                    notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
                }
            }
    
            // create produce requests
            Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                             result.readyNodes,
                                                                             this.maxRequestSize,
                                                                             now);
            if (guaranteeMessageOrder) {
                // Mute all the partitions drained
                for (List<RecordBatch> batchList : batches.values()) {
                    for (RecordBatch batch : batchList)
                        this.accumulator.mutePartition(batch.topicPartition);
                }
            }
    
            //Abort RecordBatches that have timed out
            List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
            // update sensors
            for (RecordBatch expiredBatch : expiredBatches)
                this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    
            sensors.updateProduceRequestMetrics(batches);
            List<ClientRequest> requests = createProduceRequests(batches, now);
            // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
            // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
            // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
            // with sendable data that aren't ready to send since they would cause busy looping.
            long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
            if (result.readyNodes.size() > 0) {
                log.trace("Nodes with data ready to send: {}", result.readyNodes);
                log.trace("Created {} produce requests: {}", requests.size(), requests);
                pollTimeout = 0;
            }
            for (ClientRequest request : requests)
                client.send(request, now);
    
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            this.client.poll(pollTimeout, now);
        }
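
    Regarding point 1 of the notes above: in 0.10.1 guaranteeMessageOrder is true when max.in.flight.requests.per.connection is 1, so a configuration like the following (values are illustrative) keeps retries from reordering a partition:

        props.put("max.in.flight.requests.per.connection", "1"); // one in-flight request per node, so the Sender mutes/unmutes each partition
        props.put("retries", "3");                               // a failed batch is re-enqueued at the front of its deque
        props.put("retry.backoff.ms", "100");                    // it is resent only after lastAttemptMs + retryBackoffMs has passed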
    
    
