Dubbo Series 2 - Initialization (Part 2)

Author: 梦孤 | Published 2018-08-28 18:04

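    The last step of Spring container startup is instantiating beans. As the previous part showed, a Dubbo service is registered in the Spring container as a ServiceBean. ServiceBean can be thought of as an adapter: it keeps Dubbo's ServiceConfig from being coupled directly to Spring.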

    public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
        private static final long serialVersionUID = 213195494150089726L;
    
        private static transient ApplicationContext SPRING_CONTEXT;
    
        private final transient Service service;
    
        private transient ApplicationContext applicationContext;
    
        private transient String beanName;
    
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                export();
            }
        }
        /** The Spring container calls this method while it is instantiating the bean */
        public void afterPropertiesSet() throws Exception {
            ...
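            // Skip the long block above for now; it only makes sure the environment for publishing the service is initialized, and we will come back to it later. The core is the export() call below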
            if (!isDelay()) {
                export();
            }
        }
    
    }
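
The two export paths in ServiceBean can be illustrated with a small stand-alone sketch. It does not use Spring or Dubbo: `FakeServiceBean`, `onContextRefreshed` and `runLifecycle` are hypothetical names invented here, and the real `!isUnexported()` check is left out for brevity. It only shows that, for a given `delay` value, exactly one of the two callbacks triggers the export.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ServiceBean; none of these names come from Dubbo.
class ExportTimingSketch {

    static class FakeServiceBean {
        private final boolean delay;   // plays the role of isDelay()
        private boolean exported;      // plays the role of isExported()
        final List<String> log = new ArrayList<>();

        FakeServiceBean(boolean delay) {
            this.delay = delay;
        }

        // Invoked while the container is still creating beans.
        void afterPropertiesSet() {
            if (!delay) {
                export("afterPropertiesSet");
            }
        }

        // Invoked once the whole context has been refreshed.
        void onContextRefreshed() {
            if (delay && !exported) {
                export("onContextRefreshed");
            }
        }

        private void export(String trigger) {
            exported = true;
            log.add("exported by " + trigger);
        }
    }

    // Runs both callbacks in container order and returns what was logged.
    static List<String> runLifecycle(boolean delay) {
        FakeServiceBean bean = new FakeServiceBean(delay);
        bean.afterPropertiesSet();
        bean.onContextRefreshed();
        return bean.log;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle(false));
        System.out.println(runLifecycle(true));
    }
}
```

With `delay` false the export happens during bean initialization; with `delay` true it waits for the context-refreshed event.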
    

    The export method lives in ServiceConfig, the parent class of ServiceBean. Inside ServiceConfig the call chain is export() -> doExport() -> doExportUrls():

    private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);
            for (ProtocolConfig protocolConfig : protocols) {
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
    }
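
The loop in doExportUrls can be mimicked with plain strings to make the fan-out visible. This is only a sketch: the method names mirror the snippet, but the string-based "URLs", the `DemoService` name and the `zookeeper` registry label are assumptions made for illustration, not what Dubbo builds internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model: every configured protocol is exported against the same registry list.
class MultiProtocolExportSketch {

    static List<String> doExportUrls(String service, List<String> protocols, List<String> registries) {
        List<String> exported = new ArrayList<>();
        for (String protocol : protocols) {
            exported.add(doExportUrlsFor1Protocol(service, protocol, registries));
        }
        return exported;
    }

    // Stand-in for the per-protocol export; it just describes what would be exported.
    static String doExportUrlsFor1Protocol(String service, String protocol, List<String> registries) {
        return protocol + "://" + service + " -> " + registries;
    }

    public static void main(String[] args) {
        List<String> result = doExportUrls(
                "DemoService",
                Arrays.asList("dubbo", "rest"),
                Arrays.asList("zookeeper"));
        result.forEach(System.out::println);
    }
}
```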
    

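    doExportUrls shows that one service can be exported over several protocols at the same time. On the service class the annotation can be written as @Service(protocol = {"dubbo","rest"}). Next, doExportUrlsFor1Protocol: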

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
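        // The long block before this is skipped as well; it mainly assembles the parameters for exporting the service, including all method names, retry counts, the interface version and so on
        ...
        // The key step: call the export method of the concrete protocol
        Exporter<?> exporter = protocol.export(wrapperInvoker);
        ...
    }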
    

    First, let's see how the dubbo protocol exports a service. Here is the export method of the DubboProtocol class:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
    
            //export an stub service for dispatching event
            Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice) {
                String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
            // The method to focus on
            openServer(url);
            optimizeSerialization(url);
            return exporter;
        }
    ...
    private ExchangeServer createServer(URL url) {
            // send readonly event when server closes, it's enabled by default
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            ExchangeServer server;
            try {
                // Enter the exchange layer
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
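
The bookkeeping in export() can be sketched with two maps. The first part follows the snippet: each exporter is stored under its service key. The second part is an assumption, because the body of openServer is elided above: the sketch assumes a server is created only the first time an address is seen, so several services on the same port share one server. All names here (`ExportRegistrySketch`, `serversCreated`, the string stand-ins) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, single-threaded model of the exporter and server bookkeeping.
class ExportRegistrySketch {

    // service key -> exporter (a String stands in for the exporter object)
    final Map<String, String> exporterMap = new HashMap<>();
    // "host:port" -> server (a String stands in for the server object)
    final Map<String, String> serverMap = new HashMap<>();
    int serversCreated = 0;

    void export(String serviceKey, String address) {
        exporterMap.put(serviceKey, "exporter for " + serviceKey);
        openServer(address);
    }

    // Assumed behaviour: reuse the server if one already listens on this address.
    void openServer(String address) {
        serverMap.computeIfAbsent(address, a -> {
            serversCreated++;
            return "server on " + a;
        });
    }

    public static void main(String[] args) {
        ExportRegistrySketch sketch = new ExportRegistrySketch();
        sketch.export("com.example.DemoService:20880", "127.0.0.1:20880");
        sketch.export("com.example.OtherService:20880", "127.0.0.1:20880");
        System.out.println(sketch.exporterMap.size() + " exporters, "
                + sketch.serversCreated + " server(s)");
    }
}
```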
    

    The dubbo protocol uses the default exchanger, HeaderExchanger:

    /**
     * DefaultMessenger
     *
     *
     */
    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
    
        @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    }
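
The nesting `new DecodeHandler(new HeaderExchangeHandler(handler))` is a decorator chain: each layer does its own work and then delegates inward. A minimal sketch of that shape follows. `DecodeLayer`, `ExchangeLayer` and `BusinessHandler` are hypothetical stand-ins, and the string manipulation they perform is invented purely to make the order of processing visible.

```java
// Decorator-style handler chain, modelled with plain strings.
class HandlerChainSketch {

    interface Handler {
        String received(String message);
    }

    // Innermost handler: the business logic.
    static class BusinessHandler implements Handler {
        @Override
        public String received(String message) {
            return "handled " + message;
        }
    }

    // Stand-in for the exchange layer: turns a message into a request.
    static class ExchangeLayer implements Handler {
        private final Handler next;

        ExchangeLayer(Handler next) {
            this.next = next;
        }

        @Override
        public String received(String message) {
            return next.received("request<" + message + ">");
        }
    }

    // Stand-in for the decode layer: strips a fake wire prefix before delegating.
    static class DecodeLayer implements Handler {
        private final Handler next;

        DecodeLayer(Handler next) {
            this.next = next;
        }

        @Override
        public String received(String message) {
            String decoded = message.startsWith("raw:") ? message.substring(4) : message;
            return next.received(decoded);
        }
    }

    // Same nesting order as in HeaderExchanger.bind: decode outside, exchange inside.
    static Handler wrap(Handler business) {
        return new DecodeLayer(new ExchangeLayer(business));
    }

    public static void main(String[] args) {
        System.out.println(wrap(new BusinessHandler()).received("raw:ping"));
    }
}
```

The outermost layer sees the message first, so decoding happens before the exchange layer and the business handler run.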
    

    As the code shows, bind creates the ExchangeServer by calling the bind method of the transport layer. Here is the bind method of the Transporters class:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handlers == null || handlers.length == 0) {
                throw new IllegalArgumentException("handlers == null");
            }
            ChannelHandler handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().bind(url, handler);
        }
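
The handler selection in Transporters.bind can be tried out in isolation. The sketch below keeps the same rule (one handler is used directly, several are wrapped in a dispatcher that forwards to each of them), but `Handler`, `Dispatcher` and `select` are simplified, hypothetical types rather than Dubbo's ChannelHandler classes.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the "single handler vs. dispatcher" choice with simplified types.
class HandlerSelectionSketch {

    interface Handler {
        void received(String message);
    }

    // Forwards every message to all wrapped handlers, in order.
    static class Dispatcher implements Handler {
        private final Handler[] handlers;

        Dispatcher(Handler... handlers) {
            this.handlers = handlers;
        }

        @Override
        public void received(String message) {
            for (Handler handler : handlers) {
                handler.received(message);
            }
        }
    }

    static Handler select(Handler... handlers) {
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        return handlers.length == 1 ? handlers[0] : new Dispatcher(handlers);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Handler a = m -> seen.add("a:" + m);
        Handler b = m -> seen.add("b:" + m);
        select(a, b).received("hi");
        System.out.println(seen);
    }
}
```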
    

    Because the dubbo protocol uses netty for remote calls by default, let's look at NettyTransporter:

    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        @Override
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
        @Override
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    
    }
    

    So exporting a service over the dubbo protocol ultimately starts a NettyServer.
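
How the `server` parameter read in createServer could lead to a particular transporter can be pictured as a lookup by name with a default. The sketch below is not Dubbo's ExtensionLoader: the map-based registry, the `bind` signature and the string results are assumptions for illustration, with `netty` as the default name and `mina` as a second example entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Name-keyed lookup with a default, loosely modelled on choosing a transporter.
class TransporterLookupSketch {

    static final String DEFAULT_SERVER = "netty";

    // name -> factory that "binds" a server on the given address
    static final Map<String, Function<String, String>> TRANSPORTERS = new HashMap<>();

    static {
        TRANSPORTERS.put("netty", address -> "NettyServer(" + address + ")");
        TRANSPORTERS.put("mina", address -> "MinaServer(" + address + ")");
    }

    static String bind(Map<String, String> urlParams, String address) {
        // Fall back to the default when the URL does not name a server type.
        String name = urlParams.getOrDefault("server", DEFAULT_SERVER);
        Function<String, String> transporter = TRANSPORTERS.get(name);
        if (transporter == null) {
            throw new IllegalStateException("Unsupported server type: " + name);
        }
        return transporter.apply(address);
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(bind(params, "0.0.0.0:20880"));
        params.put("server", "mina");
        System.out.println(bind(params, "0.0.0.0:20880"));
    }
}
```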
