Kafka Source Code Analysis - Consumer (11) - Summary (1)

Author: 陈阳001 | Published 2019-01-05 13:34

    Let's now review the overall architecture of KafkaConsumer. KafkaConsumer relies on SubscriptionState to manage the set of subscribed topics and the consumption state of each partition, and it interacts with the server-side GroupCoordinator through ConsumerCoordinator to carry out the Rebalance operation and to request the most recently committed offsets. Fetcher is responsible for pulling messages from Kafka and parsing them; it also takes part in resetting the position and provides access to the cluster metadata of the specified topics.
    All of the requests above are buffered and sent through ConsumerNetworkClient, which also maintains a delayed task queue used to run the HeartbeatTask and the AutoCommit task. When NetworkClient receives the responses to these requests, it invokes the corresponding callbacks, which ultimately hand processing off to the matching *Handler and to the listeners registered on the RequestFuture.
    The overall architecture of KafkaConsumer is shown in the figure below:


    [Figure: KafkaConsumer overall architecture]
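
    The two delayed tasks mentioned above are driven by consumer configuration. Below is a minimal setup sketch: the property keys are the standard Kafka consumer configs, while the broker address, the group id, and the ConsumerSetup class itself are placeholders of mine, not part of the source being analyzed.

        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        public class ConsumerSetup {
            public static KafkaConsumer<String, String> newConsumer() {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id
                // HeartbeatTask: a heartbeat is scheduled every heartbeat.interval.ms; if the
                // GroupCoordinator sees none within session.timeout.ms, a rebalance is triggered
                props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
                props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
                // AutoCommitTask: commits offsets every auto.commit.interval.ms when enabled
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
                props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                return new KafkaConsumer<>(props);
            }
        }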

    Next, let's analyze the remaining KafkaConsumer code.
    KafkaConsumer is not a thread-safe class. To guard against consistency problems caused by concurrent multi-threaded operations, KafkaConsumer provides a mechanism for detecting concurrent access, built around the methods acquire() and release(). Their code is as follows:

        /**
         * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
         * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
         * supported).
         * @throws IllegalStateException if the consumer has been closed
         * @throws ConcurrentModificationException if another thread already has the lock
         */
        private void acquire() {
            ensureNotClosed();
            long threadId = Thread.currentThread().getId();
            // record the current thread id; ownership is taken via a CAS operation
            if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
                throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
            // track the reentrancy count
            refcount.incrementAndGet();
        }

        /**
         * Release the light lock protecting the consumer from multi-threaded access.
         */
        private void release() {
            if (refcount.decrementAndGet() == 0)
                // clear the owner thread id
                currentThread.set(NO_CURRENT_THREAD);
        }


    These two methods are not a lock implementation; they merely detect concurrent operations from multiple threads. The CAS operation on currentThread is what guarantees visibility of the owner thread id across threads.
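    To see the detection at work, here is a minimal, hypothetical sketch (it reuses the ConsumerSetup helper from the configuration sketch above; the topic name and timeouts are placeholders). One thread sits inside poll() and therefore owns the "light lock"; a second thread calling poll() fails the CAS on currentThread and receives the ConcurrentModificationException:

        import java.util.Collections;
        import java.util.ConcurrentModificationException;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.errors.WakeupException;

        public class ConcurrentAccessDemo {
            public static void main(String[] args) throws InterruptedException {
                KafkaConsumer<String, String> consumer = ConsumerSetup.newConsumer();
                consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
                Thread poller = new Thread(() -> {
                    try {
                        consumer.poll(10_000); // holds the "light lock" for up to 10s
                    } catch (WakeupException e) {
                        // woken up by the main thread below
                    }
                });
                poller.start();
                Thread.sleep(1_000); // give the poller time to enter poll()
                try {
                    consumer.poll(100); // compareAndSet(NO_CURRENT_THREAD, threadId) fails here
                } catch (ConcurrentModificationException e) {
                    System.out.println("detected concurrent access: " + e.getMessage());
                }
                consumer.wakeup(); // wakeup() is the one method that may be called from another thread
                poller.join();
                consumer.close();
            }
        }
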
    Now let's walk through the whole message-consumption flow of the KafkaConsumer.poll() method. The relevant code is as follows:

        public ConsumerRecords<K, V> poll(long timeout) {
            acquire(); // detect multi-threaded access
            try {
                if (timeout < 0)
                    throw new IllegalArgumentException("Timeout must not be negative");

                // poll for new data until the timeout expires
                long start = time.milliseconds();
                long remaining = timeout;
                do {
                    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); // the core method
                    if (!records.isEmpty()) { // check whether any records were returned
                        // before returning the fetched records, we can send off the next round of fetches
                        // and avoid block waiting for their responses to enable pipelining while the user
                        // is handling the fetched records.
                        //
                        // NOTE: since the consumed position has already been updated, we must not allow
                        // wakeups or any other errors to be triggered prior to returning the fetched records.
                        // Additionally, pollNoWakeup does not allow automatic commits to get triggered.
                        //
                        // In other words, for efficiency the next FetchRequest is sent before this batch of
                        // records is processed, so that processing the records runs in parallel with the
                        // request and its response traveling over the network and being handled on the
                        // broker, which reduces the time spent waiting on network I/O.
                        fetcher.sendFetches(); // create and buffer FetchRequests

                        // ConsumerNetworkClient.pollNoWakeup() sends the FetchRequests; it does not
                        // block, cannot be interrupted by wakeup(), and does not run delayed tasks
                        client.pollNoWakeup();

                        if (this.interceptors == null)
                            return new ConsumerRecords<>(records);
                        else
                            // pass the records through the ConsumerInterceptors
                            return this.interceptors.onConsume(new ConsumerRecords<>(records));
                    }

                    long elapsed = time.milliseconds() - start; // compute the elapsed time
                    remaining = timeout - elapsed;              // and the remaining timeout
                } while (remaining > 0);

                return ConsumerRecords.empty();
            } finally {
                release();
            }
        }


    After the messages have been consumed, the client still has to commit the offsets: manual synchronous commits use commitSync(), manual asynchronous commits use commitAsync(), and automatic offset commits are handled by the AutoCommitTask delayed task.
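    As a sketch of the two manual commit styles (the topic name and record handling are placeholders, and the consumer again comes from the hypothetical ConsumerSetup helper, here assumed to have enable.auto.commit switched off):

        import java.util.Collections;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class CommitDemo {
            public static void main(String[] args) {
                // assumes enable.auto.commit=false, otherwise the AutoCommitTask also commits
                KafkaConsumer<String, String> consumer = ConsumerSetup.newConsumer();
                consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    // synchronous commit: blocks and retries until it succeeds or fails fatally
                    consumer.commitSync();
                    // asynchronous alternative: returns at once, the outcome reaches the callback:
                    // consumer.commitAsync((offsets, exception) -> {
                    //     if (exception != null)
                    //         System.err.println("commit failed for " + offsets + ": " + exception);
                    // });
                }
            }
        }
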
    In pollOnce(), the consumer first interacts with the GroupCoordinator through ConsumerCoordinator to complete the Rebalance operation, then obtains the most recently committed offsets from the GroupCoordinator (or resets the position), and only after that uses Fetcher to pull messages from Kafka. The pollOnce() method is as follows:

        /**
         * Do one round of polling. In addition to checking for new data, this does any needed
         * heart-beating, auto-commits, and offset updates.
         * @param timeout The maximum time to block in the underlying poll
         * @return The fetched records (may be empty)
         */
        private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
            // ensure we have partitions assigned if we expect to,
            // i.e. in the AUTO_TOPICS or AUTO_PATTERN subscription modes
            if (subscriptions.partitionsAutoAssigned())
                coordinator.ensurePartitionAssignment(); // complete the rebalance operation

            // fetch positions if we have partitions we're subscribed to that we
            // don't know the offset for; this restores the corresponding TopicPartitionState
            // in SubscriptionState, mainly its committed and position fields
            if (!subscriptions.hasAllFetchPositions())
                updateFetchPositions(this.subscriptions.missingFetchPositions());

            long now = time.milliseconds();

            // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
            client.executeDelayedTasks(now); // runs HeartbeatTask and AutoCommitTask

            // init any new fetches (won't resend pending fetches):
            // first try to parse records from the completedFetches buffer
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

            // if data is available already, e.g. from a previous network client poll() call to commit,
            // then just return it immediately
            if (!records.isEmpty())
                return records;

            fetcher.sendFetches();           // create and buffer FetchRequests
            client.poll(timeout, now);       // send the FetchRequests
            return fetcher.fetchedRecords(); // parse records from the completedFetches buffer
        }
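
    When a partition has no committed offset, updateFetchPositions() falls back to the reset strategy configured through auto.offset.reset; positions can also be moved explicitly with the seek API. A minimal sketch (topic, partition number, and offset are placeholders; ConsumerSetup is the hypothetical helper from earlier):

        import java.util.Collections;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.TopicPartition;

        public class SeekDemo {
            public static void main(String[] args) {
                KafkaConsumer<String, String> consumer = ConsumerSetup.newConsumer();
                TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder partition
                consumer.assign(Collections.singletonList(tp));          // USER_ASSIGNED mode, no rebalance
                consumer.seekToBeginning(Collections.singletonList(tp)); // reset the position to the earliest offset
                consumer.seek(tp, 42L);                                  // or jump straight to an absolute offset
                System.out.println("position = " + consumer.position(tp)); // resolves the position if still unknown
                consumer.close();
            }
        }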
