一.概述KafkaConsumer类
Kafka Consumer不是个线程安全的类。为了便于分析,我们认为消费者所有的操作都是在同一个线程中完成,所以不用考虑锁的问题。这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单,快速。还可以使用两个线程池实现“生产者-消费者”模式,解耦消息消费和消息处理的逻辑。其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。
Kafka Consumer实现了Consumer接口,Consumer接口中定义了Kafka Consumer对外的api,核心接口有以下六类。
- subscribe()方法:订阅指定的topic,并为消费者自动分配分区。
- assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥,后面讲述如何互斥的。
- commit*()方法:提交消费者已经消费的位置。
- seek*()方法:指定消费者起始消费位置。
- poll()方法:负责从服务端获取消息。
-
pause(),resume()方法:暂停/继续consumer,暂停后poll()方法会返回空。
下面来看下KafkaConsumer的具体实现,先了解下字段:
image.png - clientId:Consumer的唯一标识。
- coordinator: 控制Consuemr与服务端的GroupCoordinator之间的通信,可以理解为Consumer与服务端GroupCoordinator通信的门面。
- keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
- fetcher:负责从服务端获取信息。
- interceptors:ConsumerInterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。与ProducerInterceptors类似。
- client:负责消费者和Kafka服务端的通信。
- subscriptions:维护了消费者的消费状态。
- metadata:记录了整个Kafka集群的元信息。
- currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,并不是真正意义上的锁,仅仅是为了检测是否有多线程并发操作KafkaConsumer而已。
我们逐一分析KafkaConsumer依赖的组件的功能和实现。
ConsumerNetworkClient:
NetworkClient依赖KSeelctor,InFlightRequests,Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector类实现了发送请求的功能,并通过一系列handle*()方法处理请求响应,超时请求以及断线重连。ConsumerNetworkClient在NetworkClient的基础上进行了封装,提供了更高级的功能和更加易用的API。
image.png
- client:NetworkClient对象。
- delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。PriorityQueue是一个非线程安全的,无界的,优先级队列,实现原理四小顶堆,底层基于数组实现,对应的线程安全实现是PriorityBlockingQueue。可以读下JDK相关的源码。后面会分析到,这个定时任务队列是心跳任务。
- metadata:用于管理Kafka集群元数据。
- unsent: 缓冲队列,Map<Node,List<ClientRequest>>类型,key是Node节点,value是发往此Node的ClientRequest集合。
- unsentExpiryMs: ClientRequest在unsent中缓存的超时时长。
- wakeup:由调用KafkaConsumer对象的的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
- wakeupDisabledCount: KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。
ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll()方法有多个重载,最终会调用poll(long timeout, long now, boolean executeDelayedTasks)重载,这三个参数的含义分别是:timeout表示poll()方法最长阻塞时间。now表示当前的时间戳;executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。
下面介绍下流程:
(1)调用ConsumerNetworkClient.trySend()方法循环处理unsent中缓存的请求。具体逻辑是:对每个Node节点,循环遍历所有对应的ClientRequest列表,每次循环都调用NetworkClient.ready()方法检测消费者与此节点之间的连接,已经发送请求的条件。若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队列中等待响应,然后放入KafkaChannel的send字段中等待发送,并将此消息从列表删除。代码如下:
private boolean trySend(long now) {
// send any requests that can be sent now
boolean requestsSent = false;
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
Node node = requestEntry.getKey();
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
while (iterator.hasNext()) {//便利unsent集合
ClientRequest request = iterator.next();
//调用NetworkClient.ready()检测是否可以发送请求
if (client.ready(node, now)) {
//等待发送请求
client.send(request, now);
//从unsent集合中删除此请求。
iterator.remove();
requestsSent = true;
}
}
}
return requestsSent;
}
(2)计算超时时间,由timeout和delayedTasks队列中最近要执行的定时任务的时间共同决定。下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时间,避免影响定时任务的执行。
(3)调用NetworkClient.poll()方法,将KafkaChannel.send字段指定的消息发送出去。NetworkClient.poll()方法可能会更新Metadata,并且使用一系列handle*()方法处理请求响应,连接断开,超时等情况,并调用每个请求的回调函数。
(4)调用ConsumerNetworkClient.maybeTriggerWakeup()方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求抛出WakeupException异常,中断当前ConsumerNetworkClient.poll()方法。
private void maybeTriggerWakeup() {
//通过wakeupDisabledCount检测是否在执行不可中断的方法,通过wakeup检测是否有中断请求
if (wakeupDisabledCount == 0 && wakeup.get()) {
wakeup.set(false);//重置中断标志
throw new WakeupException();
}
}
(5)调用checkDisconnects()方法检测连接状态。检测消费者和每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。
private void checkDisconnects(long now) {
// any disconnects affecting requests that have already been transmitted will be handled
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future
// and set the disconnect flag in the ClientResponse
Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
while (iterator.hasNext()) {//遍历unsent集合中的每个 Node
Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
Node node = requestEntry.getKey();
if (client.connectionFailed(node)) {//检测消费者与每个node的连接状态。
// Remove entry before invoking request callback to avoid callbacks handling
// coordinator failures traversing the unsent list again.
iterator.remove();//从unsent集合中删除此Node对应的全部 ClientRequest
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
//调用ClientRequest的回调函数。
handler.onComplete(new ClientResponse(request, now, true, null));
}
}
}
}
断开连接的Node对应的已经发出去的请求,由NetworkClient进行异常处理,具体参照前面生产者的分析。
(6)根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中执行的任务,则调用delayedTasks.poll()方法。
(7)再次调用trySend()方法。在步骤3中调用了NetworkClient.poll()方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这次再次尝试调用trySend()方法。
(8)调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求,他会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合删除。
private void failExpiredRequests(long now) {
// clear all expired unsent requests and fail their corresponding futures
Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
while (iterator.hasNext()) {//遍历unsent集合
Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
while (requestIterator.hasNext()) {
ClientRequest request = requestIterator.next();
if (request.createdTimeMs() < now - unsentExpiryMs) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));//调用回调函数
requestIterator.remove();//删除ClientRequest
} else
break;
}
if (requestEntry.getValue().isEmpty())//队列已经为空,则从unsent集合中删除
iterator.remove();
}
}
分析完poll()方法的详细步骤后,我们看看其实现代码:
private void poll(long timeout, long now, boolean executeDelayedTasks) {
// send all the requests we can send now
// 步骤1:检测发送条件,并将请求放入KafkaChannel.send字段,待发送。
trySend(now);
// ensure we don't poll any longer than the deadline for
// the next scheduled task 步骤2:计算超时时间
timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
//步骤3,4:调用NetworkClient.poll()方法,并检测是否有中断请求。
clientPoll(timeout, now);
now = time.milliseconds();//重置当前时间。
// handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(now);//步骤5:根据连接状态,处理unsent中的请求
// execute scheduled tasks
if (executeDelayedTasks)//步骤6:处理定时任务
delayedTasks.poll(now);
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
//步骤7:检测发送条件,重新设置KafkaChannel.send字段,并超时断线重连。
trySend(now);
// fail requests that couldn't be sent if they have expired
failExpiredRequests(now);//步骤8:处理unsent中的超时任务
}
pollNoWakeup()方法是poll()方法的变体,表示执行不可中断的poll()方法。具体方法是:在执行poll()方法前,调用disableWakeups()方法将wakeupDisabledCount加一,然后调用poll()方法。这样即使别的线程请求中断,也不会响应。
poll(future)是poll()方法的另一个实现阻塞发送请求的功能,代码如下:
/**
* Block indefinitely until the given request future has finished.
* @param future The request future to await.
* @throws WakeupException if {@link #wakeup()} is called from another thread
*/
public void poll(RequestFuture<?> future) {
while (!future.isDone())//循环检测Future,即请求的完成情况。
poll(Long.MAX_VALUE);//请求未完成,则调用poll()方法
}
在ConsumerNetworkClient.send()方法中,会将待发送的请求封装成ClientRequest,然后保存在unsent集合中等待发送,代码如下:
/**
* Send a new request. Note that the request is not actually transmitted on the
* network until one of the {@link #poll(long)} variants is invoked. At this
* point the request will either be transmitted successfully or will fail.
* Use the returned future to obtain the result of the send. Note that there is no
* need to check for disconnects explicitly on the {@link ClientResponse} object;
* instead, the future will be failed with a {@link DisconnectException}.
* @param node The destination of the request
* @param api The Kafka API call
* @param request The request payload
* @return A future which indicates the result of the send.
*/
public RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
AbstractRequest request) {
long now = time.milliseconds();
RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api);
RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
//创建ClientRequest对象,并保存到unsent集合中
put(node, new ClientRequest(now, true, send, future));
return future;
}
这里用到了回调对象RequestFutureCompletionHandler。RequestFutureHandler接口只有onComplete()方法,此接口的另一个实现是前面介绍的KafkaProducer端的请求回调(Sender的一个内部类)。
image.png
ConsumerNetworkClient.RequestFutureCompletionHandler.onComplete()方法的代码如下:
public void onComplete(ClientResponse response) {
if (response.wasDisconnected()) {//因连接故障而产生的ClientResponse对象
ClientRequest request = response.request();
RequestSend send = request.request();
ApiKeys api = ApiKeys.forId(send.header().apiKey());
int correlation = send.header().correlationId();
log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
api, request, correlation, send.destination());
//调用继承自父类 RequestFuture 的raise()方法
raise(DisconnectException.INSTANCE);
} else {
complete(response);//调用继承自父类RequestFuture的complete()方法
}
}
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,还继承了RequestFuture类。RequestFuture是一个泛型类,核心字段如下:
- isDone:表示当前的请求是否已经完成,无论正常完成还是出现异常,这个字段都是置为true。
- exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空表示出现异常,反之表示正常完成。
- value:记录请求正常完成时收到的响应,与exception互斥。此字段非空表示正常完成,反之表示出现异常。
- listeners: RequestFutureListener集合,用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常的情况。
在RequestFuture中有两处设计模式的使用:一处是compose()方法,使用了配适器模式;另一处是chain()方法,使用了责任链的模式。compose()方法相关代码如下:
/**
*
* 它将RequestFuture<T>适配成RequestFuture<S>
* Convert from a request future of one type to another type
* @param adapter The adapter which does the conversion
* @param <S> The type of the future adapted to
* @return The new future
*/
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
final RequestFuture<S> adapted = new RequestFuture<S>();//添加配置器后返回的结果
//在当前RequestFuture上添加监听器。
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted;
}
下图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播过程。当调用RequestFuture<T>对象的complete()或onFailure方法,然后调用RequestFutureAdapter<T, S>的对应方法,最终调用RequestFuture<S>对象的对应方法。
使用compose()方法适配.jpg
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture直接传递事件。代码如下:
public void chain(final RequestFuture<T> future) {
addListener(new RequestFutureListener<T>() {//添加监听器
@Override
public void onSuccess(T value) {
//通过监听器value传递给下一个RequestFuture对象
future.complete(value);
}
@Override
public void onFailure(RuntimeException e) {
future.raise(e);//通过监听器将异常传递给下一个RequestFuture对象。
}
});
}
RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法。
简单介绍ConsumerNetworkClient几个常用的功能:
- awaitMetadataUpdate()方法:循环调用poll()方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完毕。
- awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
- put()方法:向unsent中添加请求。
- schedule()方法:向delayedTasks队列中添加定时任务。
- leastLoadedNode()方法:查找Kafka集群中负载最低的Node。
网友评论