美文网首页
dubbo源码解析之处理请求(五)

dubbo源码解析之处理请求(五)

作者: binecy | 来源:发表于2019-03-19 20:44 被阅读0次

源码分析基于dubbo 2.7.1

NettyServerHandler继承了netty的ChannelDuplexHandler,这里只关注channelRead方法

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

异步处理

handler就是NettyServer, 前面说过NettyServer是一个装饰类,它会调用到AllChannelHandler.received

public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        ...
    }
}

就是给ExecutorService提交一个任务,异步处理请求。

看看ChannelEventRunnable.run,会根据state进行不同的处理。但实际操作都转发到handler继续处理。

解码

再看看DecodeHandler.received

    public void received(Channel channel, Object message) throws RemotingException {
        // 解码
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }
        // 调用下一个节点
        handler.received(channel, message);
    }

处理请求

HeaderExchangeHandler

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // 处理请求
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // 处理响应
            handleResponse(channel, (Response) message);
        } ...
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

twoWay表示需要响应的请求。这里调用handleRequest方法,将返回一个Response,最后通过channel将它发送给客户端。

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        ...
        Object msg = req.getData();
        try {
            // 处理请求
            CompletableFuture<Object> future = handler.reply(channel, msg);
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);  // 发送结果
                return;
            }
            future.whenComplete((result, t) -> {
                ...
            });
        } catch (Throwable e) {
            ...
        }
    }

处理完成后通过channel.send(res);发送结果给客户端,这个方法会调用NettyChannel

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // 发送结果
            ChannelFuture future = channel.write(message);
            if (sent) { // 是否发送超时
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();    // 抛出异常
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            ...
    }

可以看到,也是通过netty发送结果。

业务逻辑处理

handler.reply终于调用到DubboProtocol.requestHandler了

        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            Invocation inv = (Invocation) message;
            // 获取Invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            ...
            
            RpcContext rpcContext = RpcContext.getContext();
            rpcContext.setRemoteAddress(channel.getRemoteAddress());
            // 调用Invoker
            Result result = invoker.invoke(inv);

            if (result instanceof AsyncRpcResult) {
                return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
            } else {
                return CompletableFuture.completedFuture(result);
            }
        }

这里也是通过invoker调用,逐渐调用到我们的业务方法。

先看看getInvoker方法

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    ...

    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    return exporter.getInvoker();
}

很简单,从exporterMap中获取DubboExporter,再获取对应的Invoker。
这里可以回顾上篇文章说到的DubboProtocol.export方法,它用invoker构建了DubboExporter,缓存到exporterMap中。

invoker的创建

那么invoker在哪里创建的呢?

这时要回到dubbo server启动,那里说到ServiceConfig.doExportUrlsFor1Protocol

if (registryURLs != null && registryURLs.size() > 0) {
    for (URL registryURL : registryURLs) {
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

        Exporter<?> exporter = protocol.export(wrapperInvoker);
    }
}

是的,invoker就是在这里创建的。(注意,参数ref就是业务逻辑的实现类)

proxyFactory默认使用的是JavassistProxyFactory。

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // 生成动态代理类
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

再回顾一下RegistryProtocol.doLocalExport方法,为Invoker添加了包装类,生成了InvokerDelegete。所以invoker装饰层次为ProtocolFilterWrapper&Invoker ---> RegistryProtocol&InvokerDelegete ---> DelegateProviderMetaDataInvoker ---> JavassistProxyFactory&AbstractProxyInvoker。
(ProtocolFilterWrapper下面会说到)

JavassistProxyFactory中创建的invoke,会通过Wrapper调用到实际的业务方法。

Wrapper.getWrapper也是动态生成代理类,JavassistProxyFactory正是通过它调用业务方法。
Wrapper会拼凑代码字符串,再通过javassist生成代理类。
过程比较繁琐,直接看生成的代理类吧。

原接口

public interface HelloService {
    String hello(String user) ;
    String hello2(String user) ;
}

生成的Wrapper,关键的方法在invokeMethod

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{ 
    com.dubbo.start.service.HelloService w; 
    try{
        w = ((com.dubbo.start.service.HelloService)$1); 
    } catch(Throwable e){ 
        throw new IllegalArgumentException(e); 
    } 
    try{ 
        if( "hello".equals( $2 )  &&  $3.length == 1 ) {
            return ($w)w.hello((java.lang.String)$4[0]); 
        } 
        if( "hello2".equals( $2 )  &&  $3.length == 1 ) {  
            return ($w)w.hello2((java.lang.String)$4[0]);
        } 
    } catch(Throwable e) {     
        throw new java.lang.reflect.InvocationTargetException(e);  
    }
}

$1, $2, $3在javassist中表示方法参数。
可以看到,通过方法名和参数数调用对应的逻辑方法。
所以dubbo暴露的接口就尽量不要方法重构了

Filter

前面也说过了,DubboProtocol处理前,会经过包装类ProtocolListenerWrapper,ProtocolFilterWrapper。

ProtocolFilterWrapper.export 会调用buildInvokerChain,为每一个filter创建一个invoker,

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (filters.size() > 0) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                ...
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }
            };
        }
    }
    return last;
}

dubbo通过url中的service.filter参数查找filter扩展类,可以dubbo:service标签上添加filter属性,用逗号分隔多个filter。

dubbo默认有如下Filter

  • EchoFilter
  • ClassLoaderFilter
  • GenericFilter
  • ContextFilter
  • TraceFilter
  • TimeoutFilter
  • MonitorFilter
  • ExceptionFilter
  • ValidationFilter
  • TpsLimitFilter

可以通过Filter实现限流:
Dubbo之限流TpsLimitFilter源码分析

相关文章

网友评论

      本文标题:dubbo源码解析之处理请求(五)

      本文链接:https://www.haomeiwen.com/subject/bkogmqtx.html