美文网首页程序员干货分享
dubbo 超时控制 HashedWheelTimer 源码分析

dubbo 超时控制 HashedWheelTimer 源码分析

作者: 小白菜aaa | 来源:发表于2020-10-30 15:20 被阅读0次

    你想成为rapstar吗?(bushi)
    你想成为超级列害的程序员吗?
    那就加入我们吧,加入我们吧,啦啦啦啦啦啦啦

    点击这里搅和秃头乐园欢迎你!!!!暗号:简书


    进入正文

    场景分析

    每隔60秒一次的心跳检测或发送请求超时等待响应;即:等待xxx时间就执行yyy任务;可以开启定时任务timer,周期性检测是否要执行任务来处理此类需求

    当存在有大量的心跳检测任务或超时控制任务, 如 每个超时任务都开启定时任务timer则会消耗大量资源; 只开启一个定时任务timer用来检测大量的任务则会遇到 遍历耗时长问题 (每次循环遍历所有任务检测时间)如:java.util.Timer

    更好的解决思路,开启一个定时任务time,拆分大量的定时任务,每次只检测部分任务,没有被检测到的任务在理论上保证不需要执行;即可以节省资源,也很大程度缓解遍历耗时长的问题 如: HashedWheelTimer

    HashedWheelTimer使用方式

    
    // 第一步编写TimeTask任务
        private class PrintTask implements TimerTask {
            @Override
            public void run(Timeout timeout) {
                final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                System.out.println("task :" + LocalDateTime.now().format(formatter));
            }
        }
    // 第二步创建 HashedWheelTime
        Timer timer =  new HashedWheelTimer();
    // 第三步 创建timeout (1s后执行PrintTask#run方法)
        timer.newTimeout(new PrintTask(), 1, TimeUnit.SECONDS);
    

    Hash轮

    HashedWheelTimer创建的时候创建worker线程(用于循环检测),走一圈有多少个bucket, 走一个bucket 隔需要多久时间 tickDuration

       public HashedWheelTimer(
                ThreadFactory threadFactory,
                long tickDuration, TimeUnit unit, int ticksPerWheel,
                long maxPendingTimeouts) {
     
            // Normalize ticksPerWheel to power of two and initialize the wheel.
            // wheel 是刚好大于等于ticksPerWheel中最小的2的N次方 N是整数
            wheel = createWheel(ticksPerWheel);
     
            // mask 用二进制表示 111111...; 用“与操作”计算wheel下标
            mask = wheel.length - 1;
     
            // 走一个bucket 需要花费的纳秒时长
            this.tickDuration = unit.toNanos(tickDuration);
     
            // 创建work线程
            workerThread = threadFactory.newThread(worker);
        }
    

    第一个timeout 创建时,启动worker线程,开始循环检测,并记录开始时间startTime;新创建timeout时不会计算加入到具体bucket中

     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
            // 调用start方法
            start();
            // deadline 计算方式中有减去worker线程启动的时间点startTime
            long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
            
            HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
            // 新创建的timeout添加到timeousts集合,此时并没有计算timeout在哪个bucket
            timeouts.add(timeout);
            return timeout;
        }
     
        public void start() {
            switch (WORKER_STATE_UPDATER.get(this)) {
                case WORKER_STATE_INIT:
                    // 初始化状态只启动一次
                    if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) 
                        workerThread.start();
                    }
                    break;
            }
     
            // Wait until the startTime is initialized by the worker.
            while (startTime == 0) {
                try {
                    // 调用newTimeout方法的线程等待 worker线程设置 startTime值
                    startTimeInitialized.await();
                } catch (InterruptedException ignore) {
                    // Ignore - it will be ready very soon.
                }
            }
        }
    

    worker作为HashedWheelTimer循环检测调度器,会将新创建的timeout添加到对应的bucket,并周期性检测每个bucket取出到期(deadline)的timeout 执行对应的TimeTask

      private final class Worker implements Runnable {
         public void run() { 
                // worker线程初始化 startTime
                startTime = System.nanoTime();
                // 通知第一次调用newTimeout方法的线程
                startTimeInitialized.countDown();
                // HashedWheelTimer 不停,循环不停
                do {
                    // 等待并返回走到下一个bucket时对应deadline
                    final long deadline = waitForNextTick();
                    if (deadline > 0) {
                        // &操作计算当前bucket[]下标
                        int idx = (int) (tick & mask);
                        // 将已取消任务从bucket中删除
                        processCancelledTasks();
                        HashedWheelBucket bucket = wheel[idx];
                        // 将新创建的timeout 放到对应的bucket中     
                        transferTimeoutsToBuckets();
                        // 将bucket中所有的timeout 判断超时处理
                        bucket.expireTimeouts(deadline);
                        tick++;
                    }
                } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
     
              // HashedWheelTimer停止后处理,未到期的timeout对象 处理逻辑 忽略。。。
            }
        。。。
      }
    

    waitForNextTick() 等待期间做什么呢?答案是sleep;那么在sleep期间如果有timeout到期了怎么办?答案是睡醒来在执行;那么tickDuration 就控制着时间的精准度,值越小精准度越高;如tickDuration=1秒,在1.1秒时添加了timeout,2秒时间点扫描发现没有超时,3s秒才扫描到timeout已超时;此时已经过了1.9秒;若tickDuration非常小worker线程则越繁忙

    dubbo中发送请求超时检测time中tickDuration=30毫秒

      private long waitForNextTick() {
                long deadline = tickDuration * (tick + 1);
     
                for (; ; ) {
                    final long currentTime = System.nanoTime() - startTime;
                    long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
                    // deadline 已过则返回
                    if (sleepTimeMs <= 0) {
                        if (currentTime == Long.MIN_VALUE) {
                            return -Long.MAX_VALUE;
                        } else {
                            return currentTime;
                        }
                    }
                    if (isWindows()) {
                        sleepTimeMs = sleepTimeMs / 10 * 10;
                    }
     
                    try {
                        Thread.sleep(sleepTimeMs);
                    } catch (InterruptedException ignored) {
                        。。。
                    }
                }
            }
    

    未到期但被取消的任务会放到 cancelledTimeouts集合中, worker线程周期性的执行do while方法会调用processCancelledTasks() 会从bucket中删除调对应的timeout;如请求1s超时检测任务,正常返回之后需要cancel调对应的timeout

    worker线程执行 transferTimeoutsToBuckets() 将新创建的timeout根据deadline计算出remainingRounds几轮与idx 并加入到对应的bucket

        private void transferTimeoutsToBuckets() {
                // 每次循环最多加10W个
                for (int i = 0; i < 100000; i++) {
                    HashedWheelTimeout timeout = timeouts.poll();
                    if (timeout == null) {
                        break;
                    }
                    if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                        continue;
                    }
     
                    long calculated = timeout.deadline / tickDuration;
                    timeout.remainingRounds = (calculated - tick) / wheel.length;
     
                    // tick 表示当前的的tick数,
                    // 如果timeout创建较早,可能导致已过期还没加入到bucket,则此时 calculated < tick
                    final long ticks = Math.max(calculated, tick);
                    int stopIndex = (int) (ticks & mask);
     
                    HashedWheelBucket bucket = wheel[stopIndex];
                    bucket.addTimeout(timeout);
                }
            }
    

    bucket.expireTimeouts(deadline); 将到期的bucket中所有的timeout判断并进行超时处理,到期的timeout进行超时处理,即调用TimeTask的run方法;此处风险:TimeTask.run() 方法是阻塞同步执行的,如果某task执行时间过久则会阻塞Worker线程 进一步拖慢超时检测流程

        void expireTimeouts(long deadline) {
                HashedWheelTimeout timeout = head;
                // 处理所有timeout
                while (timeout != null) {
                    HashedWheelTimeout next = timeout.next;
                    if (timeout.remainingRounds <= 0) {
                        next = remove(timeout);
                        if (timeout.deadline <= deadline) {
                            // 执行TimeTask.run方法
                            timeout.expire();
                        } else {
                           // error
                        }
                    } else if (timeout.isCancelled()) {
                        // 取消则直接删除
                        next = remove(timeout);
                    } else {
                        // 经过一轮后
                        timeout.remainingRounds--;
                    }
                    timeout = next;
                }
            }
    

    Dubbo中应用

    请求超时检测,发送请求时创建 timeout

    
    public class DefaultFuture extends CompletableFuture<Object> {
     
        // 创建HashedWheelTimer 对象
        public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
                new NamedThreadFactory("dubbo-future-timeout", true),
                30, // tickDuration=30毫秒 检测一次
                TimeUnit.MILLISECONDS);
     
        private Timeout timeoutCheckTask;
        
        // 发送请求时会创建DefaultFuture,并创建timeout对象
        public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
           。。。。
            // 创建TimeTask
            TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
            // 创建timeout对象
            future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
            return future;
        }
     
       private static class TimeoutCheckTask implements TimerTask {
     
            private final Long requestID;
     
            @Override
            public void run(Timeout timeout) {
                DefaultFuture future = DefaultFuture.getFuture(requestID);
                if (future.getExecutor() != null) {
                    future.getExecutor().execute(() -> notifyTimeout(future));
                } else {
                    notifyTimeout(future);
                }
            }
     
            private void notifyTimeout(DefaultFuture future) {
                // create exception response.
                Response timeoutResponse = new Response(future.getId());
                。。。
               // 超时则返回构造timeoutResponse 返回
                DefaultFuture.received(future.getChannel(), timeoutResponse, true);
            }
        }
    

    请求正常返回时cancel timeout

         public static void received(Channel channel, Response response, boolean timeout) {
            try {
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    Timeout t = future.timeoutCheckTask;
                    if (!timeout) {
                        // cancel timeout
                        t.cancel();
                    }
                    future.doReceived(response);
                } else {
                    // warn
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        } 
    

    发送同步请求,将超时检测交给HashedWheelTimer中worker线程 ,那么发送请求的工作线程在做什么呢?答案是在阻塞等待response

    
    public class AsyncToSyncInvoker<T> implements Invoker<T> {
     
       
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult = invoker.invoke(invocation);
     
            try {
                // 同步阻塞等待
                if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { 
                    asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
               // error
            } catch (Throwable e) {
                throw new RpcException(e.getMessage(), e);
            }
            return asyncResult;
        }
       。。。
    }
    

    结尾

    本文到这里就结束了,感谢看到最后的朋友,都看到最后了,点个赞再走啊,如有不对之处还请多多指正。
    关注我带你解锁更多精彩内容

    面试资料领取只需: 点击这里领取!!!!暗号:简书

    相关文章

      网友评论

        本文标题:dubbo 超时控制 HashedWheelTimer 源码分析

        本文链接:https://www.haomeiwen.com/subject/lwgkmktx.html