Dubbo Telnet Process Overview

Author: 晴天哥_王志 | Published: 2019-10-28 22:55

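    Introduction

    Since version 2.0.5, Dubbo has supported service governance through telnet commands. You connect to the server with telnet ip port and debug it interactively. The available commands are listed in the Telnet Command Reference manual.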
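    The same connection can be opened from code instead of the telnet client. The following is a minimal sketch, not part of Dubbo: the class name TelnetProbe and its send() method are made up for illustration. It writes one command line to the socket and collects whatever comes back until the server closes the connection or stays silent for the given timeout.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; it is not a Dubbo class.
class TelnetProbe {

    /**
     * Sends one command line and returns the text the server writes back.
     * Reading stops when the server closes the connection or when no data
     * arrives for timeoutMillis (an interactive session normally stays open).
     */
    static String send(String host, int port, String command, int timeoutMillis) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);

            // Telnet-style commands are plain text terminated by a line ending.
            OutputStream out = socket.getOutputStream();
            out.write((command + "\r\n").getBytes(StandardCharsets.UTF_8));
            out.flush();

            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            InputStream in = socket.getInputStream();
            byte[] chunk = new byte[1024];
            try {
                int n;
                while ((n = in.read(chunk)) != -1) {
                    reply.write(chunk, 0, n);
                }
            } catch (SocketTimeoutException quiet) {
                // The server went quiet; treat what we have as the full reply.
            }
            return reply.toString(StandardCharsets.UTF_8.name());
        }
    }
}
```

    Assuming a provider is listening on 127.0.0.1 at Dubbo's default port 20880, TelnetProbe.send("127.0.0.1", 20880, "ls", 1000) would return the output of the ls command followed by the prompt.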

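    Netty basics

    • Netty can carry plain-text, line-based protocols such as Telnet, and the codec used by Dubbo's NettyServer supports encoding and decoding Telnet messages.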

    Codec support

    TelnetCodec
    • Responsible for encoding and decoding Telnet requests.
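    To make the codec's job concrete, here is a simplified sketch of how a decoder could tell a telnet command line apart from a binary frame. It is not the real TelnetCodec. The class and method names are hypothetical, and the two magic bytes 0xda 0xbb are my assumption about how Dubbo's binary protocol marks its frames; the article itself does not show this.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; the real TelnetCodec does considerably more.
class TelnetDecodeSketch {

    // Assumption: binary Dubbo frames begin with these two magic bytes.
    static final byte MAGIC_HIGH = (byte) 0xda;
    static final byte MAGIC_LOW = (byte) 0xbb;

    /** True when the buffer starts with the assumed binary-protocol magic number. */
    static boolean looksLikeBinaryFrame(byte[] data) {
        return data.length >= 2 && data[0] == MAGIC_HIGH && data[1] == MAGIC_LOW;
    }

    /**
     * Returns the first complete text line without its line ending.
     * Returns null for a binary frame, or when no line ending has arrived yet.
     */
    static String decodeLine(byte[] data) {
        if (looksLikeBinaryFrame(data)) {
            return null; // not a telnet command
        }
        for (int i = 0; i < data.length; i++) {
            if (data[i] == '\n') {
                // Accept both "\r\n" and a bare "\n" as the terminator.
                int end = (i > 0 && data[i - 1] == '\r') ? i - 1 : i;
                return new String(data, 0, end, StandardCharsets.UTF_8);
            }
        }
        return null; // incomplete line, wait for more bytes
    }
}
```

    The decoded value is a plain String, which is what lets a later handler recognize a telnet command by its type.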

    HeaderExchangeHandler

    Handler wrapping flow
    • During export() in the Dubbo protocol, the handler is wrapped: HeaderExchangeHandler holds an ExchangeHandlerAdapter object internally.
    public abstract class ExchangeHandlerAdapter extends TelnetHandlerAdapter implements ExchangeHandler {
    
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) throws RemotingException {
            return null;
        }
    }
    
    • ExchangeHandlerAdapter extends TelnetHandlerAdapter, and TelnetHandlerAdapter provides the telnet() method.
    • When HeaderExchangeHandler handles a telnet command, it calls the telnet() method inherited from TelnetHandlerAdapter.
    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // related code omitted
                } else if (message instanceof Response) {
                    // related code omitted
                } else if (message instanceof String) {
                    if (isClientSide(channel)) {
                        // related code omitted
                    } else {
                        // handler here is requestHandler = new ExchangeHandlerAdapter()
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    }
    
    • For a message of type String, HeaderExchangeHandler's received() method calls handler.telnet().
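    The inheritance chain and the String branch can be modelled in a few lines. The sketch below uses simplified stand-in classes (TelnetAdapter, ExchangeAdapter, HeaderHandler are hypothetical names, not the Dubbo classes) and records replies in a list instead of writing to a channel.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the handler chain; all names here are illustrative stand-ins.
class HandlerChainSketch {

    /** Stand-in for TelnetHandlerAdapter: owns the telnet() entry point. */
    static class TelnetAdapter {
        String telnet(String message) {
            return "handled: " + message.trim();
        }
    }

    /** Stand-in for ExchangeHandlerAdapter: inherits telnet() unchanged. */
    static abstract class ExchangeAdapter extends TelnetAdapter {
        abstract Object reply(Object request);
    }

    /** Stand-in for the server side of HeaderExchangeHandler. */
    static class HeaderHandler {
        private final ExchangeAdapter handler;
        final List<String> sent = new ArrayList<>(); // what would be written to the channel

        HeaderHandler(ExchangeAdapter handler) {
            this.handler = handler;
        }

        void received(Object message) {
            if (message instanceof String) {
                // Text messages are telnet commands; echo the non-empty result.
                String echo = handler.telnet((String) message);
                if (echo != null && echo.length() > 0) {
                    sent.add(echo);
                }
            } else {
                sent.add(String.valueOf(handler.reply(message)));
            }
        }
    }

    static List<String> demo() {
        HeaderHandler header = new HeaderHandler(new ExchangeAdapter() {
            @Override
            Object reply(Object request) {
                return "reply to " + request;
            }
        });
        header.received("status\r\n"); // routed to the inherited telnet()
        header.received(42);           // routed to reply()
        return header.sent;
    }
}
```

    The anonymous ExchangeAdapter never defines telnet() itself, yet the String message still reaches it, which mirrors why handler.telnet() works on requestHandler.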
    public class TelnetHandlerAdapter extends ChannelHandlerAdapter implements TelnetHandler {
    
        private final ExtensionLoader<TelnetHandler> extensionLoader = 
                       ExtensionLoader.getExtensionLoader(TelnetHandler.class);
    
        @Override
        public String telnet(Channel channel, String message) throws RemotingException {
            String prompt = channel.getUrl().getParameterAndDecoded(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
            boolean noprompt = message.contains("--no-prompt");
            message = message.replace("--no-prompt", "");
            StringBuilder buf = new StringBuilder();
            message = message.trim();
            String command;
            if (message.length() > 0) {
                int i = message.indexOf(' ');
                if (i > 0) {
                    command = message.substring(0, i).trim();
                    message = message.substring(i + 1).trim();
                } else {
                    command = message;
                    message = "";
                }
            } else {
                command = "";
            }
    
            // dispatch the parsed command
            if (command.length() > 0) {
                if (extensionLoader.hasExtension(command)) {
                    if (commandEnabled(channel.getUrl(), command)) {
                        try {
                            String result = extensionLoader.getExtension(command).telnet(channel, message);
                            if (result == null) {
                                return null;
                            }
                            buf.append(result);
                        } catch (Throwable t) {
                            buf.append(t.getMessage());
                        }
                    } else {
                        buf.append("Command: ");
                        buf.append(command);
                        buf.append(" disabled");
                    }
                } else {
                    buf.append("Unsupported command: ");
                    buf.append(command);
                }
            }
            if (buf.length() > 0) {
                buf.append("\r\n");
            }
            if (StringUtils.isNotEmpty(prompt) && !noprompt) {
                buf.append(prompt);
            }
            return buf.toString();
        }
    }
    
    The com.alibaba.dubbo.remoting.telnet.TelnetHandler file (SPI configuration that maps each command name to its handler class)
    
    clear=com.alibaba.dubbo.remoting.telnet.support.command.ClearTelnetHandler
    exit=com.alibaba.dubbo.remoting.telnet.support.command.ExitTelnetHandler
    help=com.alibaba.dubbo.remoting.telnet.support.command.HelpTelnetHandler
    status=com.alibaba.dubbo.remoting.telnet.support.command.StatusTelnetHandler
    log=com.alibaba.dubbo.remoting.telnet.support.command.LogTelnetHandler
    ls=com.alibaba.dubbo.rpc.protocol.dubbo.telnet.ListTelnetHandler
    ps=com.alibaba.dubbo.rpc.protocol.dubbo.telnet.PortTelnetHandler
    cd=com.alibaba.dubbo.rpc.protocol.dubbo.telnet.ChangeTelnetHandler
    pwd=com.alibaba.dubbo.rpc.protocol.dubbo.telnet.CurrentTelnetHandler
    invoke=com.alibaba.dubbo.rpc.protocol.dubbo.telnet.InvokeTelnetHandler
    trace=com.alibaba.dubbo.rpc.protocol.dubbo.telnet.TraceTelnetHandler
    count=com.alibaba.dubbo.rpc.protocol.dubbo.telnet.CountTelnetHandler
    
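    • TelnetHandlerAdapter's telnet() method parses the command name (command) out of the incoming message; the rest of the message becomes the command's arguments.
    • extensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class) obtains the ExtensionLoader for TelnetHandler.
    • extensionLoader.getExtension(command) returns the TelnetHandler extension instance for that command, as defined in the com.alibaba.dubbo.remoting.telnet.TelnetHandler file.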
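    The parse-then-look-up flow described above can be tried out without Dubbo. In this sketch a plain Map stands in for the ExtensionLoader, and the echo and help handlers are invented for the example; only the parsing steps follow the listing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative only: a Map plays the role of the ExtensionLoader lookup.
class TelnetDispatchSketch {

    private final Map<String, Function<String, String>> handlers = new LinkedHashMap<>();
    private final String prompt;

    TelnetDispatchSketch(String prompt) {
        this.prompt = prompt;
        // Hypothetical commands registered by name, like entries in the SPI file.
        handlers.put("echo", args -> args);
        handlers.put("help", args -> "commands: " + String.join(", ", handlers.keySet()));
    }

    String telnet(String message) {
        boolean noPrompt = message.contains("--no-prompt");
        message = message.replace("--no-prompt", "").trim();

        // The first word is the command, the remainder is passed to the handler.
        String command = message;
        String args = "";
        int i = message.indexOf(' ');
        if (i > 0) {
            command = message.substring(0, i).trim();
            args = message.substring(i + 1).trim();
        }

        StringBuilder buf = new StringBuilder();
        if (!command.isEmpty()) {
            Function<String, String> handler = handlers.get(command);
            if (handler != null) {
                buf.append(handler.apply(args));
            } else {
                buf.append("Unsupported command: ").append(command);
            }
        }
        if (buf.length() > 0) {
            buf.append("\r\n");
        }
        if (!noPrompt) {
            buf.append(prompt);
        }
        return buf.toString();
    }
}
```

    With new TelnetDispatchSketch("dubbo>"), where the prompt text is an assumed value, telnet("echo hello world") yields the echoed text followed by the prompt, and an unregistered command yields the "Unsupported command" message.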

    InvokeTelnetHandler

    @Activate
    @Help(parameter = "[service.]method(args) ", summary = "Invoke the service method.",
            detail = "Invoke the service method.")
    public class InvokeTelnetHandler implements TelnetHandler {
    
        public static final String INVOKE_MESSAGE_KEY = "telnet.invoke.method.message";
        public static final String INVOKE_METHOD_LIST_KEY = "telnet.invoke.method.list";
        public static final String INVOKE_METHOD_PROVIDER_KEY = "telnet.invoke.method.provider";
    
        @Override
        @SuppressWarnings("unchecked")
        public String telnet(Channel channel, String message) {
    
            String service = (String) channel.getAttribute(ChangeTelnetHandler.SERVICE_KEY);
            // parse the command message into service, method, and arguments
            int i = message.indexOf("(");
            String method = message.substring(0, i).trim();
            String args = message.substring(i + 1, message.length() - 1).trim();
            i = method.lastIndexOf(".");
            if (i >= 0) {
                service = method.substring(0, i).trim();
                method = method.substring(i + 1).trim();
            }
    
            List<Object> list;
            try {
                list = JSON.parseArray("[" + args + "]", Object.class);
            } catch (Throwable t) {
                return "Invalid json argument, cause: " + t.getMessage();
            }
            StringBuilder buf = new StringBuilder();
            Method invokeMethod = null;
            ProviderModel selectedProvider = null;
            if (isInvokedSelectCommand(channel)) {
                selectedProvider = (ProviderModel) channel.getAttribute(INVOKE_METHOD_PROVIDER_KEY);
                invokeMethod = (Method) channel.getAttribute(SelectTelnetHandler.SELECT_METHOD_KEY);
            } else {
                // iterate over the providers to find the matching service and method
                for (ProviderModel provider : ApplicationModel.allProviderModels()) {
                    if (isServiceMatch(service, provider)) {
                        selectedProvider = provider;
                        List<Method> methodList = findSameSignatureMethod(provider.getAllMethods(), method, list);
                        if (CollectionUtils.isNotEmpty(methodList)) {
                            if (methodList.size() == 1) {
                                invokeMethod = methodList.get(0);
                            } else {
                                List<Method> matchMethods = findMatchMethods(methodList, list);
                                if (CollectionUtils.isNotEmpty(matchMethods)) {
                                    if (matchMethods.size() == 1) {
                                        invokeMethod = matchMethods.get(0);
                                    } else { // several overloaded methods match; ask the user to select one
                                        channel.setAttribute(INVOKE_METHOD_PROVIDER_KEY, provider);
                                        channel.setAttribute(INVOKE_METHOD_LIST_KEY, matchMethods);
                                        channel.setAttribute(INVOKE_MESSAGE_KEY, message);
                                        printSelectMessage(buf, matchMethods);
                                        return buf.toString();
                                    }
                                }
                            }
                        }
                        break;
                    }
                }
            }
    
    
            if (!StringUtils.isEmpty(service)) {
                buf.append("Use default service ").append(service).append(".");
            }
            if (selectedProvider != null) {
                if (invokeMethod != null) {
                    try {
                        Object[] array = realize(list.toArray(), invokeMethod.getParameterTypes(),
                                invokeMethod.getGenericParameterTypes());
                        long start = System.currentTimeMillis();
                        AppResponse result = new AppResponse();
                        try {
                            // invoke the resolved method on the service instance via reflection
                            Object o = invokeMethod.invoke(selectedProvider.getServiceInstance(), array);
                            result.setValue(o);
                        } catch (Throwable t) {
                            result.setException(t);
                        }
                        long end = System.currentTimeMillis();
                        buf.append("\r\nresult: ");
                        buf.append(JSON.toJSONString(result.recreate()));
                        buf.append("\r\nelapsed: ");
                        buf.append(end - start);
                        buf.append(" ms.");
                    } catch (Throwable t) {
                        return "Failed to invoke method " + invokeMethod.getName() + ", cause: " + StringUtils.toString(t);
                    }
                } else {
                    buf.append("\r\nNo such method ").append(method).append(" in service ").append(service);
                }
            } else {
                buf.append("\r\nNo such service ").append(service);
            }
            return buf.toString();
        }
    }
    
    • The provider object is looked up by service and method.
    • The target method is then called reflectively through Method.invoke().
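    The two steps can be sketched in isolation: split a "[service.]method(args)" message the way the listing does, then pick a method by name and argument count and call it with Method.invoke(). GreetingService and the helper names are hypothetical, the format check is my addition, and JSON argument parsing plus type conversion (realize() in the listing) are left out.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch; not the InvokeTelnetHandler implementation.
class InvokeSketch {

    /** Hypothetical demo service standing in for a registered provider instance. */
    public static class GreetingService {
        public String hello(String name) {
            return "Hello, " + name;
        }

        public int add(Integer a, Integer b) {
            return a + b;
        }
    }

    /** Splits "[service.]method(args)" into {service, method, rawArgs}; service may be null. */
    static String[] parse(String message) {
        int i = message.indexOf('(');
        if (i < 0 || !message.endsWith(")")) {
            throw new IllegalArgumentException("Expected [service.]method(args): " + message);
        }
        String method = message.substring(0, i).trim();
        String args = message.substring(i + 1, message.length() - 1).trim();
        String service = null;
        int dot = method.lastIndexOf('.');
        if (dot >= 0) {
            service = method.substring(0, dot).trim();
            method = method.substring(dot + 1).trim();
        }
        return new String[] {service, method, args};
    }

    /** Finds the single public method with this name and argument count, then invokes it. */
    static Object invoke(Object target, String methodName, Object... args) throws Exception {
        List<Method> candidates = new ArrayList<>();
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() == args.length) {
                candidates.add(m);
            }
        }
        if (candidates.size() != 1) {
            // Zero matches means no such method; several would need a selection step.
            throw new NoSuchMethodException(methodName + " matched " + candidates.size() + " methods");
        }
        return candidates.get(0).invoke(target, args);
    }
}
```

    For example, parse("com.demo.GreetingService.hello(\"dubbo\")") separates the service name from the method name, and invoke(new InvokeSketch.GreetingService(), "add", 1, 2) performs the reflective call.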

