美文网首页
dubbo源码解析之处理请求(五)

dubbo源码解析之处理请求(五)

作者: binecy | 来源:发表于2019-03-19 20:44 被阅读0次

    源码分析基于dubbo 2.7.1

    NettyServerHandler继承了netty的ChannelDuplexHandler,这里只关注channelRead方法

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                handler.received(channel, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    

    异步处理

    handler就是NettyServer, 前面说过NettyServer是一个装饰类,它会调用到AllChannelHandler.received

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            ...
        }
    }
    

    就是给ExecutorService提交一个任务,异步处理请求。

    看看ChannelEventRunnable.run,会根据state进行不同的处理。但实际操作都转发到handler继续处理。

    解码

    再看看DecodeHandler.received

        public void received(Channel channel, Object message) throws RemotingException {
            // 解码
            if (message instanceof Decodeable) {
                decode(message);
            }
    
            if (message instanceof Request) {
                decode(((Request) message).getData());
            }
    
            if (message instanceof Response) {
                decode(((Response) message).getResult());
            }
            // 调用下一个节点
            handler.received(channel, message);
        }
    

    处理请求

    HeaderExchangeHandler

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // 处理请求
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                // 处理响应
                handleResponse(channel, (Response) message);
            } ...
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    

    twoWay表示需要响应的请求。这里调用handleRequest方法,将返回一个Response,最后通过channel将它发送给客户端。

        void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
            Response res = new Response(req.getId(), req.getVersion());
            ...
            Object msg = req.getData();
            try {
                // 处理请求
                CompletableFuture<Object> future = handler.reply(channel, msg);
                if (future.isDone()) {
                    res.setStatus(Response.OK);
                    res.setResult(future.get());
                    channel.send(res);  // 发送结果
                    return;
                }
                future.whenComplete((result, t) -> {
                    ...
                });
            } catch (Throwable e) {
                ...
            }
        }
    

    处理完成后通过channel.send(res);发送结果给客户端,这个方法会调用NettyChannel

        public void send(Object message, boolean sent) throws RemotingException {
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
                // 发送结果
                ChannelFuture future = channel.write(message);
                if (sent) { // 是否发送超时
                    timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.getCause();    // 抛出异常
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            }
    
            if (!success) {
                ...
        }
    

    可以看到,也是通过netty发送结果。

    业务逻辑处理

    handler.reply终于调用到DubboProtocol.requestHandler了

            public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
                Invocation inv = (Invocation) message;
                // 获取Invoker
                Invoker<?> invoker = getInvoker(channel, inv);
                ...
                
                RpcContext rpcContext = RpcContext.getContext();
                rpcContext.setRemoteAddress(channel.getRemoteAddress());
                // 调用Invoker
                Result result = invoker.invoke(inv);
    
                if (result instanceof AsyncRpcResult) {
                    return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
                } else {
                    return CompletableFuture.completedFuture(result);
                }
            }
    

    这里也是通过invoker调用,逐渐调用到我们的业务方法。

    先看看getInvoker方法

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
    
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    
        return exporter.getInvoker();
    }
    

    很简单,从exporterMap中获取DubboExporter,再获取对应的Invoker。
    这里可以回顾上篇文章说到的DubboProtocol.export方法,它用invoker构建了DubboExporter,缓存到exporterMap中。

    invoker的创建

    那么invoker在哪里创建的呢?

    这时要回到dubbo server启动,那里说到ServiceConfig.doExportUrlsFor1Protocol

    if (registryURLs != null && registryURLs.size() > 0) {
        for (URL registryURL : registryURLs) {
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
            Exporter<?> exporter = protocol.export(wrapperInvoker);
        }
    }
    

    是的,invoker就是在这里创建的。(注意,参数ref就是业务逻辑的实现类)

    proxyFactory默认使用的是JavassistProxyFactory。

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // 生成动态代理类
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    

    再回顾一下RegistryProtocol.doLocalExport方法,为Invoker添加了包装类,生成了InvokerDelegete。所以invoker装饰层次为ProtocolFilterWrapper&Invoker ---> RegistryProtocol&InvokerDelegete ---> DelegateProviderMetaDataInvoker ---> JavassistProxyFactory&AbstractProxyInvoker。
    (ProtocolFilterWrapper下面会说到)

    JavassistProxyFactory中创建的invoke,会通过Wrapper调用到实际的业务方法。

    Wrapper.getWrapper也是动态生成代理类,JavassistProxyFactory正是通过它调用业务方法。
    Wrapper会拼凑代码字符串,再通过javassist生成代理类。
    过程比较繁琐,直接看生成的代理类吧。

    原接口

    public interface HelloService {
        String hello(String user) ;
        String hello2(String user) ;
    }
    

    生成的Wrapper,关键的方法在invokeMethod

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{ 
        com.dubbo.start.service.HelloService w; 
        try{
            w = ((com.dubbo.start.service.HelloService)$1); 
        } catch(Throwable e){ 
            throw new IllegalArgumentException(e); 
        } 
        try{ 
            if( "hello".equals( $2 )  &&  $3.length == 1 ) {
                return ($w)w.hello((java.lang.String)$4[0]); 
            } 
            if( "hello2".equals( $2 )  &&  $3.length == 1 ) {  
                return ($w)w.hello2((java.lang.String)$4[0]);
            } 
        } catch(Throwable e) {     
            throw new java.lang.reflect.InvocationTargetException(e);  
        }
    }
    

    $1, $2, $3在javassist中表示方法参数。
    可以看到,通过方法名和参数数调用对应的逻辑方法。
    所以dubbo暴露的接口就尽量不要方法重构了

    Filter

    前面也说过了,DubboProtocol处理前,会经过包装类ProtocolListenerWrapper,ProtocolFilterWrapper。

    ProtocolFilterWrapper.export 会调用buildInvokerChain,为每一个filter创建一个invoker,

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    ...
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                };
            }
        }
        return last;
    }
    

    dubbo通过url中的service.filter参数查找filter扩展类,可以dubbo:service标签上添加filter属性,用逗号分隔多个filter。

    dubbo默认有如下Filter

    • EchoFilter
    • ClassLoaderFilter
    • GenericFilter
    • ContextFilter
    • TraceFilter
    • TimeoutFilter
    • MonitorFilter
    • ExceptionFilter
    • ValidationFilter
    • TpsLimitFilter

    可以通过Filter实现限流:
    Dubbo之限流TpsLimitFilter源码分析

    相关文章

      网友评论

          本文标题:dubbo源码解析之处理请求(五)

          本文链接:https://www.haomeiwen.com/subject/bkogmqtx.html