APACHE KAFKA 0.10.0 CODE REVIEW

Author: hnail | Published 2018-05-20 20:52

    Introduction

    1. How do the Producer and the Replicas mechanism implement At-Least-Once semantics on the produce side?
    2. How do the Consumer and the GroupCoordinator achieve load balancing and reliable offset management?
    3. How does Kafka 0.11.0.0 implement efficient Exactly-Once semantics? //TODO:
    

    Kafka 0.10.0 does not yet implement Exactly-Once[1]. Although the consume side can achieve Exactly-Once based on transactions, the produce flow will still duplicate log entries if the response is lost during the ack phase, so for this version the recommended way to get Exactly-Once is to make consumption idempotent (a minimal sketch follows).
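
    As a hedged illustration (not from the original post), a minimal Java sketch of consumer-side idempotence: the processor remembers the last offset it applied per partition and skips anything at or below it, so redelivered duplicates become no-ops. The applyToStore call is a hypothetical placeholder for the real side effect.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: deduplicate redelivered records by tracking the last applied
    // offset per partition. In production this map would live in the same
    // store (ideally the same transaction) as the processing results.
    public class IdempotentApplier {
        private final Map<TopicPartition, Long> lastApplied = new HashMap<>();

        public void apply(ConsumerRecord<String, String> record) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            Long last = lastApplied.get(tp);
            if (last != null && record.offset() <= last)
                return; // duplicate delivery: already applied, skip
            applyToStore(record);                 // hypothetical downstream write
            lastApplied.put(tp, record.offset());
        }

        private void applyToStore(ConsumerRecord<String, String> record) {
            // placeholder for the real side effect (DB upsert, etc.)
        }
    }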


    一. Index

    1.1 Produce Flow

    org.apache.kafka.clients.producer.KafkaProducer.KafkaProducer
        ->KafkaProducer.KafkaProducer()
            ->KafkaProducer.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            ->retryBackoffMs、METADATA_MAX_AGE_CONFIG、maxRequestSize、totalMemorySize、compressionType、maxBlockTimeMs、requestTimeoutMs、
            ->this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
            ->org.apache.kafka.common.network.ChannelBuilders.create
                ->PlaintextChannelBuilder.configure //security-related code is skipped for now
            ->NetworkClient client = new NetworkClient(new Selector(..channelBuilder..);
                ->client.metadataUpdater=new DefaultMetadataUpdater(metadata)
                ->this.selector = selector;
                ->this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
                ->this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs)
            ->KafkaProducer.sender = new Sender(client,this.metadata,..)
            ->KafkaProducer.ioThread = new KafkaThread(ioThreadName, this.sender, true);
                ->org.apache.kafka.clients.producer.internals.Sender.run //the Sender thread
                    ->while (running) 
                        ->org.apache.kafka.clients.producer.internals.Sender.run
                            ->RecordAccumulator.ReadyCheckResult result= Sender.accumulator.ready(cluster, now);//get the list of partitions with data ready to send
                            ->if (result.unknownLeadersExist) this.metadata.requestUpdate() // if there are any partitions whose leaders are not known yet, force metadata update
                            ->client.ready(node, now)//check that metadata is current, the connection is established, the channel is ready, and inFlightRequests is below the threshold; nodes that fail any check are removed
                            ->Map<Integer, List<RecordBatch>> batches = Sender.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize,now)
                            ->guaranteeMessageOrder?this.accumulator.mutePartition(batch.topicPartition)//with max.in.flight.requests.per.connection=1 the producer sends in strict order (see the config sketch after this trace)
                                ->this.accumulator.mutePartition(batch.topicPartition);//adds to RecordAccumulator.muted so the next drain loop skips TopicPartitions whose data is still in flight
                            ->List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); //for batches that timed out and will not be retried, reclaim the memory and call RecordBatch.done to run the callbacks
                                ->RecordBatch.maybeExpire
                                    ->RecordBatch.done(new TimeoutException())
                                        ->RecordBatch.thunks.callback.onCompletion(null, exception);
                                        ->produceFuture.done(topicPartition, baseOffset, exception)
                            ->List<ClientRequest> requests = Sender.createProduceRequests(batches, now)
                                ->Sender.produceRequest
                                    ->ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
                                    ->RequestSend send = new RequestSend(Integer.toString(destination),this.client.nextRequestHeader(ApiKeys.PRODUCE),request.toStruct());
                                    ->new ClientRequest(now, acks != 0, send, callback);
                            ->NetworkClient.send(request, now); //Queue up the given request for sending
                                ->NetworkClient.doSend
                                    ->NetworkClient.inFlightRequests.add(request)
                                    ->NetworkClient.Selector.send
                                        ->KafkaChannel.send= send
                                        ->this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
                            ->Sender.NetworkClient.poll(pollTimeout, now); //Do actual reads and writes to sockets
                                ->metadataUpdater.maybeUpdate(now) //check whether metadata needs refreshing; if so, enqueue a MetadataRequest
                                ->NetworkClient.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)) //Do I/O. Reads, writes, connection establishment
                                    ->Selector.pollSelectionKeys //NIO reads and writes
                                        ->↓[Leader Broker]↓KafkaRequestHandler.run
                                            ->kafka.server.KafkaApis.handle
                                                ->kafka.server.KafkaApis.handleProducerRequest[TODO://]
                                                    ->ReplicaManager.appendMessages
                                                        ->ReplicaManager.appendToLocalLog
                                                            ->Partition.appendMessagesToLeader
                                                                ->if (inSyncSize < minIsr && requiredAcks == -1) {throw new NotEnoughReplicasException()}//if acks requires the ISR but its size is below min.insync.replicas, throw and refuse the write (see the sketch after section 1.3)
                                                                -> val info = log.append(messages, assignOffsets = true)
                                                                    ->val segment = maybeRoll(validMessages.sizeInBytes)   // maybe roll the log if this segment is full
                                                                    ->segment.append(appendInfo.firstOffset, validMessages)// now append to the log
                                                                    ->updateLogEndOffset(appendInfo.lastOffset + 1) //update the log's LEO (log end offset)
                                                                ->replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
                                                                    ->DelayedFetch.tryComplete
                                                                        ->DelayedFetch.onComplete
                                                                            ->ReplicaManager.readFromLocalLog
                                                                                ->if (readOnlyCommitted) localReplica.highWatermark.messageOffset //if only committed messages may be read, bound the read by the hw
                                                                ->Partition.maybeIncrementLeaderHW(leaderReplica) //advance the hw so consumers can read messages newly replicated by the ISR
                                                        ->if ReplicaManager.delayedRequestRequired //if acks=all (or more data is still being appended, etc.), register a delayed request
                                                            ->delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
                                                                ->DelayedProduce.tryComplete
                                                                    ->Partition.checkEnoughReplicasReachOffset //with acks=all, check whether the whole ISR has caught up to this append's offset
                                                                        -> if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) && if (minIsr <= curInSyncReplicas.size) //if the hw has reached requiredOffset and the ISR size meets the minimum, the append counts as successful
                                                    ->sendResponseCallback
                                    ->Selector.addToCompletedReceives//Selector.stagedReceives->Selector.completedReceives
                                    ->Selector.maybeCloseOldestConnection//close idle connections
                                ->NetworkClient.handleCompletedSends //populate the List<ClientResponse> for completed sends
                                    ->Selector.completedSends
                                        ->InFlightRequests.requests.get(node)
                                ->NetworkClient.handleCompletedReceives //turn received responses into List<ClientResponse> entries
                                    ->Selector.completedReceives、NetworkClient.source
                                    ->NetworkClient.parseResponse
                                    ->NetworkClient.MetadataUpdater.maybeHandleCompletedReceive(req, now, body)
                                    ->responses.add(new ClientResponse(req, now, false, body))
                                ->NetworkClient.handleDisconnections //handle broken connections
                                    ->NetworkClient.processDisconnection
                                    ->metadataUpdater.requestUpdate()
                                ->NetworkClient.handleConnections //record connections established during the last poll
                                ->NetworkClient.handleTimedOutRequests //handle timed-out requests
                                    ->this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs)
                                    ->processDisconnection(responses, nodeId, now);
                                    ->metadataUpdater.requestUpdate();
                                ->response.request().callback().onComplete(response) //run the callbacks
                                    ->Sender.handleProduceResponse
            ->KafkaProducer.keySerializer、KafkaProducer.valueSerializer
            ->KafkaProducer.interceptors
        ->KafkaProducer.send(ProducerRecord<K, V> record, Callback callback)
            ->KafkaProducer.doSend()
                ->KafkaProducer.waitOnMetadata //set Metadata.needUpdate=true, wake the client, then wait for the update; NetworkClient.poll will call metadataUpdater.maybeUpdate(now) to send the MetadataRequest
                    ->KafkaProducer.metadata.add(topic)
                    ->int version = metadata.requestUpdate()
                ->serializedKey = keySerializer.serialize(record.topic(), record.key());serializedValue = valueSerializer.serialize(record.topic(), record.value());
                ->KafkaProducer.partition(record, serializedKey, serializedValue, metadata.fetch());
                ->RecordAccumulator.append //store the record into the RecordBatch of the matching TopicPartition in RecordAccumulator.batches
                    ->RecordAccumulator.tryAppend
                        ->RecordBatch.tryAppend->RecordBatch.records.append(offsetCounter++, timestamp, key, value);//add to MemoryRecords
                        ->RecordBatch.thunks.add(new Thunk(callback, future));//save the callback
                    ->//if no batch has room, initialize a new RecordBatch for this TopicPartition
                        ->BufferPool.allocate
                        ->MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                        ->RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                        ->RecordBatch.tryAppend(timestamp, key, value, callback, time.milliseconds())
                        ->dq.addLast(batch);
                ->if (result.batchIsFull || result.newBatchCreated) {this.sender.wakeup();} //Waking up the sender 
        ->KafkaProducer.close
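
    To tie the trace together, here is a minimal, hedged producer setup for the at-least-once behavior this section walks through: acks=all makes the leader wait for the ISR (the DelayedProduce / checkEnoughReplicasReachOffset path), retries resends when an ack response is lost, and max.in.flight.requests.per.connection=1 keeps sends ordered via mutePartition. The bootstrap address and topic name are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AtLeastOnceProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("acks", "all");  // leader waits for the full ISR before acking
            props.put("retries", "3"); // resend on lost/failed acks -> at-least-once (duplicates possible)
            props.put("max.in.flight.requests.per.connection", "1"); // guaranteeMessageOrder / mutePartition
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send() only appends to the RecordAccumulator; the Sender thread does the I/O
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                    if (exception != null)
                        exception.printStackTrace(); // e.g. TimeoutException from abortExpiredBatches
                });
            } // close() flushes outstanding batches
        }
    }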
    

    1.2 Consume Flow

    org.apache.kafka.clients.consumer.KafkaConsumer
        ->KafkaConsumer.KafkaConsumer()
            ->KafkaConsumer.metadata.update(Cluster.bootstrap(addresses), 0)
            ->NetworkClient netClient = new NetworkClient()
            ->KafkaConsumer.client = new ConsumerNetworkClient(netClient...)
            ->client = new ConsumerNetworkClient()
            ->this.subscriptions = new SubscriptionState(offsetResetStrategy);
            ->List<PartitionAssignor> assignors = config.getConfiguredInstances(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,PartitionAssignor.class);
            ->this.interceptors = interceptorList
            ->this.coordinator = new ConsumerCoordinator() //initialize coordination for this consumer group
                ->ConsumerCoordinator.metadata.requestUpdate();
                ->ConsumerCoordinator.addMetadataListener //metadata changes must trigger a partition reassignment
                    ->onMetadataUpdate->ConsumerCoordinator.subscriptions.needsPartitionAssignment=true //check if there are any changes to the metadata which should trigger a rebalance
                ->if (autoCommitEnabled)  //if auto-commit is enabled
                    -> this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
                        ->AutoCommitTask.run
                            ->ConsumerCoordinator.commitOffsetsAsync
                                ->ConsumerCoordinator.lookupCoordinator()//find this group's Coordinator
                                    ->AbstractCoordinator.sendGroupCoordinatorRequest //send the GroupCoordinatorRequest
                                        ->Node node = this.client.leastLoadedNode();//the node with the fewest in-flight requests
                                        ->GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
                                        ->AbstractCoordinator.client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) [Broker TODO://]
                                        ->AbstractCoordinator.handleGroupMetadataResponse
                                            ->AbstractCoordinator.coordinator=new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(), groupCoordinatorResponse.node().host(), groupCoordinatorResponse.node().port())//cache the coordinator; the id is offset from Integer.MAX_VALUE so the coordinator gets its own connection
                                ->ConsumerCoordinator.doCommitOffsetsAsync
                                    ->ConsumerCoordinator.sendOffsetCommitRequest(offsets);
                                        ->OffsetCommitRequest req = new OffsetCommitRequest()
                                        ->ConsumerNetworkClient.send
                                            ->ConsumerNetworkClient.unsent.put(node, nodeUnsent);
                                            ->OffsetCommitResponseHandler.handle //callback
                                                ->ConsumerCoordinator.subscriptions.committed(tp, offsetAndMetadata) // update the local cache only if the partition is still assigned
                                ->ConsumerNetworkClient.pollNoWakeup//trigger the actual I/O
        ->KafkaConsumer.subscribe
            ->KafkaConsumer.subscriptions.subscribe(topics, listener);
                ->SubscriptionState.subscribe
                    ->setSubscriptionType(SubscriptionType.AUTO_TOPICS)
                    ->SubscriptionState.changeSubscription
                        ->SubscriptionState.subscription.addAll(topicsToSubscribe);
                        ->SubscriptionState.groupSubscription.addAll(topicsToSubscribe)
                        ->SubscriptionState.needsPartitionAssignment = true;
                        ->org.apache.kafka.clients.Metadata.topics.addAll(topics)
            ->KafkaConsumer.metadata.setTopics(subscriptions.groupSubscription());
        ->KafkaConsumer.poll
            ->KafkaConsumer.pollOnce// this does any needed heart-beating, auto-commits, and offset updates.
                ->AbstractCoordinator.ensureActiveGroup //Ensure that the group is active (i.e. joined and synced)
                    ->AbstractCoordinator.ensureCoordinatorReady //the Coordinator can be located from the groupId
                        ->[Broker]kafka.server.KafkaApis.handleGroupCoordinatorRequest //determined by hashing the groupId to a __consumer_offsets partition and taking that partition's leader broker
                    ->ConsumerCoordinator.onJoinPrepare
                        ->ConsumerCoordinator.commitOffsetsSync //commit the offsets accumulated so far
                        ->ConsumerCoordinator.subscriptions.needsPartitionAssignment=true
                    ->AbstractCoordinator.sendJoinGroupRequest
                        ->JoinGroupRequest request = new JoinGroupRequest()
                        ->ConsumerNetworkClient.send(coordinator, ApiKeys.JOIN_GROUP, request)
                            ->↓[GroupCoordinator]↓kafka.server.KafkaApis.handleJoinGroupRequest // join group
                                ->GroupCoordinator.handleJoinGroup
                                    ->if (group == null) group = groupManager.addGroup(new GroupMetadata(groupId, protocolType)) //if there is no group info yet, create it
                                    ->GroupCoordinator.doJoinGroup
                                        ->Case => memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID //if the memberId is unknown, generate one and return it to the client
                                            ->GroupCoordinator.addMemberAndRebalance
                                                ->val memberId = clientId + "-" + group.generateMemberIdSuffix
                                                ->val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
                                                ->group.add(member.memberId, member)
                                                ->GroupCoordinator.maybePrepareRebalance
                                                    ->GroupCoordinator.prepareRebalance
                                                        -> group.transitionTo(PreparingRebalance)
                                                        ->joinPurgatory.tryCompleteElseWatch(new DelayedJoin(this, group, rebalanceTimeout), Seq(groupKey)) //register a DelayedJoin and wait for the known members to rejoin or for the timeout
                                                            ->group.notYetRejoinedMembers.isEmpty //all known members have rejoined
                                                                ->GroupCoordinator.onCompleteJoin
                                                                    ->group.initNextGeneration() //start a new generation and move to the AwaitingSync state
                                                                        ->generationId += 1
                                                                        ->transitionTo(AwaitingSync)
                                                                    ->member.awaitingJoinCallback(joinResult) //send the join response to every member
                                                                        ->val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,joinResult.memberId, joinResult.leaderId, members)
                                                                        ->requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
    
                                                                    ->GroupCoordinator.completeAndScheduleNextHeartbeatExpiration //refresh this member's heartbeat deadline
                                        ->ELSE => 
                                            ->GroupCoordinator.updateMemberAndRebalance 
                                                ->GroupCoordinator.maybePrepareRebalance //as above
    
                            ->↓[Consumer]↓JoinGroupResponseHandler.handle
                                ->AbstractCoordinator.this.memberId = joinResponse.memberId()
                                ->AbstractCoordinator.this.generation = joinResponse.generationId();
                                ->if (joinResponse.isLeader()) 
                                    ->[Leader]AbstractCoordinator.onJoinLeader
                                        ->Map<String, ByteBuffer> groupAssignment = ConsumerCoordinator.performAssignment
                                            ->ConsumerCoordinator.performAssignment //the assignment is computed on the consumer (leader) side
                                                ->PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
                                                ->......
                                        ->SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
                                        ->AbstractCoordinator.sendSyncGroupRequest
                                            ->client.send(coordinator, ApiKeys.SYNC_GROUP, request).compose(new SyncGroupResponseHandler())
                                                ->↓[GroupCoordinator]↓KafkaApis.handleSyncGroupRequest
                                                    ->GroupCoordinator.handleSyncGroup
                                                        ->GroupCoordinator.doSyncGroup
                                                            ->completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) //refresh the heartbeat
                                                            ->if (memberId == group.leaderId) 
                                                                ->GroupMetadataManager.store
                                                                    ->GroupMetadataManager.prepareStoreGroup
                                                                        ->val groupMetadataMessageSet = Map(groupMetadataPartition ->new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message)) //write the assignment into the matching __consumer_offsets partition
                                                                        ->replicaManager.appendMessages()
                                                                    ->DelayedStore(groupMetadataMessageSet, putCacheCallback)//wait for the replicas to confirm the write
                                                                    ->GroupCoordinator.setAndPropagateAssignment(group, assignment) //on successful write
                                                                        ->GroupCoordinator.propagateAssignment
                                                                            ->member.awaitingSyncCallback(member.assignment, errorCode) //send the assignment back to each consumer
                                                                                ->val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
                                                                                ->requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
                                                                            ->completeAndScheduleNextHeartbeatExpiration(group, member)//refresh the heartbeat
                                                                    ->group.transitionTo(Stable) //switch to the Stable state
                                                ->↓[Consumer]↓SyncGroupResponseHandler.handle
                                    ->[Follower]AbstractCoordinator.onJoinFollower
                                        ->AbstractCoordinator.sendSyncGroupRequest//send follower's sync group with an empty assignment
                    ->ConsumerCoordinator.onJoinComplete
                        ->Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
                        ->ConsumerCoordinator.SubscriptionState.assignFromSubscribed(assignment.partitions()); //更新SubscriptionState.assignment
                        ->ConsumerRebalanceListener.onPartitionsAssigned(assigned);
                    ->AbstractCoordinator.HeartbeatTask.reset
                        ->AbstractCoordinator.HeartbeatTask.run //the heartbeat-sending task
                            ->AbstractCoordinator.sendHeartbeatRequest
                                ->HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId)
                                ->client.send(coordinator, ApiKeys.HEARTBEAT, req)
                                ->HeartbeatCompletionHandler.handle
                ->KafkaConsumer.updateFetchPositions //send requests to the leader and refresh the offsets held in subscriptions
                    ->KafkaConsumer.Fetcher.resetOffsetsIfNeeded(partitions); //if the offset must be reset, ask the leader replica for its hw
                        ->Fetcher.resetOffset
                            ->Fetcher.sendListOffsetRequest //sent to the leader of the topicPartition
                                ->Node node = info.leader();
                                ->ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
                                ->client.send(node, ApiKeys.LIST_OFFSETS, request)
                                    ->↓[Leader]↓KafkaApis.handleOffsetRequest //served by the partition leader, per info.leader() above
                                        ->val hw = localReplica.highWatermark.messageOffset
                                ->Fetcher.handleListOffsetResponse
                                    ->Fetcher.subscriptions.seek(partition, offset);
                    ->coordinator.refreshCommittedOffsetsIfNeeded();//refresh the previously committed consume offsets
                        ->ConsumerCoordinator.fetchCommittedOffsets
                            ->ConsumerCoordinator.sendOffsetFetchRequest
                                ->OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));
                                ->client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
                                    ->↓[GroupCoordinator]↓KafkaApis.handleOffsetFetchRequest
                                        ->GroupCoordinator.handleFetchOffsets
                                            ->GroupMetadataManager.getOffsets
                                                ->GroupMetadataManager.offsetsCache.get(...)//read straight from the offsets cache
                                ->OffsetFetchResponseHandler.handle
                            ->client.poll(future);
                        ->ConsumerCoordinator.subscriptions.committed(tp, entry.getValue());//store into the ConsumerCoordinator's SubscriptionState
                    ->fetcher.updateFetchPositions(partitions);
                ->Fetcher.fetchedRecords//Return the fetched records, empty the record buffer and update the consumed position. Records already sitting in Fetcher.completedFetches are returned first
                ->Fetcher.sendFetches //if the buffered records are not enough, immediately send fetch requests for more data
                    ->Fetcher.createFetchRequests //build one FetchRequest per node
                    ->client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                    ->Fetcher.completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));//fetched data is stored into Fetcher.completedFetches
                ->client.poll(timeout, now);
                    ->↓[Leader]↓KafkaApis.handleFetchRequest //same read path as the follower fetch in section 1.3, except the hw bounds what may be read
                ->Fetcher.fetchedRecords()//as above
            ->KafkaConsumer.Fetcher.sendFetches//kick off the next round of fetch requests
            ->ConsumerNetworkClient.poll
        ->KafkaConsumer.commitSync
            ->ConsumerCoordinator.commitOffsetsSync
                ->ConsumerCoordinator.sendOffsetCommitRequest
                    ->OffsetCommitRequest req = new OffsetCommitRequest();
                    ->client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
                        ->↓[GroupCoordinator]↓KafkaApis.handleOffsetCommitRequest
                            ->GroupCoordinator.handleCommitOffsets //wait for the offsets to be written to the offsets topic, then update the cache
                                ->delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,offsetMetadata, responseCallback))
                                ->replicaManager.appendMessages()
                                ->GroupMetadataManager.offsetsCache.put(key, offsetAndMetadata) //update cache
                    ->OffsetCommitResponseHandler.handle
                        ->ConsumerCoordinator.subscriptions.committed(tp, offsetAndMetadata);// update the local cache
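
    Again as a hedged sketch (broker address, topic, and group names are placeholders), the client-side loop that drives the trace above: subscribe() fills the SubscriptionState, each poll() runs ensureActiveGroup, updateFetchPositions, and the Fetcher, and commitSync() sends the OffsetCommitRequest to the GroupCoordinator (the broker leading this group's __consumer_offsets partition).

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleGroupConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "demo-group");              // hashed to a __consumer_offsets partition
            props.put("enable.auto.commit", "false");         // commit manually via commitSync() instead
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic")); // SubscriptionState.subscribe
                while (true) {
                    // poll() joins/syncs the group, heartbeats, and fetches; see pollOnce above
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("%s-%d@%d: %s%n",
                                record.topic(), record.partition(), record.offset(), record.value());
                    consumer.commitSync(); // OffsetCommitRequest -> GroupCoordinator
                }
            }
        }
    }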
    

    1.3 Replicas Mechanism

    ↓[Controller]↓kafka.server.KafkaApis.createTopic
        ->kafka.admin.AdminUtils.createTopic
            ->val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)//look up rack information
            ->AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)//build each partition's AR (assigned replicas) list
            ->AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK //persist the AR lists to ZooKeeper [NewPartition]
                ->[NewPartition -> OnlinePartition]kafka.controller.PartitionStateMachine.handleStateChange//triggers the NewPartition -> OnlinePartition transition
                    ->PartitionStateMachine.initializeLeaderAndIsrForPartition(topicAndPartition) //initialize leader and isr path for new partition
                        ->val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),controller.epoch) //the first replica in the AR list becomes leader, the rest become ISR members
                        ->zkUtils.createPersistentPath(getTopicPartitionLeaderAndIsrPath() //write the PartitionLeaderAndIsr to ZooKeeper
                        ->controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch) //cache the leaderAndIsr info in the controllerContext
                        ->ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers //send LeaderAndIsrRequests to the brokers
                            ->controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
                                ->↓[Broker]↓kafka.server.KafkaApis.handleLeaderAndIsrRequest //the broker handles the LeaderAndIsrRequest
                                    ->kafka.server.ReplicaManager.becomeLeaderOrFollower
                                        ->partitionLeaderEpoch < stateInfo.leaderEpoch//compare epochs so that requests from a stale Controller are rejected
                                        ->↓[Leader]↓kafka.server.ReplicaManager.makeLeaders //the broker is leader iff the leaderId equals its own brokerId
                                            ->replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) //First stop fetchers for all the partitions
                                            ->kafka.cluster.Partition.makeLeader //reset the remote replicas' LEO and set the leader and ISR
                                                ->Partition.getOrCreateReplica
                                                    ->Case Local //the local (leader) replica
                                                        ->val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) //attach the local Log object; its presence distinguishes a local replica from a remote one
                                                        ->val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) //read the hw checkpoint file
                                                        ->val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset) //take this partition's hw from the checkpoint if present
                                                        ->val localReplica = new Replica(replicaId, this, time, offset, Some(log))
                                                    ->Case remote
                                                        ->val remoteReplica = new Replica(replicaId, this, time)  
                                                    ->Partition.assignedReplicaMap.putIfNotExists(replica.brokerId, replica)//register the replica
                                            ->isNewLeader
                                                ->leaderReplica.convertHWToLocalOffsetMetadata()
                                                    ->Replica.highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)//set the local replica's hw
                                                ->kafka.cluster.Replica.updateLogReadResult //update the remote replicas' LEO metadata
                                            ->Partition.maybeIncrementLeaderHW //take the minimum LEO across the ISR and advance the hw (see the sketch after this section)
                                            ->kafka.cluster.Partition.tryCompleteDelayedRequests[TODO: check the DelayedFetch and DelayedProduce conditions]
                                                ->replicaManager.tryCompleteDelayedFetch(requestKey)
                                                ->replicaManager.tryCompleteDelayedProduce(requestKey)
                                        ->↓[Follower]↓kafka.server.ReplicaManager.makeFollowers
                                            ->AbstractFetcherManager.removeFetcherForPartitions //first stop the previous fetchers
                                                ->AbstractFetcherThread.removePartitions
                                            ->kafka.log.LogManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)//truncate the log to the hw
                                            ->tryCompleteDelayedProduce(topicPartitionOperationKey)
                                            ->tryCompleteDelayedFetch(topicPartitionOperationKey)
                                            ->kafka.server.AbstractFetcherManager.addFetcherForPartitions //start the fetcher tasks
                                                ->createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
                                                    ->ReplicaFetcherManager.createFetcherThread
                                                        ->ReplicaFetcherThread.ReplicaFetcherThread()
                                                ->fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
                                                ->fetcherThread.start
                                                    ->AbstractFetcherThread.doWork //start fetching data from the leader
                                                        ->new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))
                                                        ->AbstractFetcherThread.processFetchRequest
                                                            ->ReplicaFetcherThread.fetch
                                                                ->ReplicaFetcherThread.sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
                                                                    ->↓[Leader]↓kafka.server.KafkaApis.handleFetchRequest //the leader serves the fetch request
                                                                        ->kafka.server.ReplicaManager.fetchMessages
                                                                            ->ReplicaManager.readFromLocalLog// read from local logs
                                                                            ->kafka.server.ReplicaManager.updateFollowerLogReadResults //if the fetch came from a follower
                                                                                ->Partition.updateReplicaLogReadResult //update the replica's LEO metadata
                                                                                    ->Partition.maybeExpandIsr //if the replica's LEO has caught up to the hw, add it to the ISR (Partition.inSyncReplicas) and update ZooKeeper
                                                                                        ->val newInSyncReplicas = inSyncReplicas + replica
                                                                                        ->Partition.updateIsr
                                                                                        ->Partition.maybeIncrementLeaderHW(leaderReplica)
                                                                                ->tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicAndPartition)) [TODO://]
                                                                            ->val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) //if the fetch cannot be satisfied yet, create a DelayedFetch and wait; otherwise return data immediately
                                                                            ->delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
                                                                        ->sendResponseCallback
                                                            ->ReplicaFetcherThread.processPartitionData  //process fetched data
                                                                ->replica.log.get.append(messageSet, assignOffsets = false)
                        ->ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers //Send UpdateMetadataRequest to the given brokers
                    ->PartitionStateMachine.partitionState.put(topicAndPartition, OnlinePartition)//the Controller records the partition's state
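
    The two ISR rules this trace keeps returning to can be stated compactly: an acks=-1 append is rejected while the ISR is smaller than min.insync.replicas, and the hw advances to the minimum LEO across the ISR. Below is a hedged, self-contained Java model of those two rules; the names are illustrative, not Kafka's.

    import java.util.Collection;

    // Illustrative model of two checks from this section:
    //  - Partition.appendMessagesToLeader rejects acks=-1 writes when the ISR
    //    is below min.insync.replicas (NotEnoughReplicasException in Kafka)
    //  - Partition.maybeIncrementLeaderHW advances the hw to the smallest LEO
    //    in the ISR, so consumers only see fully replicated messages
    public class IsrModel {
        public static boolean mayAppend(int isrSize, int minInsyncReplicas, short requiredAcks) {
            return !(requiredAcks == -1 && isrSize < minInsyncReplicas);
        }

        public static long newHighWatermark(Collection<Long> isrLogEndOffsets, long currentHw) {
            long minLeo = isrLogEndOffsets.stream().mapToLong(Long::longValue).min().orElse(currentHw);
            return Math.max(currentHw, minLeo); // the hw never moves backwards
        }

        public static void main(String[] args) {
            System.out.println(mayAppend(1, 2, (short) -1)); // false: write rejected
            // leader LEO=10, followers at 8 and 9 -> hw advances to 8
            System.out.println(newHighWatermark(java.util.Arrays.asList(10L, 8L, 9L), 5L));
        }
    }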
    

    二. Reference

    1. Exactly-Once support is provided from version 0.11.0 onward.

    三. Next

    APACHE HBASE 1.2.0 CODE REVIEW
