美文网首页
Kafka源码分析-Consumer(3)-ConsumerNe

Kafka源码分析-Consumer(3)-ConsumerNe

作者: 陈阳001 | 来源:发表于2018-11-12 13:50 被阅读0次

    一.概述KafkaConsumer类

    Kafka Consumer不是个线程安全的类。为了便于分析,我们认为消费者所有的操作都是在同一个线程中完成,所以不用考虑锁的问题。这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单,快速。还可以使用两个线程池实现“生产者-消费者”模式,解耦消息消费和消息处理的逻辑。其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。
    Kafka Consumer实现了Consumer接口,Consumer接口中定义了Kafka Consumer对外的api,核心接口有以下六类。

    • subscribe()方法:订阅指定的topic,并为消费者自动分配分区。
    • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥,后面讲述如何互斥的。
    • commit*()方法:提交消费者已经消费的位置。
    • seek*()方法:指定消费者起始消费位置。
    • poll()方法:负责从服务端获取消息。
    • pause(),resume()方法:暂停/继续consumer,暂停后poll()方法会返回空。
      下面来看下KafkaConsumer的具体实现,先了解下字段:


      image.png
    • clientId:Consumer的唯一标识。
    • coordinator: 控制Consuemr与服务端的GroupCoordinator之间的通信,可以理解为Consumer与服务端GroupCoordinator通信的门面。
    • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
    • fetcher:负责从服务端获取信息。
    • interceptors:ConsumerInterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。与ProducerInterceptors类似。
    • client:负责消费者和Kafka服务端的通信。
    • subscriptions:维护了消费者的消费状态。
    • metadata:记录了整个Kafka集群的元信息。
    • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,并不是真正意义上的锁,仅仅是为了检测是否有多线程并发操作KafkaConsumer而已。
      我们逐一分析KafkaConsumer依赖的组件的功能和实现。

    ConsumerNetworkClient:

    NetworkClient依赖KSeelctor,InFlightRequests,Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector类实现了发送请求的功能,并通过一系列handle*()方法处理请求响应,超时请求以及断线重连。ConsumerNetworkClient在NetworkClient的基础上进行了封装,提供了更高级的功能和更加易用的API。


    image.png
    • client:NetworkClient对象。
    • delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。PriorityQueue是一个非线程安全的,无界的,优先级队列,实现原理四小顶堆,底层基于数组实现,对应的线程安全实现是PriorityBlockingQueue。可以读下JDK相关的源码。后面会分析到,这个定时任务队列是心跳任务。
    • metadata:用于管理Kafka集群元数据。
    • unsent: 缓冲队列,Map<Node,List<ClientRequest>>类型,key是Node节点,value是发往此Node的ClientRequest集合。
    • unsentExpiryMs: ClientRequest在unsent中缓存的超时时长。
    • wakeup:由调用KafkaConsumer对象的的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
    • wakeupDisabledCount: KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。
      ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll()方法有多个重载,最终会调用poll(long timeout, long now, boolean executeDelayedTasks)重载,这三个参数的含义分别是:timeout表示poll()方法最长阻塞时间。now表示当前的时间戳;executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。

    下面介绍下流程:
    (1)调用ConsumerNetworkClient.trySend()方法循环处理unsent中缓存的请求。具体逻辑是:对每个Node节点,循环遍历所有对应的ClientRequest列表,每次循环都调用NetworkClient.ready()方法检测消费者与此节点之间的连接,已经发送请求的条件。若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队列中等待响应,然后放入KafkaChannel的send字段中等待发送,并将此消息从列表删除。代码如下:

     private boolean trySend(long now) {
            // send any requests that can be sent now
            boolean requestsSent = false;
            for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
                Node node = requestEntry.getKey();
                Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
                while (iterator.hasNext()) {//便利unsent集合
                    ClientRequest request = iterator.next();
                    //调用NetworkClient.ready()检测是否可以发送请求
                    if (client.ready(node, now)) {
                        //等待发送请求
                        client.send(request, now);
                        //从unsent集合中删除此请求。
                        iterator.remove();
                        requestsSent = true;
                    }
                }
            }
            return requestsSent;
        }
    

    (2)计算超时时间,由timeout和delayedTasks队列中最近要执行的定时任务的时间共同决定。下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时间,避免影响定时任务的执行。
    (3)调用NetworkClient.poll()方法,将KafkaChannel.send字段指定的消息发送出去。NetworkClient.poll()方法可能会更新Metadata,并且使用一系列handle*()方法处理请求响应,连接断开,超时等情况,并调用每个请求的回调函数。
    (4)调用ConsumerNetworkClient.maybeTriggerWakeup()方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求抛出WakeupException异常,中断当前ConsumerNetworkClient.poll()方法。

    private void maybeTriggerWakeup() {
            //通过wakeupDisabledCount检测是否在执行不可中断的方法,通过wakeup检测是否有中断请求
            if (wakeupDisabledCount == 0 && wakeup.get()) {
                wakeup.set(false);//重置中断标志
                throw new WakeupException();
            }
        }
    

    (5)调用checkDisconnects()方法检测连接状态。检测消费者和每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

        private void checkDisconnects(long now) {
            // any disconnects affecting requests that have already been transmitted will be handled
            // by NetworkClient, so we just need to check whether connections for any of the unsent
            // requests have been disconnected; if they have, then we complete the corresponding future
            // and set the disconnect flag in the ClientResponse
            Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
            while (iterator.hasNext()) {//遍历unsent集合中的每个 Node
                Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
                Node node = requestEntry.getKey();
                if (client.connectionFailed(node)) {//检测消费者与每个node的连接状态。
                    // Remove entry before invoking request callback to avoid callbacks handling
                    // coordinator failures traversing the unsent list again.
                    iterator.remove();//从unsent集合中删除此Node对应的全部 ClientRequest
                    for (ClientRequest request : requestEntry.getValue()) {
                        RequestFutureCompletionHandler handler =
                                (RequestFutureCompletionHandler) request.callback();
                        //调用ClientRequest的回调函数。
                        handler.onComplete(new ClientResponse(request, now, true, null));
                    }
                }
            }
        }
    

    断开连接的Node对应的已经发出去的请求,由NetworkClient进行异常处理,具体参照前面生产者的分析。
    (6)根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中执行的任务,则调用delayedTasks.poll()方法。
    (7)再次调用trySend()方法。在步骤3中调用了NetworkClient.poll()方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这次再次尝试调用trySend()方法。
    (8)调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求,他会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合删除。

    private void failExpiredRequests(long now) {
            // clear all expired unsent requests and fail their corresponding futures
            Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
            while (iterator.hasNext()) {//遍历unsent集合
                Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
                Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
                while (requestIterator.hasNext()) {
                    ClientRequest request = requestIterator.next();
                    if (request.createdTimeMs() < now - unsentExpiryMs) {
                        RequestFutureCompletionHandler handler =
                                (RequestFutureCompletionHandler) request.callback();
                        handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));//调用回调函数
                        requestIterator.remove();//删除ClientRequest
                    } else
                        break;
                }
                if (requestEntry.getValue().isEmpty())//队列已经为空,则从unsent集合中删除
                    iterator.remove();
            }
        }
    

    分析完poll()方法的详细步骤后,我们看看其实现代码:

    private void poll(long timeout, long now, boolean executeDelayedTasks) {
            // send all the requests we can send now
            // 步骤1:检测发送条件,并将请求放入KafkaChannel.send字段,待发送。
            
            trySend(now);
    
            // ensure we don't poll any longer than the deadline for
            // the next scheduled task 步骤2:计算超时时间
            timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
            //步骤3,4:调用NetworkClient.poll()方法,并检测是否有中断请求。
            clientPoll(timeout, now);
            now = time.milliseconds();//重置当前时间。
    
            // handle any disconnects by failing the active requests. note that disconnects must
            // be checked immediately following poll since any subsequent call to client.ready()
            // will reset the disconnect status
            checkDisconnects(now);//步骤5:根据连接状态,处理unsent中的请求
    
            // execute scheduled tasks
            if (executeDelayedTasks)//步骤6:处理定时任务
                delayedTasks.poll(now);
    
            // try again to send requests since buffer space may have been
            // cleared or a connect finished in the poll
            //步骤7:检测发送条件,重新设置KafkaChannel.send字段,并超时断线重连。
            trySend(now);
    
            // fail requests that couldn't be sent if they have expired
            failExpiredRequests(now);//步骤8:处理unsent中的超时任务
        }
    

    pollNoWakeup()方法是poll()方法的变体,表示执行不可中断的poll()方法。具体方法是:在执行poll()方法前,调用disableWakeups()方法将wakeupDisabledCount加一,然后调用poll()方法。这样即使别的线程请求中断,也不会响应。
    poll(future)是poll()方法的另一个实现阻塞发送请求的功能,代码如下:

    /**
         * Block indefinitely until the given request future has finished.
         * @param future The request future to await.
         * @throws WakeupException if {@link #wakeup()} is called from another thread
         */
        public void poll(RequestFuture<?> future) {
            while (!future.isDone())//循环检测Future,即请求的完成情况。
                poll(Long.MAX_VALUE);//请求未完成,则调用poll()方法
        }
    

    在ConsumerNetworkClient.send()方法中,会将待发送的请求封装成ClientRequest,然后保存在unsent集合中等待发送,代码如下:

    /**
         * Send a new request. Note that the request is not actually transmitted on the
         * network until one of the {@link #poll(long)} variants is invoked. At this
         * point the request will either be transmitted successfully or will fail.
         * Use the returned future to obtain the result of the send. Note that there is no
         * need to check for disconnects explicitly on the {@link ClientResponse} object;
         * instead, the future will be failed with a {@link DisconnectException}.
         * @param node The destination of the request
         * @param api The Kafka API call
         * @param request The request payload
         * @return A future which indicates the result of the send.
         */
        public RequestFuture<ClientResponse> send(Node node,
                                                  ApiKeys api,
                                                  AbstractRequest request) {
            long now = time.milliseconds();
            RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
            RequestHeader header = client.nextRequestHeader(api);
            RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
            //创建ClientRequest对象,并保存到unsent集合中
            put(node, new ClientRequest(now, true, send, future));
            return future;
        }
    

    这里用到了回调对象RequestFutureCompletionHandler。RequestFutureHandler接口只有onComplete()方法,此接口的另一个实现是前面介绍的KafkaProducer端的请求回调(Sender的一个内部类)。


    image.png

    ConsumerNetworkClient.RequestFutureCompletionHandler.onComplete()方法的代码如下:

    public void onComplete(ClientResponse response) {
                if (response.wasDisconnected()) {//因连接故障而产生的ClientResponse对象
                    ClientRequest request = response.request();
                    RequestSend send = request.request();
                    ApiKeys api = ApiKeys.forId(send.header().apiKey());
                    int correlation = send.header().correlationId();
                    log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                            api, request, correlation, send.destination());
                    //调用继承自父类 RequestFuture 的raise()方法
                    raise(DisconnectException.INSTANCE);
                } else {
                    complete(response);//调用继承自父类RequestFuture的complete()方法
                }
            }
    

    从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,还继承了RequestFuture类。RequestFuture是一个泛型类,核心字段如下:

    • isDone:表示当前的请求是否已经完成,无论正常完成还是出现异常,这个字段都是置为true。
    • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空表示出现异常,反之表示正常完成。
    • value:记录请求正常完成时收到的响应,与exception互斥。此字段非空表示正常完成,反之表示出现异常。
    • listeners: RequestFutureListener集合,用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常的情况。
      在RequestFuture中有两处设计模式的使用:一处是compose()方法,使用了配适器模式;另一处是chain()方法,使用了责任链的模式。compose()方法相关代码如下:
    /**
         *
         * 它将RequestFuture<T>适配成RequestFuture<S>
         * Convert from a request future of one type to another type
         * @param adapter The adapter which does the conversion
         * @param <S> The type of the future adapted to
         * @return The new future
         */
        public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
            final RequestFuture<S> adapted = new RequestFuture<S>();//添加配置器后返回的结果
            //在当前RequestFuture上添加监听器。
            addListener(new RequestFutureListener<T>() {
                @Override
                public void onSuccess(T value) {
                    adapter.onSuccess(value, adapted);
                }
    
                @Override
                public void onFailure(RuntimeException e) {
                    adapter.onFailure(e, adapted);
                }
            });
            return adapted;
        }
    

    下图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播过程。当调用RequestFuture<T>对象的complete()或onFailure方法,然后调用RequestFutureAdapter<T, S>的对应方法,最终调用RequestFuture<S>对象的对应方法。


    使用compose()方法适配.jpg

    RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture直接传递事件。代码如下:

    public void chain(final RequestFuture<T> future) {
            addListener(new RequestFutureListener<T>() {//添加监听器
                @Override
                public void onSuccess(T value) {
                    //通过监听器value传递给下一个RequestFuture对象
                    future.complete(value);
                }
    
                @Override
                public void onFailure(RuntimeException e) {
                    future.raise(e);//通过监听器将异常传递给下一个RequestFuture对象。
                }
            });
        }
    

    RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法。
    简单介绍ConsumerNetworkClient几个常用的功能:

    • awaitMetadataUpdate()方法:循环调用poll()方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完毕。
    • awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
    • put()方法:向unsent中添加请求。
    • schedule()方法:向delayedTasks队列中添加定时任务。
    • leastLoadedNode()方法:查找Kafka集群中负载最低的Node。

    相关文章

      网友评论

          本文标题:Kafka源码分析-Consumer(3)-ConsumerNe

          本文链接:https://www.haomeiwen.com/subject/rmwtfqtx.html