美文网首页
Dubbo存在内存泄漏

Dubbo存在内存泄漏

作者: 书唐瑞 | 来源:发表于2022-01-01 18:04 被阅读0次

    [前提]

    本文阐述的内容基于Dubbo 2.7.3版本

    [正文]

    在这里插入图片描述

    如上图, 在上一篇com.alibaba.fastjson存在内存泄漏 文章中, 我们解释了线程的threadLocals中存在内存泄漏的情况, 仔细观察上图, 还有一个地方, 在threadLocalMap属性的内部也存在662.19KB的内存, 这个地方也不正常.

    查看threadLocalMap的内部属性

    在这里插入图片描述

    在org.apache.dubbo.rpc.FutureContext内部的result属性`持有`651.93KB内存, 这个result的内容实际是Dubbo接口的返回值. 而这个FutureContext对象也是在调用外部Dubbo接口的时候创建的.

    我们简单分析下一个业务线程调用Dubbo接口的过程.

    当业务线程需要调用外部Dubbo接口的时候, 会创建一个DefaultFuture, 每个DefaultFuture对象都会有唯一的一个Id与之对应, 并把这个关系放到Map中

    在这里插入图片描述
    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // 存储 id <-> DefaultFuture关系
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
    

    由于接口调用都会有超时, 那么如何实现这个超时机制呢?

    将一个超时任务放入到时间轮上.

    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture
    public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        // 超时检查
        timeoutCheck(future);
        return future;
    }
    
    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#timeoutCheck
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }
    

    在我之前的 Netty中的时间轮(v3.10.7) 文章中介绍了时间轮, 通过时间轮的方式, 检测任务是否超时到期了.

    接下来就是将DefaultFuture等信息组装成一个FutureContext放入到线程的ThreadLocalMap中.


    在这里插入图片描述
    // org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                asyncRpcResult.subscribeTo(responseFuture);
                // 
                FutureContext.getContext().setCompatibleFuture(responseFuture);
                return asyncRpcResult;
            }
        }
    }
    
    // org.apache.dubbo.rpc.FutureContext
    public class FutureContext {
        private static InternalThreadLocal<FutureContext> futureTL = new InternalThreadLocal<FutureContext>() {
            @Override
            protected FutureContext initialValue() {
                return new FutureContext();
            }
        };
    
        public static FutureContext getContext() {
            return futureTL.get();
        }
    }
    

    综上, id和DefaultFuture存到Map中, 设置好定时任务, DefaultFuture放到线程ThreadLocalMap中之后, 线程就可以被阻塞了

    在这里插入图片描述

    线程调用get方法一直被阻塞.

    当Dubbo的提供方返回数据之后, Dubbo调用方的线程就可以处理响应了.

    在这里插入图片描述

    如上图, Dubbo调用方的Dubbo线程开始处理响应.

    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean)
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            // 从Map中移除id <-> DefaultFuture的关系
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // 将时间轮上的超时任务取消掉
                    t.cancel();
                }
                // 
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    

    首先从Map中移除id <-> DefaultFuture的关系, 将时间轮上的超时任务取消掉.

    接下来就是把响应数据设置到DefaultFuture上, 并唤醒之前阻塞的线程.

    // org.apache.dubbo.remoting.exchange.support.DefaultFuture#doReceived
    private void doReceived(Response res) {
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        }
    }
    
    // java.util.concurrent.CompletableFuture#complete
    public boolean complete(T value) {
        // 将结果设置到DefaultFuture
        boolean triggered = completeValue(value);
        // 唤醒阻塞线程
        postComplete();
        return triggered;
    }
    

    使用到了异步编程

    被唤醒的阻塞线程就可以从DefaultFuture中拿到已设置好的数据,继续后续的业务处理.

    在这里插入图片描述

    如上图, 消费者的线程ThreadLocalMap中的FutureContext中的result值却一直留在线程的ThreadLocalMap中了,并不会被释放掉, 造成了内存泄漏.

    相关文章

      网友评论

          本文标题:Dubbo存在内存泄漏

          本文链接:https://www.haomeiwen.com/subject/dotaqrtx.html