美文网首页
DUBBO-组件 protocol export

DUBBO-组件 protocol export

作者: C_99f1 | 来源:发表于2018-11-09 23:25 被阅读22次
    @SPI("dubbo")
    public interface Protocol {
        int getDefaultPort();
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
        void destroy();
    }
    
    
    public abstract class AbstractProtocol implements Protocol {
        protected final Logger logger = LoggerFactory.getLogger(getClass());
        protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
        //TODO SOFEREFENCE
        protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
        protected static String serviceKey(URL url) {
            int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
            return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY),
                    url.getParameter(Constants.GROUP_KEY));
        }
        protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
            return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
        }
    
        @Override
        public void destroy()  
    }
    
    
    
    
    /***
    *抽象代理协议 会调用子类的doexport()方法  子类i有 http  rest webservice 
    * RMI  hessian
     */
    public abstract class AbstractProxyProtocol extends AbstractProtocol {
    
    }
    
    
    
    
    /**
     * dubbo protocol support.
     */
    public class DubboProtocol extends AbstractProtocol {
        public static final String NAME = "dubbo";
        public static final int DEFAULT_PORT = 20880;
        private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
        private static DubboProtocol INSTANCE;
        private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
        private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
        private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
        private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
        private final Set<String> optimizers = new ConcurrentHashSet<String>();
        //consumer side export a stub service for dispatching event
        //servicekey-stubmethods
        private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
            @Override
            public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {Invoker<?> invoker = getInvoker(channel, inv);}
    
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
                } else {
                    super.received(channel, message);
                }
            }
      }
    
    
    
        public DubboProtocol() {
            INSTANCE = this;
        }
    
        public static DubboProtocol getDubboProtocol() {
            if (INSTANCE == null) {
                ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME); // load
            }
            return INSTANCE;
        }
    
        public Collection<ExchangeServer> getServers() {
            return Collections.unmodifiableCollection(serverMap.values());
        }
    
        public Collection<Exporter<?>> getExporters() {
            return Collections.unmodifiableCollection(exporterMap.values());
        }
    
        Map<String, Exporter<?>> getExporterMap() {
            return exporterMap;
        }
    
        private boolean isClientSide(Channel channel) {
            InetSocketAddress address = channel.getRemoteAddress();
            URL url = channel.getUrl();
            return url.getPort() == address.getPort() &&
                    NetUtils.filterLocalHost(channel.getUrl().getIp())
                            .equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
        }
    
        Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
            DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
           return exporter.getInvoker();
        }
    
    
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
            openServer(url);
            optimizeSerialization(url);
            return exporter;
        }
    
        private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    synchronized (this) {
                        server = serverMap.get(key);
                        if (server == null) {
                                                              ~~~~~~~~~~~~~~~~~~~~~
                            serverMap.put(key, createServer(url));
                        }
                    }
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }
    
        private ExchangeServer createServer(URL url) {
            // send readonly event when server closes, it's enabled by default
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
    
    
    
        @Override
        public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
    
        private ExchangeClient[] getClients(URL url) {
            // whether to share connection
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                service_share_connect = true;
                connections = 1;
            }
    
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect) {
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
    
        /**
         * Get shared connection
         */
        private ExchangeClient getSharedClient(URL url) {
            String key = url.getAddress();
            ReferenceCountExchangeClient client = referenceClientMap.get(key);
            if (client != null) {
                if (!client.isClosed()) {
                    client.incrementAndGetCount();
                    return client;
                } else {
                    referenceClientMap.remove(key);
                }
            }
    
            locks.putIfAbsent(key, new Object());
            synchronized (locks.get(key)) {
                if (referenceClientMap.containsKey(key)) {
                    return referenceClientMap.get(key);
                }
    
                ExchangeClient exchangeClient = initClient(url);
                client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
                referenceClientMap.put(key, client);
                ghostClientMap.remove(key);
                locks.remove(key);
                return client;
            }
        }
    
        /**
         * Create new connection
         */
        private ExchangeClient initClient(URL url) {
    
            // client type setting.
            String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    
            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
    
            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
                } else {
                    client = Exchangers.connect(url, requestHandler);
                }
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
            return client;
        }
    }
    
    --------------------------------------------------------------------------------------------------------------------------------------------
        /**
         * Exporter  一个简单封装invoke的实体类   主要的方法是getinvoker 和unexport
         *
         */
    public interface Exporter<T> {
        Invoker<T> getInvoker();
        void unexport();
    }
    
    
        /**
         * 
         *Exporter 实现类  下面的子类回调super.
         */
     public abstract class AbstractExporter<T> implements Exporter<T> {
        private final Invoker<T> invoker;
        private volatile boolean unexported = false;
        public AbstractExporter(Invoker<T> invoker) {
            if (invoker == null)
                throw new IllegalStateException("service invoker == null");
            if (invoker.getInterface() == null)
                throw new IllegalStateException("service type == null");
            if (invoker.getUrl() == null)
                throw new IllegalStateException("service url == null");
            this.invoker = invoker;
        }
    
        @Override
        public void unexport() {
            if (unexported) {
                return;
            }
            unexported = true;
            getInvoker().destroy();
        }
    
    
    }
    
    
    
    /**
     * DubboExporter
     */
    public class DubboExporter<T> extends AbstractExporter<T> {
    
        private final String key;
    
        private final Map<String, Exporter<?>> exporterMap;
    
        public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
            super(invoker);
            this.key = key;
            this.exporterMap = exporterMap;
        }
    
        @Override
        public void unexport() {
            super.unexport();
            exporterMap.remove(key);
        }
    
    }
    
    
    

    Protocol -协议下面有很多的实现类 总体来说2个功能 通过接口class和url new出一个invoke
    rest协议和dubbo协议实现类型不一样
    refer返回一个invoke invoke上篇写道是一个target类的保证可以调invoke方法调用正在的现实类

    export 需要的是一个invoker作为参数 invoker可以获取url 通过url具体可以暴露出服务 不管rest服务也好 还是dubbo服务

    Exporter 一个简单封装invoke的实体类 主要的方法是getinvoker 和unexport

    相关文章

      网友评论

          本文标题:DUBBO-组件 protocol export

          本文链接:https://www.haomeiwen.com/subject/jceaxqtx.html