美文网首页
DUBBO-组件 Invocation,Invoker,Dire

DUBBO-组件 Invocation,Invoker,Dire

作者: C_99f1 | 来源:发表于2018-11-03 17:22 被阅读95次

    dubbo-重要组件

    /**
     * Invocation. (API, Prototype, NonThreadSafe)
     *
     * Carries the information and arguments of one request: the method name,
     * the parameter types, the argument values and the string attachments,
     * together with the Invoker the request is associated with.
     */
    public interface Invocation {
        String getMethodName();
        Class<?>[] getParameterTypes();
        Object[] getArguments();
        Map<String, String> getAttachments();
        String getAttachment(String key);
        // Same lookup as above, with a caller-supplied default value.
        String getAttachment(String key, String defaultValue);
        Invoker<?> getInvoker();
    }
    
    /**
     * RpcInvocation.
     *
     * Basic implementation of Invocation. Only the fields and the constructor
     * signatures are listed in this outline; constructor bodies and accessors
     * are omitted.
     */
    public class RpcInvocation implements Invocation, Serializable {
       private String methodName;
       private Class<?>[] parameterTypes;
       private Object[] arguments;
       private Map<String, String> attachments;
       // transient: the invoker reference is excluded from Java serialization.
       private transient Invoker<?> invoker;
       public RpcInvocation() 
       public RpcInvocation(Invocation invocation, Invoker<?> invoker) 
       public RpcInvocation(Invocation invocation)
       public RpcInvocation(Method method, Object[] arguments)
       public RpcInvocation(Method method, Object[] arguments, Map<String, String> attachment) 
       public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) 
       public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] 
       arguments,Map<String,String> attachments) 
       public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, 
       String> attachments, Invoker<?> invoker) 
    }
    
    
    /**
     * DecodeableRpcInvocation.
     *
     * Extends RpcInvocation with one extra capability: decoding the request
     * entity. Method bodies are omitted in this outline.
     */
    public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
    
        private static final Logger log = LoggerFactory.getLogger(DecodeableRpcInvocation.class);
        private Channel channel;
        private byte serializationType;
        private InputStream inputStream;
        private Request request;
        // NOTE(review): presumably guards against decoding the same request twice - confirm against the full source.
        private volatile boolean hasDecoded;
        public DecodeableRpcInvocation(Channel channel, Request request, InputStream is, byte id)   
        // No-argument decode entry point (body omitted in this outline).
        public void decode() throws Exception { }
        @Override
        public Object decode(Channel channel, InputStream input) throws IOException 
         
    }
    
    
    
    
    /**  
     *一个节点
     */
    interface Node extends Node
         URL getUrl(); 
         boolean isAvailable();
         void destroy();
    
    /**
     * Invoker.
     *
     * Author's note: an Invoker is what the protocol's
     * {@code ref(Class<T> type, URL url)} call yields.
     *
     * The interface is generic and extends Node, because the rest of this
     * file uses it as {@code Invoker<T>} / {@code Invoker<?>} and its
     * implementations override getUrl(), isAvailable() and destroy().
     *
     * @param <T> the service interface type
     */
    interface Invoker<T> extends Node {
        /** @return the service interface this invoker represents */
        Class<T> getInterface();

        /**
         * Performs the invocation.
         *
         * @param invocation the request to execute
         * @return the invocation result
         * @throws RpcException if the call fails
         */
        Result invoke(Invocation invocation) throws RpcException;
    }
    /**
     * Base implementation of Invoker.
     *
     * This is an outline: constructor and most method bodies are elided.
     * The one piece of logic shown is invoke(...), which hands the call to
     * the subclass hook doInvoke(...).
     *
     * @param <T> the service interface type
     */
    abstract class AbstractInvoker<T> implements Invoker<T> {
        public AbstractInvoker(Class<T> type, URL url)
        public AbstractInvoker(Class<T> type, URL url, String[] keys)
        public AbstractInvoker(Class<T> type, URL url, Map<String, String> attachment)
        private static Map<String, String> convertAttachment(URL url, String[] keys) { }
        @Override
        public Class<T> getInterface()
        @Override
        public URL getUrl() {   }
        @Override
        public boolean isAvailable() { }
        protected void setAvailable(boolean available)
        @Override
        public void destroy() {  }
        public boolean isDestroyed() { return destroyed.get(); }
        @Override
        public String toString() { return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().toString()); }
        /**
         * Template method: delegates to the subclass's doInvoke(...).
         *
         * NOTE(review): doInvoke declares Throwable while this method only
         * declares RpcException; the full implementation presumably wraps
         * the call in a try/catch that is elided here - confirm.
         */
        @Override
        public Result invoke(Invocation invocation) throws RpcException { return doInvoke(invocation); }
        /** Subclass hook that performs the actual call. */
        protected abstract Result doInvoke(Invocation invocation) throws Throwable;
    }
    
     
    
    
    /**
     * Wraps a URL together with an Invoker.
     *
     * Outline only: the constructor body and the remaining Invoker methods
     * are omitted. invoke(...) forwards straight to the wrapped invoker.
     */
    public class InvokerWrapper<T> implements Invoker<T>  
        private final Invoker<T> invoker;
        private final URL url;
        public InvokerWrapper(Invoker<T> invoker, URL url) 
        @Override
        public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
    
    
    /**
     * An InvokerWrapper that additionally keeps its own reference to the
     * wrapped invoker so that it can be handed back via getInvoker().
     */
    public static class InvokerDelegete<T> extends InvokerWrapper<T>
            private final Invoker<T> invoker;
            public InvokerDelegete(Invoker<T> invoker, URL url) {
                super(invoker, url);
                this.invoker = invoker;
            }
            // Unwraps nested InvokerDelegete instances recursively and returns the innermost invoker.
            public Invoker<T> getInvoker() {  if (invoker instanceof InvokerDelegete) {
                   return ((InvokerDelegete<T>) invoker).getInvoker();
            } else {
                    return invoker;}}
       
    
    /**
     * Extends AbstractInvoker: the parent's invoke(...) is what gets called,
     * and it in turn calls this subclass's doInvoke(...).
     *
     * Author's note: on the protocol side, ref -> getClients(url) connects
     * to the server through netty.
     *
     * Outline only: the four-argument constructor's field assignments and
     * the body of doInvoke are omitted.
     */
    public class DubboInvoker<T> extends AbstractInvoker<T> {
        private final ExchangeClient[] clients;
        private final AtomicPositiveInteger index = new AtomicPositiveInteger();
        private final String version;
        private final ReentrantLock destroyLock = new ReentrantLock();
        private final Set<Invoker<?>> invokers;
        // Convenience constructor: delegates with a null invoker set.
        public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) {
            this(serviceType, url, clients, null);}
    
        // Passes the interface, group, token and timeout keys up to AbstractInvoker.
        public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
            super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});}
    
        // Body omitted in this outline.
        @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
        }
    }
    
    
    
    /**
     * Implements Invoker on top of a Directory. As with AbstractInvoker,
     * callers use invoke(...), which delegates to the subclass's doInvoke(...).
     *
     * Outline only: the bodies of the two-argument constructor, select,
     * doSelect and reselect are omitted.
     */
    public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
        protected final Directory<T> directory;
        private volatile Invoker<T> stickyInvoker = null;
    
        // Uses the directory's own URL.
        public AbstractClusterInvoker(Directory<T> directory) {
            this(directory, directory.getUrl());
        }
    
        // Body omitted in this outline.
        public AbstractClusterInvoker(Directory<T> directory, URL url) {
        }
        protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException 
        private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException 
        private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                    List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
                throws RpcException 
    
        // NOTE(review): `invokers` and `loadbalance` are not declared in this excerpt;
        // presumably `invokers` comes from list(invocation) below and `loadbalance`
        // is resolved from configuration - confirm against the full source.
        public Result invoke(final Invocation invocation) throws RpcException {
            return doInvoke(invocation, invokers, loadbalance);    }
    
      
        // Subclass hook implementing the cluster strategy.
        protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                           LoadBalance loadbalance) throws RpcException;
    
        // Asks the directory for the invokers that apply to this invocation.
        protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
            List<Invoker<T>> invokers = directory.list(invocation);
            return invokers;
        }
    }
    
    
    
    
    
    
    
    
    
    /**  
     *  继承AbstractClusterInvoker  AbstractClusterInvoker父类会被调用 invoke( doinvoke)
     *  
     */
    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
    
        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
            }
         
        }
    
    }
    
    
    
    
    
    /**
     * Directory.
     *
     * A Node that, for a given invocation, lists the invokers of one
     * service type.
     */
    public interface Directory<T> extends Node {
    
        /**
         * get service type.
         *
         * @return service type.
         */
        Class<T> getInterface();
    
        /**
         * list invokers.
         *
         * @param invocation the invocation to list invokers for
         * @return invokers
         * @throws RpcException if the invokers cannot be listed
         */
        List<Invoker<T>> list(Invocation invocation) throws RpcException;
    
    }
    
    
    
    
    /**
     * Skeleton implementation of Directory.
     *
     * list(...) obtains the raw invoker list from the subclass via
     * doList(...) and then passes it through the configured routers.
     */
    public abstract class AbstractDirectory<T> implements Directory<T> {

        private final URL url;
        private volatile boolean destroyed;
        private volatile URL consumerUrl;
        private volatile List<Router> routers;

        public AbstractDirectory(URL url) {
            this(url, null);
        }

        public AbstractDirectory(URL url, List<Router> routers) {
            this(url, url, routers);
        }

        /**
         * @param url         directory URL, must not be null
         * @param consumerUrl consumer URL
         * @param routers     routers handed to setRouters
         * @throws IllegalArgumentException if url is null
         */
        public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            this.url = url;
            this.consumerUrl = consumerUrl;
            setRouters(routers);
        }

        /**
         * Lists the invokers for an invocation, filtered by every router that
         * has no URL or whose URL has the runtime flag set.
         *
         * @throws RpcException if this directory has already been destroyed
         */
        @Override
        public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
            List<Invoker<T>> candidates = doList(invocation);
            // Work on a snapshot: the volatile field may be replaced concurrently.
            final List<Router> snapshot = this.routers;
            if (snapshot == null || snapshot.isEmpty()) {
                return candidates;
            }
            for (Router router : snapshot) {
                boolean applies = router.getUrl() == null
                        || router.getUrl().getParameter(Constants.RUNTIME_KEY, false);
                if (!applies) {
                    continue;
                }
                candidates = router.route(candidates, getConsumerUrl(), invocation);
            }
            return candidates;
        }

        /** Subclass hook that supplies the unrouted invoker list. */
        protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

    }
    
    
    /**
     * Directory implementation that is also a NotifyListener.
     *
     * notify(...) sorts the notified URLs into provider, router and
     * configurator groups and refreshes the invokers; doList(...) picks the
     * invoker list for a method out of methodInvokerMap.
     */
    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
       private static final ConfiguratorFactory configuratorFactory = 
       ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
        private final String serviceKey; // Initialization at construction time, assertion not null
        private final Class<T> serviceType; // Initialization at construction time, assertion not null
        private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
        private final URL directoryUrl; 
        private final String[] serviceMethods;
        private final boolean multiGroup;
        private Protocol protocol; // Initialization at the time of injection, the assertion is not null
        private Registry registry; // Initialization at the time of injection, the assertion is not null
        // When true, doList(...) rejects every call with a FORBIDDEN_EXCEPTION.
        private volatile boolean forbidden = false;
        private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
    private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
    
    
        // Cache of invokers keyed by the merged provider URL's full string (see toInvokers).
        private volatile Map<String, Invoker<T>> urlInvokerMap; 
    
        private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // The initial value is null and the   midway may be assigned to null, please use the local variable reference
        private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
    
        /**
         * Builds the directory for one service type from the registry URL.
         *
         * @param serviceType service interface, must not be null
         * @param url         registry URL; its service key must be non-empty
         * @throws IllegalArgumentException if serviceType is null or the
         *                                  URL has no service key
         */
        public RegistryDirectory(Class<T> serviceType, URL url) {
            super(url);
            if (serviceType == null) {
                throw new IllegalArgumentException("service type is null.");
            }
            if (url.getServiceKey() == null || url.getServiceKey().isEmpty()) {
                throw new IllegalArgumentException("registry serviceKey is null.");
            }
            this.serviceType = serviceType;
            this.serviceKey = url.getServiceKey();
            this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));

            // Directory URL: path = service interface, parameters = the refer query only, monitor key removed.
            final URL baseUrl = url.setPath(url.getServiceInterface())
                    .clearParameters()
                    .addParameters(queryMap)
                    .removeParameter(Constants.MONITOR_KEY);
            this.directoryUrl = baseUrl;
            this.overrideDirectoryUrl = baseUrl;

            // A group of "*" or a comma-separated list marks a multi-group reference.
            final String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
            this.multiGroup = group != null && (group.contains(",") || "*".equals(group));

            final String methods = queryMap.get(Constants.METHODS_KEY);
            if (methods == null) {
                this.serviceMethods = null;
            } else {
                this.serviceMethods = Constants.COMMA_SPLIT_PATTERN.split(methods);
            }
        }
    
    
        /**
         * Converts override URLs into a sorted list of configurators.
         *
         * A URL with the empty protocol discards everything collected so far
         * and stops processing. A URL with no parameters other than anyhost
         * also discards everything collected so far, but processing continues
         * with the next URL.
         *
         * @param urls override URLs, may be null or empty
         * @return the sorted configurators; an empty list for null/empty input
         */
        public static List<Configurator> toConfigurators(List<URL> urls) {
            if (urls == null || urls.isEmpty()) {
                return Collections.emptyList();
            }

            final List<Configurator> result = new ArrayList<Configurator>(urls.size());
            for (URL candidate : urls) {
                if (Constants.EMPTY_PROTOCOL.equals(candidate.getProtocol())) {
                    result.clear();
                    break;
                }
                // The anyhost parameter may be added automatically, so it must not count as an override.
                Map<String, String> params = new HashMap<String, String>(candidate.getParameters());
                params.remove(Constants.ANYHOST_KEY);
                if (params.isEmpty()) {
                    result.clear();
                } else {
                    result.add(configuratorFactory.getConfigurator(candidate));
                }
            }
            Collections.sort(result);
            return result;
        }
    
        /**
         * Sets the protocol that toInvokers(...) uses to refer provider URLs.
         *
         * @param protocol the protocol to use
         */
        public void setProtocol(Protocol protocol) {
            this.protocol = protocol;
        }
    
     
    
     
        /**
         * Handles a registry notification.
         *
         * The URLs are sorted into router, configurator and provider groups
         * (anything else is logged and ignored). Non-empty configurator and
         * router groups replace the current ones, the override directory URL
         * is rebuilt from the configurators, and finally the invokers are
         * refreshed from the provider URLs.
         *
         * @param urls the notified URLs
         */
        @Override
        public synchronized void notify(List<URL> urls) {
            final List<URL> providerUrls = new ArrayList<URL>();
            final List<URL> routerUrls = new ArrayList<URL>();
            final List<URL> configuratorUrls = new ArrayList<URL>();

            for (URL url : urls) {
                final String protocol = url.getProtocol();
                final String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                final boolean isRouter = Constants.ROUTERS_CATEGORY.equals(category)
                        || Constants.ROUTE_PROTOCOL.equals(protocol);
                final boolean isConfigurator = Constants.CONFIGURATORS_CATEGORY.equals(category)
                        || Constants.OVERRIDE_PROTOCOL.equals(protocol);

                if (isRouter) {
                    routerUrls.add(url);
                } else if (isConfigurator) {
                    configuratorUrls.add(url);
                } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    providerUrls.add(url);
                } else {
                    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
                }
            }

            // configurators: only replaced when this notification carried some.
            if (!configuratorUrls.isEmpty()) {
                this.configurators = toConfigurators(configuratorUrls);
            }

            // routers: a null conversion result means "leave the current routers alone".
            if (!routerUrls.isEmpty()) {
                final List<Router> converted = toRouters(routerUrls);
                if (converted != null) {
                    setRouters(converted);
                }
            }

            // merge override parameters, starting again from the pristine directory URL.
            final List<Configurator> activeConfigurators = this.configurators; // local reference
            this.overrideDirectoryUrl = directoryUrl;
            if (activeConfigurators != null && !activeConfigurators.isEmpty()) {
                for (Configurator configurator : activeConfigurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }

            // providers
            refreshInvoker(providerUrls);
        }
    
    
        /**
         * Turn urls into invokers, and if url has been refer, will not re-reference.
         *
         * Provider URLs are skipped when their protocol is not among the
         * protocols configured on the reference side, when they use the empty
         * protocol, when no Protocol extension exists for them, or when the
         * merged URL duplicates one already seen in this call. An invoker
         * cached under the same merged URL is reused; otherwise a new one is
         * referred unless the URL is disabled.
         *
         * @param urls provider URLs, may be null or empty
         * @return invokers keyed by the merged URL's full string; empty for null/empty input
         */
        private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
            Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
            if (urls == null || urls.isEmpty()) {
                return newUrlInvokerMap;
            }
            Set<String> keys = new HashSet<String>();
            String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
            // Split once up front: the accepted-protocol list does not change while iterating.
            final String[] acceptProtocols = (queryProtocols != null && queryProtocols.length() > 0)
                    ? queryProtocols.split(",")
                    : null;
            for (URL providerUrl : urls) {
                // If protocol is configured at the reference side, only the matching protocol is selected
                if (acceptProtocols != null) {
                    boolean accept = false;
                    for (String acceptProtocol : acceptProtocols) {
                        if (providerUrl.getProtocol().equals(acceptProtocol)) {
                            accept = true;
                            break;
                        }
                    }
                    if (!accept) {
                        continue;
                    }
                }
                if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                    continue;
                }
                if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                    logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                            + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                    continue;
                }
                URL url = mergeUrl(providerUrl);

                String key = url.toFullString(); // The parameter urls are sorted
                if (!keys.add(key)) { // Repeated url
                    continue;
                }
                // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
                Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                if (invoker == null) { // Not in the cache, refer again
                    try {
                        // An explicit "disabled" flag wins; otherwise fall back to "enabled" (default true).
                        final boolean enabled = url.hasParameter(Constants.DISABLED_KEY)
                                ? !url.getParameter(Constants.DISABLED_KEY, false)
                                : url.getParameter(Constants.ENABLED_KEY, true);
                        if (enabled) {
                            invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        // A provider that cannot be referred is logged and left out; the others are still processed.
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                    }
                }
                if (invoker != null) { // Put new or cached invoker in the result
                    newUrlInvokerMap.put(key, invoker);
                }
            }
            return newUrlInvokerMap;
        }
    
    
    
        /**
         * Picks the invoker list for an invocation out of methodInvokerMap.
         *
         * Lookup order: "method.firstArg" (only when the first argument is a
         * String or an enum), then the method name, then the any-value key,
         * then the first list in the map. An empty list is returned when
         * nothing matches or the map is null/empty.
         *
         * @param invocation the invocation to list invokers for
         * @return the matching invokers, never null
         * @throws RpcException with FORBIDDEN_EXCEPTION when the directory is forbidden
         */
        @Override
        public List<Invoker<T>> doList(Invocation invocation) {
            if (forbidden) {
                // 1. No service provider 2. Service providers are disabled
                throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                    "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
            }

            final Map<String, List<Invoker<T>>> methodMap = this.methodInvokerMap; // local reference
            if (methodMap == null || methodMap.isEmpty()) {
                return new ArrayList<Invoker<T>>(0);
            }

            final String methodName = RpcUtils.getMethodName(invocation);
            final Object[] args = RpcUtils.getArguments(invocation);

            List<Invoker<T>> matched = null;
            final boolean hasFirstArg = args != null && args.length > 0 && args[0] != null;
            if (hasFirstArg && (args[0] instanceof String || args[0].getClass().isEnum())) {
                // The routing can be enumerated according to the first parameter
                matched = methodMap.get(methodName + "." + args[0]);
            }
            if (matched == null) {
                matched = methodMap.get(methodName);
            }
            if (matched == null) {
                matched = methodMap.get(Constants.ANY_VALUE);
            }
            if (matched == null) {
                // Last resort: whichever list the map yields first.
                Iterator<List<Invoker<T>>> lists = methodMap.values().iterator();
                if (lists.hasNext()) {
                    matched = lists.next();
                }
            }
            if (matched != null) {
                return matched;
            }
            return new ArrayList<Invoker<T>>(0);
        }
        
    }
    
    
    

    本篇总结：Dubbo 作为一个底层通信框架，其代码架构中包含大量的包装（装饰）设计模式和相互依赖，下面简述各个组件的作用和依赖关系。

    Invocation-RpcInvocation -DecodeableRpcInvocation
    1. Invocation 是一个请求实体信息接口，包含了调用的方法、参数等基础信息；RpcInvocation 是 Invocation 的基础实现类；DecodeableRpcInvocation 主要多了一个 decode 方法，用于在服务端接收请求时把字节反序列化到自身对象的字段上，变成一个可读（readable）的对象。同时 Invocation 又持有 Invoker，该 invoker 字段被声明为 transient，不参与序列化。
    Invoker-AbstractInvoker -DubboInvoker

    2. Dubbo 各组件之间的相互引用特别多，也很复杂，只要记清楚：Invoker 是对需要调用的服务 target 类的一个封装，由框架完成封装。
    AbstractInvoker 在 invoke 方法里面调用子类的 doInvoke 方法。
    例如 DubboInvoker 继承了 AbstractInvoker，所以 DubboInvoker 会有 invoke 方法；调用 invoke 的时候会
    调用子类的 doInvoke，doInvoke 会在当前 Invoker 中使用 ExchangeClient 进行远程调用，并返回一个 Result。

    Directory -AbstractDirectory-RegistryDirectory

    目录，也可以叫字典，和上方采用同样的设计模式。RegistryDirectory 里面包含一个 Map：Map<String, List<Invoker<T>>> methodInvokerMap，通过 Invocation 中的方法名和参数筛选出合适的 Invoker 列表。

    相关文章

      网友评论

          本文标题:DUBBO-组件 Invocation,Invoker,Dire

          本文链接:https://www.haomeiwen.com/subject/qkokxqtx.html