美文网首页
dubbo provider端接收request与发送respo

dubbo provider端接收request与发送respo

作者: 雕兄L | 来源:发表于2019-06-23 11:42 被阅读0次

    Dubbo接收request的过程其实和client端接收response的过程有点像。建议先阅读上一篇文章。
    从NettyHandler开始受到tcp请求,再一直传递到HeaderExchangeHandler的过程和client端接收response过程基本一样,不再详细描述.

    @Override
       public void received(Channel channel, Object message) throws RemotingException {
           channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
           final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
           try {
               if (message instanceof Request) {
                   // handle request.
                   Request request = (Request) message;
                   if (request.isEvent()) {
                       handlerEvent(channel, request);
                   } else {
                       if (request.isTwoWay()) {
                         //处理request请求
                           handleRequest(exchangeChannel, request);
                       } else {
                           handler.received(exchangeChannel, request.getData());
                       }
                   }
               } else if (message instanceof Response) {
                 //处理response
                   handleResponse(channel, (Response) message);
               } else if (message instanceof String) {
                   if (isClientSide(channel)) {
                       Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                       logger.error(e.getMessage(), e);
                   } else {
                       String echo = handler.telnet(channel, (String) message);
                       if (echo != null && echo.length() > 0) {
                           channel.send(echo);
                       }
                   }
               } else {
                   handler.received(exchangeChannel, message);
               }
           } finally {
               HeaderExchangeChannel.removeChannelIfDisconnected(channel);
           }
       }
    
       void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
            Response res = new Response(req.getId(), req.getVersion());
            //暂时不理解这个Broken是什么意思
            if (req.isBroken()) {
                Object data = req.getData();
    
                String msg;
                if (data == null) msg = null;
                else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
                else msg = data.toString();
                res.setErrorMessage("Fail to decode request due to: " + msg);
                res.setStatus(Response.BAD_REQUEST);
    
                channel.send(res);
                return;
            }
            // find handler by message class.
            Object msg = req.getData();
            try {
                // handle data.
                //代码执行到这里,就是开始调用我们实现的业务代码去获取数据
                //这个handler就是DubboProtocol里的handler对象,调用其reply()方法执行,通过放射执行业务代码获取数据
                CompletableFuture<Object> future = handler.reply(channel, msg);
                if (future.isDone()) {
                    res.setStatus(Response.OK);
                    res.setResult(future.get());
                    channel.send(res);
                    return;
                }
                future.whenComplete((result, t) -> {
                    try {
                        if (t == null) {
                            res.setStatus(Response.OK);
                            res.setResult(result);
                        } else {
                            res.setStatus(Response.SERVICE_ERROR);
                            res.setErrorMessage(StringUtils.toString(t));
                        }
                        //把拿到的response写出去
                        channel.send(res);
                    } catch (RemotingException e) {
                        logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                    } finally {
                        // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                    }
                });
            } catch (Throwable e) {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(e));
                channel.send(res);
            }
        }
    

    当拿到response之后就会把response写出去,这里的channel对象就是HeaderExchangeChannel,后面会转发NettyChannel对象把数据写出去.

    下面看下DubboProtocol里的handler的reply实现:

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
            @Override
            public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // need to consider backward-compatibility if it's a callback
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || !methodsStr.contains(",")) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                    + " not found in callback service interface ,invoke will be ignored."
                                    + " please update the api interface. url is:"
                                    + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext rpcContext = RpcContext.getContext();
                    boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
                    if (supportServerAsync) {
                        CompletableFuture<Object> future = new CompletableFuture<>();
                        rpcContext.setAsyncContext(new AsyncContextImpl(future));
                    }
                    rpcContext.setRemoteAddress(channel.getRemoteAddress());
                    //获取Invoker对象执行invoke
                    //这里会把filter执行一遍然后拿到结果
                    Result result = invoker.invoke(inv);
    
                    if (result instanceof AsyncRpcResult) {
                        return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
                    } else {
                        return CompletableFuture.completedFuture(result);
                    }
                }
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
    ...省略
        };
    

    相关文章

      网友评论

          本文标题:dubbo provider端接收request与发送respo

          本文链接:https://www.haomeiwen.com/subject/cfsofctx.html