美文网首页dubbo
Dubbo 连接数控制

Dubbo 连接数控制

作者: 晴天哥_王志 | 来源:发表于2020-01-22 13:21 被阅读0次

    客户端连接控制

    <dubbo:reference interface="com.foo.BarService" connections="10" />
    
    或
    
    <dubbo:service interface="com.foo.BarService" connections="10" />
    
    • 限制客户端服务使用连接不能超过 10 个。
    • 如果 <dubbo:service> 和 <dubbo:reference> 都配了 connections,<dubbo:reference> 优先。
    • 配置参数为connections。
    public class DubboProtocol extends AbstractProtocol {
    
        private ExchangeClient[] getClients(URL url) {
            // whether to share connection
            boolean service_share_connect = false;
            // String CONNECTIONS_KEY = "connections";
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                service_share_connect = true;
                connections = 1;
            }
            // 如果设置了最大连接数,就不走共享连接,直接创建指定个数的连接。
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect) {
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
    
    
        private ExchangeClient initClient(URL url) {
    
            // client type setting.
            String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    
            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
    
            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
                } else {
                    client = Exchangers.connect(url, requestHandler);
                }
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
            return client;
        }
    }
    
    • 客户端的连接数控制是通过创建ExchangeClient的过程中进行控制。
    • 如果设置了connections的变量,那么就只创建指定数量的的ExchangeClient对象。

    服务端连接控制

    <dubbo:provider protocol="dubbo" accepts="10" />
    
    或
    
    <dubbo:protocol name="dubbo" accepts="10" />
    
    • 限制服务器端接受的连接不能超过 10 个。
    • 配置参数为accepts。
    public class NettyServer extends AbstractServer implements Server {
    
        private Map<String, Channel> channels; // <ip:port, channel>
        private ServerBootstrap bootstrap;
        private io.netty.channel.Channel channel;
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
    
        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    }
    
    
    public abstract class AbstractServer extends AbstractEndpoint implements Server {
    
        protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
        private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
        ExecutorService executor;
        private InetSocketAddress localAddress;
        private InetSocketAddress bindAddress;
        // 服务端连接控制
        private int accepts;
        private int idleTimeout = 600; //600 seconds
    
        public void connected(Channel ch) throws RemotingException {
            // If the server has entered the shutdown process, reject any new connection
            if (this.isClosing() || this.isClosed()) {
                logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
                ch.close();
                return;
            }
    
            Collection<Channel> channels = getChannels();
            // 超过服务端连接控制,直接关闭连接。
            if (accepts > 0 && channels.size() > accepts) {
                logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
                ch.close();
                return;
            }
            super.connected(ch);
        }
    }
    
    • 服务端的连接控制数通过NettyServer的connected()方法在处理连接事件的时候进行判断。
    • 如果连接数的数量超过了accepts的值那么就抛出异常并关闭连接数。

    连接数说明

    • 客户端没有配置connections,使用共享连接,以 ip+port 确定一个连接。
    • 客户端配置了connections,客户端配置了两个引用(两个reference)且一个配置了connections=“2”,另一个没有配置connections,则没有配置connects的引用使用共享连接产生一个Client连接,配置connections=“2”使用独立连接,且连接数为2,则产生两个Client连接,所以一共产生三个Client连接。

    参考

    《Dubbo进阶二》——RPC协议之网络传输原理

    相关文章

      网友评论

        本文标题:Dubbo 连接数控制

        本文链接:https://www.haomeiwen.com/subject/vcvuzctx.html