美文网首页dubbo
dubbo - server的bind过程分析

dubbo - server的bind过程分析

作者: 晴天哥_王志 | 来源:发表于2019-05-21 23:56 被阅读37次

    开篇

    这篇文章主要是为了讲清楚dubbo server端在bind过程中整个调用链,之前在dubbo服务发布的流程中已经讲解过在dubbo的服务发布过程中底层最终是通过bind()方法来实现监听的。
    这篇文章会对bind的过程进行细化讲解,包括核心的Exchangers、HeaderExchanger、HeaderExchangeServer等类。

    调用链

    服务调用链路图
    说明:
    • 重点关注类Exchangers、HeaderExchanger、HeaderExchangeServer、Transporters、NettyTransporter、NettyServer。

    DubboProtocol

    public class DubboProtocol extends AbstractProtocol {
    
       public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            // 省略相关代码
            openServer(url);
            optimizeSerialization(url);
            return exporter;
        }
    
        private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client can export a service which's only for server to invoke
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                } else {
                    // server supports reset, use together with override
                    server.reset(url);
                }
            }
        }
    
        private ExchangeServer createServer(URL url) {
            // 省略相关代码
            ExchangeServer server;
            try {
                // 核心的代码绑定
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
    }
    

    说明:

    • DubboProtocol类的createServer()方法调用Exchangers.bind()进入Exchangers类。

    Exchangers & HeaderExchanger

    public class Exchangers {
    
        public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    
            url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    
            // 绑定过程的入口
            return getExchanger(url).bind(url, handler);
        }
    
    
        public static Exchanger getExchanger(URL url) {
            String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
            return getExchanger(type);
        }
    
        // header=com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
        public static Exchanger getExchanger(String type) {
            return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
        }
    }
    
    
    
    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
    
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    }
    

    说明:

    • Exchangers的bind()方法内部调用getExchanger(url).bind()。
    • getExchanger()方法内部返回HeaderExchanger对象,涉及SPI机制。
    • getExchanger(url).bind()执行HeaderExchanger的bind()方法。
    • HeaderExchanger的bind内部返回HeaderExchangeServer对象。
    • HeaderExchangeServer类的构造函数参数是Transporters.bind()返回的server值。
    • Transporters.bind()方法返回NettyServer对象。

    HeaderExchangeServer

    public class HeaderExchangeServer implements ExchangeServer {
    
        private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
                new NamedThreadFactory(
                        "dubbo-remoting-server-heartbeat",
                        true));
        private final Server server;
        // heartbeat timer
        private ScheduledFuture<?> heatbeatTimer;
        // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
        private int heartbeat;
        private int heartbeatTimeout;
        private AtomicBoolean closed = new AtomicBoolean(false);
    
        public HeaderExchangeServer(Server server) {
            if (server == null) {
                throw new IllegalArgumentException("server == null");
            }
            this.server = server;
            this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
            this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
            if (heartbeatTimeout < heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }
            startHeatbeatTimer();
        }
    
        public void reset(URL url) {
            server.reset(url);
            try {
                if (url.hasParameter(Constants.HEARTBEAT_KEY)
                        || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
                    int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
                    int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
                    if (t < h * 2) {
                        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
                    }
                    if (h != heartbeat || t != heartbeatTimeout) {
                        heartbeat = h;
                        heartbeatTimeout = t;
                        startHeatbeatTimer();
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    
        @Deprecated
        public void reset(com.alibaba.dubbo.common.Parameters parameters) {
            reset(getUrl().addParameters(parameters.getParameters()));
        }
    
        public void send(Object message) throws RemotingException {
            if (closed.get()) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
            }
            server.send(message);
        }
    
        public void send(Object message, boolean sent) throws RemotingException {
            if (closed.get()) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
            }
            server.send(message, sent);
        }
    }
    

    说明:

    • HeaderExchangeServer类包含变量Server server,server为NettyServer对象。
    • HeaderExchangeServer对相当于对Server类进行了一层分装,所有对Server层的操作都通过HeaderExchangeServer进行封装并对外提供服务。

    Transporters

    public class Transporters {
    
        public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handlers == null || handlers.length == 0) {
                throw new IllegalArgumentException("handlers == null");
            }
            ChannelHandler handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
    
            return getTransporter().bind(url, handler);
        }
    
        // netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
        // netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
        // mina=com.alibaba.dubbo.remoting.transport.mina.MinaTransporter
        public static Transporter getTransporter() {
            return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
        }
    }
    

    说明:

    • Transporters.bind()方法通过getTransporter()返回NettyTransporter对象,涉及SPI机制。
    • getTransporter().bind()方法执行到NettyTransporter.bind()方法,返回NettyServer对象。

    NettyTransporter

    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    
    }
    
    
    
    public class NettyServer extends AbstractServer implements Server {
    
        private Map<String, Channel> channels; // <ip:port, channel>
    
        private ServerBootstrap bootstrap;
    
        private org.jboss.netty.channel.Channel channel;
    
        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    
        @Override
        protected void doOpen() throws Throwable {
    
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);
    
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
    
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
    
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }
    
        @Override
        protected void doClose() throws Throwable {
            try {
                if (channel != null) {
                    // unbind.
                    channel.close();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
                if (channels != null && channels.size() > 0) {
                    for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                        try {
                            channel.close();
                        } catch (Throwable e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                if (bootstrap != null) {
                    // release external resource.
                    bootstrap.releaseExternalResources();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                if (channels != null) {
                    channels.clear();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        }
    
        public Collection<Channel> getChannels() {
            Collection<Channel> chs = new HashSet<Channel>();
            for (Channel channel : this.channels.values()) {
                if (channel.isConnected()) {
                    chs.add(channel);
                } else {
                    channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
                }
            }
            return chs;
        }
    
        public Channel getChannel(InetSocketAddress remoteAddress) {
            return channels.get(NetUtils.toAddressString(remoteAddress));
        }
    
        public boolean isBound() {
            return channel.isBound();
        }
    
    }
    

    说明:

    • NettyTransporter的bind()返回NettyServer对象。
    • NettyTransporter的connect()返回NettyClient对象。
    • NettyServer内部绑定的流程都是Netty相关的服务。

    GrizzlyTransporter

    public class GrizzlyTransporter implements Transporter {
    
        public static final String NAME = "grizzly";
    
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new GrizzlyServer(url, listener);
        }
    
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new GrizzlyClient(url, listener);
        }
    
    }
    
    
    
    public class GrizzlyServer extends AbstractServer {
    
        private static final Logger logger = LoggerFactory.getLogger(GrizzlyServer.class);
    
        private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
    
        private TCPNIOTransport transport;
    
        public GrizzlyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
        }
    
        @Override
        protected void doOpen() throws Throwable {
            FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
            filterChainBuilder.add(new TransportFilter());
    
            filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
            filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
            TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
            ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
            config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);
            String threadpool = getUrl().getParameter(Constants.THREADPOOL_KEY, Constants.DEFAULT_THREADPOOL);
            if (Constants.DEFAULT_THREADPOOL.equals(threadpool)) {
                int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
                config.setCorePoolSize(threads).setMaxPoolSize(threads)
                        .setKeepAliveTime(0L, TimeUnit.SECONDS);
            } else if ("cached".equals(threadpool)) {
                int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
                config.setCorePoolSize(0).setMaxPoolSize(threads)
                        .setKeepAliveTime(60L, TimeUnit.SECONDS);
            } else {
                throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
            }
            builder.setKeepAlive(true).setReuseAddress(false)
                    .setIOStrategy(SameThreadIOStrategy.getInstance());
            transport = builder.build();
            transport.setProcessor(filterChainBuilder.build());
            transport.bind(getBindAddress());
            transport.start();
        }
    
        @Override
        protected void doClose() throws Throwable {
            try {
                transport.stop();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        }
    
        public boolean isBound() {
            return !transport.isStopped();
        }
    
        public Collection<Channel> getChannels() {
            return channels.values();
        }
    
        public Channel getChannel(InetSocketAddress remoteAddress) {
            return channels.get(NetUtils.toAddressString(remoteAddress));
        }
    
        @Override
        public void connected(Channel ch) throws RemotingException {
            channels.put(NetUtils.toAddressString(ch.getRemoteAddress()), ch);
            super.connected(ch);
        }
    
        @Override
        public void disconnected(Channel ch) throws RemotingException {
            channels.remove(NetUtils.toAddressString(ch.getRemoteAddress()));
            super.disconnected(ch);
        }
    
    }
    

    说明:

    • GrizzlyTransporter和NettyTransporter对象的实现逻辑是一致的。



    HeaderExchangeServer类关系图



    Transport 依赖图



    Server 实现类图



    Client 实现类图

    相关文章

      网友评论

        本文标题:dubbo - server的bind过程分析

        本文链接:https://www.haomeiwen.com/subject/eifczqtx.html