美文网首页
dubbo服务端接收消息处理

dubbo服务端接收消息处理

作者: 剑道_7ffc | 来源:发表于2020-06-14 16:28 被阅读0次

    NettyHandler#messageReceived

    从NettyServer的doOpen方法可知，服务端接收消息的入口处理器是NettyHandler。

    /**
     * Callback for a message arriving on a Netty channel.
     *
     * <p>Obtains the {@code NettyChannel} for the underlying channel via
     * {@code NettyChannel.getOrAddChannel}, forwards the event's message to the
     * wrapped {@code handler}, and finally calls
     * {@code NettyChannel.removeChannelIfDisconnected} for the same channel.
     *
     * @param ctx Netty handler context; supplies the underlying channel
     * @param e   the message event whose payload is forwarded
     * @throws Exception whatever the wrapped handler throws
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final NettyChannel wrapped = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            final Object payload = e.getMessage();
            handler.received(wrapped, payload);
        } finally {
            // Always executed, including when the handler throws.
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
    

    handler的对象是NettyServer(MultiMessageHandler(HeartbeatHandler(AllDispatcher(DecodeHandler(HeaderExchangeHandler(ExchangeHandlerAdapter))))))，所以消息经各层handler依次传递，先经过HeaderExchangeHandler的received方法，最终进入DubboProtocol$ExchangeHandlerAdapter的received方法

    HeaderExchangeHandler.received

    /**
     * Dispatches an inbound message according to its runtime type.
     *
     * <ul>
     *   <li>{@code Request}: events go to {@code handlerEvent}; two-way requests go to
     *       {@code handleRequest}; all other requests pass their data to the wrapped
     *       handler's {@code received}.</li>
     *   <li>{@code Response}: handed to {@code handleResponse}.</li>
     *   <li>{@code String}: on the client side an error is logged; otherwise the text is
     *       passed to {@code handler.telnet} and a non-empty result is sent back.</li>
     *   <li>Anything else: forwarded unchanged to the wrapped handler.</li>
     * </ul>
     *
     * <p>The read timestamp attribute is updated before dispatching, and
     * {@code HeaderExchangeChannel.removeChannelIfDisconnected} is always called at the end.
     *
     * @param channel the channel the message arrived on
     * @param message the decoded message
     * @throws RemotingException propagated from the invoked handlers
     */
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchange = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // Request path.
                final Request req = (Request) message;
                if (req.isEvent()) {
                    handlerEvent(channel, req);
                } else if (req.isTwoWay()) {
                    // Two-way request.
                    handleRequest(exchange, req);
                } else {
                    handler.received(exchange, req.getData());
                }
            } else if (message instanceof Response) {
                // Response path.
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // Plain string message.
                if (!isClientSide(channel)) {
                    final String reply = handler.telnet(channel, (String) message);
                    if (reply != null && reply.length() > 0) {
                        channel.send(reply);
                    }
                } else {
                    final Exception unsupported = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(unsupported.getMessage(), unsupported);
                }
            } else {
                // Any other payload is passed straight through.
                handler.received(exchange, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    

    DubboProtocol$ExchangeHandlerAdapter.received

    /**
     * Routes a received message: {@code Invocation} instances (e.g. RpcInvocation) are
     * answered through {@code reply}; everything else is delegated to the superclass.
     *
     * @param channel the channel the message arrived on; cast to {@code ExchangeChannel}
     *                when replying
     * @param message the received message
     * @throws RemotingException propagated from {@code reply} or the superclass
     */
    public void received(Channel channel, Object message) throws RemotingException {
        if (!(message instanceof Invocation)) {
            super.received(channel, message);
            return;
        }
        reply((ExchangeChannel) channel, message);
    }
    

    ExchangeHandlerAdapter#reply

    /**
     * Executes an invocation against the matching server-side invoker and returns its
     * result as a future.
     *
     * @param channel the channel the invocation arrived on
     * @param message the incoming message; cast to {@code Invocation} without a prior check
     * @return the result's completion future, adapted to {@code CompletableFuture<Object>}
     * @throws RemotingException propagated from {@code getInvoker}
     */
    public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
        final Invocation invocation = (Invocation) message;
        // Resolve the server-side invoker for this call.
        final Invoker<?> target = getInvoker(channel, invocation);
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        // Invoke the method, then expose the result as a future.
        return target.invoke(invocation)
                .completionFuture()
                .thenApply(Function.identity());
    }
    

    DubboProtocol#getInvoker

    /**
     * Finds the invoker exported for the service addressed by an invocation.
     *
     * <p>The lookup key is built by {@code serviceKey} from the channel's local port and
     * the path, version and group attachments of the invocation.
     *
     * @param channel the channel the invocation arrived on; supplies the local port
     * @param inv     the invocation whose attachments identify the target service
     * @return the invoker held by the matching exporter
     * @throws RemotingException if no exporter is registered under the computed key
     */
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        int port = channel.getLocalAddress().getPort();
        // Interface name (path) carried in the invocation attachments.
        String path = inv.getAttachments().get(PATH_KEY);

        String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        if (exporter == null) {
            // Fail with a diagnosable error instead of a bare NullPointerException.
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet()
                    + ", may be version or group mismatch, channel: consumer: " + channel.getRemoteAddress()
                    + " --> provider: " + channel.getLocalAddress());
        }
        return exporter.getInvoker();
    }
    

    AbstractProtocol#exporterMap

    // Exported services keyed by service key; export(...) in this excerpt puts entries
    // into it and getInvoker(Channel, Invocation) reads from it.
    protected final Map<String, Exporter<?>> exporterMap;

    /**
     * Exports an invoker by registering a {@code DubboExporter} for it in
     * {@code exporterMap}.
     *
     * @param invoker the invoker to export; its URL determines the service key
     * @return the exporter that was registered
     * @throws RpcException declared by the protocol contract
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // The service key identifies the service (think of it as its coordinates). It is
        // composed of group, service name, version and port, e.g.
        // ${group}/com.my.practice.dubbo.ISayHelloService:${version}:20880
        final String serviceKey = serviceKey(invoker.getUrl());
        final DubboExporter<T> created = new DubboExporter<>(invoker, serviceKey, exporterMap);
        exporterMap.put(serviceKey, created);
        return created;
    }
    

    invoker.invoke(inv)

    invoker=InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))

    JavassistProxyFactory#getInvoker（返回AbstractProxyInvoker的匿名子类，doInvoke委托给Wrapper#invokeMethod）

    /**
     * Builds an invoker that dispatches calls to {@code proxy} through a {@code Wrapper}.
     *
     * <p>The wrapper is created for the proxy's own class unless that class name contains
     * {@code '$'}, in which case the interface type is used instead.
     *
     * @param proxy the instance of the interface's implementation class
     * @param type  the interface type
     * @param url   the service URL, e.g. registry://ip:port...
     * @return an {@code AbstractProxyInvoker} whose {@code doInvoke} delegates to
     *         {@code Wrapper.invokeMethod}
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        Class<?> wrapperTarget = proxy.getClass();
        if (wrapperTarget.getName().indexOf('$') >= 0) {
            // Class names containing '$' are not wrapped directly; use the interface.
            wrapperTarget = type;
        }
        final Wrapper wrapper = Wrapper.getWrapper(wrapperTarget);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T target, String method,
                                      Class<?>[] paramTypes,
                                      Object[] args) throws Throwable {
                return wrapper.invokeMethod(target, method, paramTypes, args);
            }
        };
    }
    
    // Example of the invokeMethod source generated for com.my.dubbo.PayServiceImpl.
    // NOTE(review): this is bytecode-generator template text rather than plain Java;
    // $1..$4 appear to be positional references to the parameters (o, n, p, v) and ($w)
    // a cast that boxes primitive results - confirm against the Javassist documentation.
    //
    // Flow: cast the target to the implementation class, match the requested method by
    // name and parameter count, call it directly, and wrap anything it throws in an
    // InvocationTargetException. An unmatched method ends in NoSuchMethodException.
    public Object invokeMethod (Object o, String n, Class[]p, Object[]v) throws
    java.lang.reflect.InvocationTargetException {
        com.my.dubbo.PayServiceImpl w;
        try {
            // $1 is the target instance; a failed cast is reported as an illegal argument.
            w = ((com.my.dubbo.PayServiceImpl) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            // One branch per method: matched by name ($2) and parameter count ($3.length).
            if ("pay".equals($2) && $3.length == 1) {
                return ($w) w.pay((java.lang.String) $4[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        // No branch matched the requested method.
        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.my.dubbo.PayServiceImpl.");
    }
    

    相关文章

      网友评论

          本文标题:dubbo服务端接收消息处理

          本文链接:https://www.haomeiwen.com/subject/mmixxktx.html