Kafka Producer (Part 2)

Author: 扎瓦叔叔 | Published 2019-03-04 17:03

Original work takes effort; please credit the source when reposting.

Continuing from the previous post, let's keep looking at the Kafka producer.

RecordAccumulator.ready()

As the previous post showed, at the end of doSend() the Sender thread is woken up when the right conditions are met. In the Sender thread's run method, RecordAccumulator.ready() is called to obtain the set of cluster nodes that are ready to have messages sent to them:

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // the set of nodes that are ready to have data sent to them
    Set<Node> readyNodes = new HashSet<Node>();
    // the delay until ready() needs to be called again
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    boolean unknownLeadersExist = false;
    // whether any thread is blocked waiting for the BufferPool to free space
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();
        // find the node hosting the leader replica of this partition
        Node leader = cluster.leaderFor(part);
        if (leader == null) {
            // no leader replica known: flag it and wait for a metadata update
            unknownLeadersExist = true;
        } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
            synchronized (deque) {
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    // the deque holds more than one batch, or the first batch is already full
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    // has the batch waited long enough (linger or retry backoff elapsed)?
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // closed: the Sender thread is shutting down; flushInProgress(): a thread is waiting for a flush to complete
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        // track the shortest delay until the next ready() check
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}

Calling ready() yields a ReadyCheckResult, which bundles the readyNodes set, the flag signalling that metadata needs updating, and the delay before ready() should be called again.
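ReadyCheckResult itself is just a small value holder; a minimal sketch matching the constructor call above (field names as in the 0.10-era source):

public final static class ReadyCheckResult {
    public final Set<Node> readyNodes;          // nodes with data ready to send
    public final long nextReadyCheckDelayMs;    // delay before the next ready() check
    public final boolean unknownLeadersExist;   // true if some partition has no known leader

    public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
        this.readyNodes = readyNodes;
        this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
        this.unknownLeadersExist = unknownLeadersExist;
    }
}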

RecordAccumulator.drain()

drain() takes the previously computed readyNodes and pulls out the batches to send, grouped into a Map<Integer, List<RecordBatch>> keyed by broker node id, since the Sender only cares about which node the data goes to.
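A simplified sketch of that grouping (the drainIndex round-robin, retry-backoff, and mute checks of the real 0.10-era method are elided):

// sketch: pull at most maxSize bytes of closed batches per ready node, keyed by node id
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<RecordBatch> ready = new ArrayList<>();
        // walk the partitions whose leader lives on this node
        for (PartitionInfo part : cluster.partitionsForNode(node.id())) {
            Deque<RecordBatch> deque = this.batches.get(new TopicPartition(part.topic(), part.partition()));
            if (deque == null)
                continue;
            synchronized (deque) {
                RecordBatch first = deque.peekFirst();
                if (first == null)
                    continue;
                // stop once this request would exceed maxSize (unless it would otherwise be empty)
                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty())
                    break;
                RecordBatch batch = deque.pollFirst();
                batch.records.close(); // freeze the batch: no further appends
                size += batch.records.sizeInBytes();
                ready.add(batch);
            }
        }
        batches.put(node.id(), ready);
    }
    return batches;
}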

Sender

KafkaProducer.send() mainly just places the message into the RecordAccumulator cache; the network I/O is done by the Sender thread.

Sender implements the Runnable interface; its run() loop delegates each iteration to the run(long now) overload below:

void run(long now) {
    // fetch the cluster metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}

1. Note that the readyNodes produced by accumulator.ready() are filtered once more through NetworkClient.ready(), which confirms each node is connected and able to accept requests.
2. After that confirmation, accumulator.drain() is called; with guaranteeMessageOrder enabled, every partition being drained is muted so it accepts no further send requests.
3. Batches that have timed out in the accumulator are handled by abortExpiredBatches(), which walks all batches, checks each with maybeExpire(), and calls batch.done() on expired ones, firing the thunk callbacks and releasing resources.
4. createProduceRequests() wraps the batches into ClientRequests (sketched below), and NetworkClient.send() writes each ClientRequest into the KafkaChannel's send field.
5. Finally, NetworkClient.poll() puts the requests on the wire and processes the responses and any user-defined callbacks.
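For step 4, a sketch of how one per-node request is built, close to the 0.10-era Sender.produceRequest():

// sketch: wrap one node's batches into a ClientRequest whose completion handler is handleProduceResponse
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records.buffer());
        recordsByPartition.put(tp, batch);
    }
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    RequestSend send = new RequestSend(Integer.toString(destination),
                                       this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                       request.toStruct());
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            // completes every batch in this request (shown later as handleProduceResponse)
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // a response is expected unless acks == 0
    return new ClientRequest(now, acks != 0, send, callback);
}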

Selector

Kafka's Selector manages the connect, read, and write operations on multiple network connections from a single thread, using NIO in asynchronous, non-blocking mode for the network I/O.

@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);

    // create the SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // configure it as non-blocking
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    // enable TCP keepalive for a long-lived connection
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // register the socketChannel with the nioSelector, subscribing to OP_CONNECT
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    // create the KafkaChannel
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    // attach the KafkaChannel to the selection key
    key.attach(channel);
    this.channels.put(id, channel);

    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}

poll()

The real network I/O is carried out by Selector.poll():

@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    // clear out the results of the previous poll
    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    // nioSelector.select(): wait for I/O events to occur
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle I/O on the ready keys
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }

    // copy stagedReceives into the completedReceives collection
    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    // close the connection that has been idle the longest, if it has been idle too long
    maybeCloseOldestConnection();
}

The core method here is plain to see: pollSelectionKeys(), which handles the connect, read, and write events.

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        // look up the KafkaChannel attached to this key
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        lruConnections.put(channel.id(), currentTimeNanos);

        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                // finishConnect checks the CONNECT event; once it completes, OP_CONNECT is dropped and OP_READ subscribed
                if (channel.finishConnect()) {
                    // add to the set of connected channels
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    // connection not finished yet; skip this key
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                // each complete NetworkReceive that read() returns is added to stagedReceives; read() returns null once no complete receive can yet be assembled
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                // write() pushes the send field out; it returns null while the send is incomplete, or the finished Send, which is added to completedSends
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}

The core of all this is the read, write, and send methods, which all live in KafkaChannel.
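A condensed sketch of those paths, based on the 0.10-era KafkaChannel (the SSL/SASL details are hidden behind transportLayer):

// setSend: accept exactly one in-flight Send and start watching OP_WRITE
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// write: push bytes out; return the Send only once it has been fully written
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}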

InFlightRequests

Its main job is to cache the ClientRequests that have been sent out but have not yet received a response.
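A minimal sketch of its shape, per the 0.10-era source (the remaining accessors are elided):

// per-node deques of requests awaiting responses
final class InFlightRequests {
    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();

    public InFlightRequests(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    public void add(ClientRequest request) {
        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
        if (reqs == null) {
            reqs = new ArrayDeque<>();
            this.requests.put(request.request().destination(), reqs);
        }
        reqs.addFirst(request); // newest at the head, oldest (next to complete) at the tail
    }

    // room for another request: the head request is fully sent and the deque is under the limit
    public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed()
                && queue.size() < this.maxInFlightRequestsPerConnection);
    }
}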

DefaultMetadataUpdater

The first step of sending in poll() is to check whether the metadata needs updating:

@Override
public long maybeUpdate(long now) {
    // should we update our metadata?
    // checks Metadata.needUpdate and computes the time until the next cluster metadata update
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);

    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
            waitForMetadataFetch);

    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }

    return metadataTimeout;
}

First, the Metadata.needUpdate field is checked; by default the metadata is refreshed every 5 minutes. When an update is due, a MetadataRequest is sent to the least loaded node, found via leastLoadedNode(), which judges load by the length of each node's InFlightRequests queue (when no node is available at all, lastNoNodeAvailableMs records the current timestamp).
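A simplified sketch of that selection (randOffset spreads the starting point so the same node is not always favored):

// pick the node with the fewest in-flight requests, scanning from a random offset
public Node leastLoadedNode(long now) {
    List<Node> nodes = this.metadataUpdater.fetchNodes();
    int inflight = Integer.MAX_VALUE;
    Node found = null;
    int offset = this.randOffset.nextInt(nodes.size());
    for (int i = 0; i < nodes.size(); i++) {
        int idx = (offset + i) % nodes.size();
        Node node = nodes.get(idx);
        int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
        if (currInflight == 0 && this.connectionStates.isConnected(node.idString()))
            // an established connection with nothing in flight: use it immediately
            return node;
        else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
            // otherwise remember the least-loaded node that is not backing off
            inflight = currInflight;
            found = node;
        }
    }
    return found; // may be null; the caller then stamps lastNoNodeAvailableMs
}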

When a response is received, MetadataUpdater.maybeHandleCompletedReceive() checks its validity; a valid response is handled by handleResponse(), which creates a new Cluster and replaces the existing Cluster data.

NetworkClient

A general-purpose network client implementation, used by producers sending messages, consumers fetching messages, and for communication between brokers.

ready()

The conditions ready() checks:
1. the node's host and port are valid;
2. metadata is neither mid-update nor due for an immediate update, and the node can accept requests;
3. the node's state tracked by connectionStates allows a connection.
When the node is not yet ready but a connection is permitted, initiateConnect() is called to open a new one; see the sketch below.
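
A sketch of that flow, close to the 0.10-era NetworkClient.ready():

@Override
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);

    // connected, metadata quiet, and room left in inFlightRequests
    if (isReady(node, now))
        return true;

    // not ready yet, but eligible to connect: kick off a new connection
    if (connectionStates.canConnect(node.idString(), now))
        initiateConnect(node, now);

    return false;
}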

send()

All NetworkClient.send() does is add the request to inFlightRequests and set the request on the KafkaChannel.send field.
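A sketch of that path: NetworkClient.send() validates the request and then routes through doSend() (the 0.10-era shape, checks elided):

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    // remember the request until its response (or a disconnect) arrives
    this.inFlightRequests.add(request);
    // Selector.send() looks up the target KafkaChannel and calls setSend() on it
    selector.send(request.request());
}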

poll()

poll() calls Selector.poll() to perform the network I/O, then processes the resulting data and queues with a series of handle methods.
Every resulting ClientResponse is collected into a responses list; the list is then traversed and the callback recorded on each ClientRequest is invoked, with failed requests re-sent where possible.
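The overall sequence, sketched from the 0.10-era NetworkClient.poll() (exception handling elided); on the produce path, the callback it ends up invoking is handleProduceResponse, shown below:

public List<ClientResponse> poll(long timeout, long now) {
    // step one: see whether metadata needs refreshing (DefaultMetadataUpdater, above)
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));

    // collect everything that happened into the responses list
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke the callback recorded on each request
    for (ClientResponse response : responses) {
        if (response.request().hasCallback())
            response.request().callback().onComplete(response);
    }
    return responses;
}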

/**
 * Handle a produce response
 */
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    int correlationId = response.request().request().header().correlationId();
    if (response.wasDisconnected()) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
                                                                                              .request()
                                                                                              .destination());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now);
    } else {
        log.trace("Received produce response from node {} with correlation id {}",
                  response.request().request().destination(),
                  correlationId);
        // if we have a response, parse it
        if (response.hasResponse()) {
            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                Errors error = Errors.forCode(partResp.errorCode);
                RecordBatch batch = batches.get(tp);
                completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
            }
            this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
            this.sensors.recordThrottleTime(response.request().request().destination(),
                                            produceResponse.getThrottleTime());
        } else {
            // this is the acks = 0 case, just complete all requests
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NONE, -1L, Record.NO_TIMESTAMP, correlationId, now);
        }
    }
}

For ClientResponses produced by a disconnection, canRetry() decides whether a retry is possible; if not, the callback of every message in the RecordBatch is invoked, the batch is marked as exceptionally completed, and its ByteBuffer is released.
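The retry test itself is tiny; a sketch of the 0.10-era Sender.canRetry():

// a batch may be retried while it has attempts left and the error is retriable
private boolean canRetry(RecordBatch batch, Errors error) {
    return batch.attempts < this.retries && error.exception() instanceof RetriableException;
}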

That about covers the producer client.

A few points worth noting

1. getConfiguredInstance(), used during KafkaProducer initialization, instantiates the class specified in the originals config through a uniform reflection mechanism:

public <T> T getConfiguredInstance(String key, Class<T> t) {
    Class<?> c = getClass(key);
    if (c == null)
        return null;
    Object o = Utils.newInstance(c);
    if (!t.isInstance(o))
        throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
    if (o instanceof Configurable)
        ((Configurable) o).configure(this.originals);
    return t.cast(o);
}

2. In the Compressor class, the GZIP stream is constructed directly with new, while the Snappy stream is created via reflection:

private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
    @Override
    public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
        return Class.forName("org.xerial.snappy.SnappyOutputStream")
            .getConstructor(OutputStream.class, Integer.TYPE);
    }
});

This is mainly to avoid pulling in the extra Snappy dependency when Snappy compression is not used.

3. In RecordAccumulator.append(), tryAppend() is first attempted under a synchronized lock and then retried after buffer allocation, rather than wrapping the whole operation in one synchronized block.
This is because the BufferPool may block while allocating a new ByteBuffer: if a large message 1 is waiting for space and the lock were held the whole time, a small message 2 would be forced to wait along with it, blocking unnecessarily. Splitting the synchronized blocks avoids this; see the sketch below.
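
A condensed sketch of that pattern in RecordAccumulator.append() (bookkeeping such as the appendsInProgress counter is elided):

// lock, try, unlock, allocate, re-lock, retry
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
    // fast path: the last batch in the deque may still have room
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null)
        return appendResult;
}

// slow path: allocate OUTSIDE the lock, because this may block until the BufferPool frees space
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
    // another thread may have created a usable batch while we were blocked allocating
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null) {
        free.deallocate(buffer); // someone beat us to it; give the buffer back
        return appendResult;
    }
    // otherwise build a fresh batch from the new buffer
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
    FutureRecordMetadata future = batch.tryAppend(timestamp, key, value, callback, time.milliseconds());
    dq.addLast(batch);
    incomplete.add(batch);
    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}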
