美文网首页
dubbo技术内幕十二 Endpoint 之 Server

dubbo技术内幕十二 Endpoint 之 Server

作者: 牧羊人刘俏 | 来源:发表于2021-04-21 15:18 被阅读0次

    在上一篇中,我们分析了整个Client端的调用逻辑;这一章对Server端的源码做一些分析。整个Server端的类继承关系如下。


    image.png

    整个的继承关系还是主要分两个分支
    其中AbstractServer 主要工作在transport层
    ExchangeServer 主要工作在exchange层。
    我们从下往上的进行分析。
    AbstractServer的源码如下

    /**
     * Transport-layer base class for servers.
     *
     * <p>The constructor derives the local and bind addresses from the URL, reads the
     * connection limit and idle timeout, and then calls {@link #doOpen()} so the concrete
     * transport can bind. Channel events pass through {@link #connected(Channel)} and
     * {@link #disconnected(Channel)}, where the connection limit is enforced.
     */
    public abstract class AbstractServer extends AbstractEndpoint implements Server {
        /** Name used for the server-side handler thread pool. */
        protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
        private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
        /** Handler thread pool, looked up from the default DataStore extension by URL port. */
        ExecutorService executor;
        /** Address derived directly from the URL; reported as the "export" address in logs. */
        private final InetSocketAddress localAddress;
        /** Address the server socket binds to. */
        private final InetSocketAddress bindAddress;
        /** Maximum number of client channels; the limit is only enforced when greater than 0. */
        private int accepts;
        /**
         * Idle timeout. The initial value is always overwritten by the constructor.
         * NOTE(review): the original comment says "600 seconds"; confirm the unit against
         * Constants.DEFAULT_IDLE_TIMEOUT.
         */
        private int idleTimeout = 600; //600 seconds

        /**
         * Reads the server settings from {@code url} and opens the server.
         *
         * <p>{@link #doOpen()} is invoked from this constructor, i.e. before any subclass
         * constructor body or subclass field initializer has run.
         *
         * @param url     server URL carrying bind address, accepts and idle-timeout parameters
         * @param handler channel handler passed on to the superclass
         * @throws RemotingException if {@link #doOpen()} fails for any reason
         */
        public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
            // Local address comes straight from the URL.
            localAddress = getUrl().toInetSocketAddress();
            // Bind ip/port default to the URL's host/port unless overridden by parameters.
            String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
            int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
            // Fall back to the wildcard address when requested or when the ip is not usable.
            if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
                bindIp = NetUtils.ANYHOST;
            }
            bindAddress = new InetSocketAddress(bindIp, bindPort);
            this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
            try {
                // Implemented by the concrete transport (e.g. NettyServer).
                doOpen();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
                }
            } catch (Throwable t) {
                throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
            }
            //fixme replace this with better method
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            // The executor is looked up only after doOpen() has completed.
            executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
        }

        /** Opens (binds) the underlying transport. Called from the constructor. */
        protected abstract void doOpen() throws Throwable;

        /** Releases the underlying transport. Called from {@link #close()}. */
        protected abstract void doClose() throws Throwable;

        /**
         * Re-applies the accepts, idle-timeout and thread-pool-size parameters found in
         * {@code url}, then merges all of its parameters into this server's URL.
         * Each setting is applied independently; a failure in one is logged and does not
         * prevent the others.
         *
         * @param url URL carrying the new parameters; {@code null} is ignored
         */
        @Override
        public void reset(URL url) {
            if (url == null) {
                return;
            }
            try {
                if (url.hasParameter(Constants.ACCEPTS_KEY)) {
                    int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
                    if (a > 0) {
                        // Only a positive value replaces the current connection limit.
                        this.accepts = a;
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
            try {
                if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
                    int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
                    if (t > 0) {
                        // Only a positive value replaces the current idle timeout.
                        this.idleTimeout = t;
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
            try {
                // Resize the handler thread pool if it is a live ThreadPoolExecutor.
                if (url.hasParameter(Constants.THREADS_KEY)
                        && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                    int threads = url.getParameter(Constants.THREADS_KEY, 0);
                    int max = threadPoolExecutor.getMaximumPoolSize();
                    int core = threadPoolExecutor.getCorePoolSize();
                    if (threads > 0 && (threads != max || threads != core)) {
                        if (threads < core) {
                            // Shrinking: lower core first so core never exceeds max,
                            // and shrink max as well when the pool was fixed-size.
                            threadPoolExecutor.setCorePoolSize(threads);
                            if (core == max) {
                                threadPoolExecutor.setMaximumPoolSize(threads);
                            }
                        } else {
                            // Growing (or only max differs): raise max first, then core
                            // when the pool was fixed-size.
                            threadPoolExecutor.setMaximumPoolSize(threads);
                            if (core == max) {
                                threadPoolExecutor.setCorePoolSize(threads);
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
            super.setUrl(getUrl().addParameters(url.getParameters()));
        }

        /**
         * Sends {@code message} to every currently connected client channel.
         *
         * @param message payload to send
         * @param sent    forwarded unchanged to each channel's {@code send}
         * @throws RemotingException if sending on any channel fails
         */
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            Collection<Channel> channels = getChannels();
            for (Channel channel : channels) {
                if (channel.isConnected()) {
                    channel.send(message, sent);
                }
            }
        }

        /**
         * Closes the server: shuts the executor down, marks the endpoint closed via the
         * superclass and finally releases the transport through {@link #doClose()}.
         * Failures in the last two steps are logged and do not propagate.
         */
        @Override
        public void close() {
            if (logger.isInfoEnabled()) {
                logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
            ExecutorUtil.shutdownNow(executor, 100);
            try {
                super.close();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                doClose();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        }

        /**
         * Gives the executor up to {@code timeout} to finish gracefully, then performs the
         * regular {@link #close()}.
         *
         * @param timeout timeout handed to {@code ExecutorUtil.gracefulShutdown}
         */
        @Override
        public void close(int timeout) {
            ExecutorUtil.gracefulShutdown(executor, timeout);
            close();
        }

        /** @return the address derived from the server URL */
        @Override
        public InetSocketAddress getLocalAddress() {
            return localAddress;
        }

        /** @return the address the server socket binds to */
        public InetSocketAddress getBindAddress() {
            return bindAddress;
        }

        /** @return the configured maximum number of client channels */
        public int getAccepts() {
            return accepts;
        }

        /** @return the configured idle timeout */
        public int getIdleTimeout() {
            return idleTimeout;
        }

        /**
         * Handles a channel-connected event. The new channel is closed immediately when the
         * server is closing/closed or when the number of channels exceeds {@code accepts};
         * otherwise the event is passed on to the superclass.
         *
         * @param ch the newly connected channel
         * @throws RemotingException propagated from the superclass handler
         */
        @Override
        public void connected(Channel ch) throws RemotingException {
            // If the server has entered the shutdown process, reject any new connection
            if (this.isClosing() || this.isClosed()) {
                logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
                ch.close();
                return;
            }

            Collection<Channel> channels = getChannels();
            if (accepts > 0 && channels.size() > accepts) {
                logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
                ch.close();
                return;
            }
            super.connected(ch);
        }

        /**
         * Handles a channel-disconnected event. Logs a hint once no channels remain, then
         * passes the event on to the superclass.
         *
         * @param ch the channel that disconnected
         * @throws RemotingException propagated from the superclass handler
         */
        @Override
        public void disconnected(Channel ch) throws RemotingException {
            Collection<Channel> channels = getChannels();
            if (channels.isEmpty()) {
                logger.warn("All clients have disconnected from " + ch.getLocalAddress() + ". You can gracefully shut down now.");
            }
            super.disconnected(ch);
        }

    }
    

    还是选取NettyServer作为AbstractServer的实现类来进行分析。源码如下

    public class NettyServer extends AbstractServer implements Server {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
        //远程Client与Channel的映射关系,这里的Channel是dubbo的Channel
        private Map<String, Channel> channels; // <ip:port, channel>
        //服务端的启动类
        private ServerBootstrap bootstrap;
        //服务端对应的channel
        private io.netty.channel.Channel channel;
        //主Group
        private EventLoopGroup bossGroup;
        //工作Group
        private EventLoopGroup workerGroup;
    
        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    
        @Override
        protected void doOpen() throws Throwable {
            bootstrap = new ServerBootstrap();
    
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
    
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
           //直接通过nettyServerHandler拿到所有关联的channels
            channels = nettyServerHandler.getChannels();
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    //将最后创建的nettyServerHandler注册为handler
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            //拿到server对应的channel
            channel = channelFuture.channel();
    
        }
    
        @Override
        protected void doClose() throws Throwable {
            try {
                if (channel != null) {
                    // unbind.
                    channel.close();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
                if (channels != null && channels.size() > 0) {
                    for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                        try {
                            channel.close();
                        } catch (Throwable e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                if (bootstrap != null) {
                    bossGroup.shutdownGracefully();
                    workerGroup.shutdownGracefully();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                if (channels != null) {
                    channels.clear();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        }
    
        @Override
        public Collection<Channel> getChannels() {
            Collection<Channel> chs = new HashSet<Channel>();
            for (Channel channel : this.channels.values()) {
                if (channel.isConnected()) {
                    chs.add(channel);
                } else {
                    channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
                }
            }
            return chs;
        }
    
        @Override
        public Channel getChannel(InetSocketAddress remoteAddress) {
            return channels.get(NetUtils.toAddressString(remoteAddress));
        }
    
        @Override
        public boolean isBound() {
            return channel.isActive();
        }
    
    }
    

    我们比较好奇NettyServer里面的channels是怎么来的。这个channels是在NettyServerHandler里面进行维护的,那NettyServerHandler的channels又是怎么来的呢?因为NettyServerHandler是NettyServer的出入站处理器,所有netty事件的处理都由它负责,于是dubbo的开发者们通过装饰模式,缓存了所有生效的channels,关键源码如下。

    // Cache of the Dubbo channels tracked by this handler, keyed by the remote "ip:port" string.
     private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
    
    //重写channelActive方法
    /**
     * Invoked when a Netty channel becomes active. Forwards the event along the pipeline,
     * obtains the Dubbo {@code NettyChannel} wrapper for the Netty channel, caches it under
     * the remote address and notifies the Dubbo handler that a client connected.
     *
     * @param ctx context of the channel that became active
     * @throws Exception propagated from the Dubbo handler
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The event is fired explicitly on the context rather than via super.channelActive(ctx).
        ctx.fireChannelActive();

        final NettyChannel dubboChannel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            if (dubboChannel != null) {
                // Register the wrapper under the client's address string.
                final InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
                channels.put(NetUtils.toAddressString(remote), dubboChannel);
            }
            handler.connected(dubboChannel);
        } finally {
            // Always give NettyChannel the chance to drop its wrapper if the channel is
            // already disconnected (behaviour per the helper's name).
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
     //重写失效方法
    /**
     * Invoked when a Netty channel becomes inactive. Removes the cache entry for the
     * remote address and notifies the Dubbo handler that the client disconnected.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception propagated from the Dubbo handler
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final NettyChannel dubboChannel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            // Drop the cache entry before telling the handler about the disconnect.
            final InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
            final String remoteKey = NetUtils.toAddressString(remote);
            channels.remove(remoteKey);
            handler.disconnected(dubboChannel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
    

    如上我们通过装饰模式,就可以很方便的将所有的channel进行缓存,并返回给上层进行使用了。
    刚才的AbstractServer都是工作在transport层,我们接着分析exchange层的ExchangeServer。

    /**
     * Exchange-layer view of a {@link Server}: in addition to the transport-level
     * operations it exposes the connected clients as {@link ExchangeChannel}s.
     */
    public interface ExchangeServer extends Server {
    
        /**
         * Returns all ExchangeChannels associated with this server.
         */
        Collection<ExchangeChannel> getExchangeChannels();
    
        /**
         * Returns the ExchangeChannel associated with the given remote (client) address.
         */
        ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
    }
    

    HeaderExchangeServer

    /**
     * Exchange-layer server that wraps a transport-layer {@link Server}.
     *
     * <p>Most operations delegate to the wrapped server. On top of that this class runs a
     * periodic {@code HeartBeatTask} over the connected channels and exposes those channels
     * as {@link ExchangeChannel}s.
     */
    public class HeaderExchangeServer implements ExchangeServer {

        protected final Logger logger = LoggerFactory.getLogger(getClass());
        // Single-threaded daemon scheduler that runs the heartbeat task.
        private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
                new NamedThreadFactory(
                        "dubbo-remoting-server-heartbeat",
                        true));
        // The wrapped transport-layer server.
        private final Server server;
        // heartbeat timer
        private ScheduledFuture<?> heartbeatTimer;
        // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
        private int heartbeat;
        private int heartbeatTimeout;
        // Ensures doClose() runs its shutdown steps only once.
        private final AtomicBoolean closed = new AtomicBoolean(false);

        /**
         * Wraps {@code server} and starts the heartbeat timer if a positive heartbeat
         * interval is configured on the server URL.
         *
         * @param server transport-layer server to wrap
         * @throws IllegalArgumentException if {@code server} is null
         * @throws IllegalStateException    if the heartbeat timeout is less than twice the
         *                                  heartbeat interval
         */
        public HeaderExchangeServer(Server server) {
            if (server == null) {
                throw new IllegalArgumentException("server == null");
            }
            this.server = server;
            this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
            // The timeout defaults to three heartbeat intervals.
            this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
            if (heartbeatTimeout < heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }
            startHeartbeatTimer();
        }

        /** @return the wrapped transport-layer server */
        public Server getServer() {
            return server;
        }

        @Override
        public boolean isClosed() {
            return server.isClosed();
        }

        /** @return {@code true} while at least one client channel is still connected */
        private boolean isRunning() {
            Collection<Channel> channels = getChannels();
            for (Channel channel : channels) {
                if (channel.isConnected()) {
                    return true;
                }
            }
            return false;
        }

        /** Stops the heartbeat and closes the wrapped server immediately. */
        @Override
        public void close() {
            doClose();
            server.close();
        }

        /**
         * Graceful close. Marks the server as closing, optionally sends a read-only event
         * to all clients, then waits until every client has disconnected or
         * {@code timeout} has elapsed before stopping the heartbeat and closing the
         * wrapped server.
         *
         * <p>An interrupt received while waiting does not cut the wait short. It is
         * remembered and the thread's interrupt status is restored once the close sequence
         * has finished, so callers can still observe it.
         *
         * @param timeout maximum wait in milliseconds; non-positive skips the wait
         */
        @Override
        public void close(final int timeout) {
            startClose();
            boolean interrupted = false;
            try {
                if (timeout > 0) {
                    final long max = (long) timeout;
                    final long start = System.currentTimeMillis();
                    if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
                        sendChannelReadOnlyEvent();
                    }
                    while (isRunning()
                            && System.currentTimeMillis() - start < max) {
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            // Keep waiting, but do not lose the interrupt.
                            interrupted = true;
                            logger.warn(e.getMessage(), e);
                        }
                    }
                }
                doClose();
                server.close(timeout);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override
        public void startClose() {
            server.startClose();
        }

        /**
         * Sends a one-way READONLY event to every connected client channel. Send failures
         * are logged per channel and do not stop the loop.
         */
        private void sendChannelReadOnlyEvent() {
            Request request = new Request();
            request.setEvent(Request.READONLY_EVENT);
            request.setTwoWay(false);
            request.setVersion(Version.getProtocolVersion());

            Collection<Channel> channels = getChannels();
            for (Channel channel : channels) {
                try {
                    if (channel.isConnected()) {
                        channel.send(request, getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY, true));
                    }
                } catch (RemotingException e) {
                    logger.warn("send cannot write message error.", e);
                }
            }
        }

        /** Stops the heartbeat timer and its scheduler; subsequent calls do nothing. */
        private void doClose() {
            if (!closed.compareAndSet(false, true)) {
                return;
            }
            stopHeartbeatTimer();
            try {
                scheduled.shutdown();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }

        /**
         * Wraps each transport-layer channel of the wrapped server in an exchange-layer
         * channel.
         *
         * @return a new list of exchange channels; empty if the server has none
         */
        @Override
        public Collection<ExchangeChannel> getExchangeChannels() {
            Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
            Collection<Channel> channels = server.getChannels();
            if (channels != null && !channels.isEmpty()) {
                for (Channel channel : channels) {
                    exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
                }
            }
            return exchangeChannels;
        }

        /**
         * @param remoteAddress client address
         * @return the exchange channel for the transport channel registered under that address
         */
        @Override
        public ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress) {
            Channel channel = server.getChannel(remoteAddress);
            return HeaderExchangeChannel.getOrAddChannel(channel);
        }

        /** Same channels as {@link #getExchangeChannels()}, typed as plain channels. */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Collection<Channel> getChannels() {
            return (Collection) getExchangeChannels();
        }

        @Override
        public Channel getChannel(InetSocketAddress remoteAddress) {
            return getExchangeChannel(remoteAddress);
        }

        @Override
        public boolean isBound() {
            return server.isBound();
        }

        @Override
        public InetSocketAddress getLocalAddress() {
            return server.getLocalAddress();
        }

        @Override
        public URL getUrl() {
            return server.getUrl();
        }

        @Override
        public ChannelHandler getChannelHandler() {
            return server.getChannelHandler();
        }

        /**
         * Resets the wrapped server and, if the URL carries heartbeat parameters that
         * differ from the current ones, restarts the heartbeat timer with the new values.
         * Failures while applying heartbeat settings are logged, not thrown.
         *
         * @param url URL carrying the new parameters; {@code null} is passed on to the
         *            wrapped server and otherwise ignored
         */
        @Override
        public void reset(URL url) {
            server.reset(url);
            if (url == null) {
                // Nothing to read heartbeat settings from.
                return;
            }
            try {
                if (url.hasParameter(Constants.HEARTBEAT_KEY)
                        || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
                    int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
                    int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
                    if (t < h * 2) {
                        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
                    }
                    if (h != heartbeat || t != heartbeatTimeout) {
                        heartbeat = h;
                        heartbeatTimeout = t;
                        startHeartbeatTimer();
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }

        @Override
        @Deprecated
        public void reset(com.alibaba.dubbo.common.Parameters parameters) {
            reset(getUrl().addParameters(parameters.getParameters()));
        }

        /**
         * @throws RemotingException if this server has been closed, or if the wrapped
         *                           server fails to send
         */
        @Override
        public void send(Object message) throws RemotingException {
            if (closed.get()) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
            }
            server.send(message);
        }

        /**
         * @throws RemotingException if this server has been closed, or if the wrapped
         *                           server fails to send
         */
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (closed.get()) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
            }
            server.send(message, sent);
        }

        /**
         * (Re)starts the heartbeat task. Any running timer is cancelled first; a new one
         * is scheduled only when the heartbeat interval is positive.
         */
        private void startHeartbeatTimer() {
            stopHeartbeatTimer();
            if (heartbeat > 0) {
                heartbeatTimer = scheduled.scheduleWithFixedDelay(
                        new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                            @Override
                            public Collection<Channel> getChannels() {
                                // The task gets a read-only view of the current channels on every run.
                                return Collections.unmodifiableCollection(
                                        HeaderExchangeServer.this.getChannels());
                            }
                        }, heartbeat, heartbeatTimeout),
                        heartbeat, heartbeat, TimeUnit.MILLISECONDS);
            }
        }

        /** Cancels the current heartbeat timer, if any, and clears the reference. */
        private void stopHeartbeatTimer() {
            try {
                ScheduledFuture<?> timer = heartbeatTimer;
                if (timer != null && !timer.isCancelled()) {
                    timer.cancel(true);
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            } finally {
                heartbeatTimer = null;
            }
        }

    }
    

    通过这样的分析之后,我们可以看到Server端的实现与Client端的实现一脉相承,只要认真阅读,难度并不大。

    相关文章

      网友评论

          本文标题:dubbo技术内幕十二 Endpoint 之 Server

          本文链接:https://www.haomeiwen.com/subject/jlfzlltx.html