美文网首页sofabolt
SOFABolt 源码分析12 - Connection 连接管

SOFABolt 源码分析12 - Connection 连接管

作者: 原水寒 | 来源:发表于2018-10-15 17:11 被阅读127次
    image.png

    上图中仅列出主要的几个类,连接管理包含以下部分:

    • Connection 连接元数据:包裹了 Netty channel 实例
    • ConnectionFactory 连接工厂:创建连接、检测连接等
    • ConnectionPool 连接池:存储 { uniqueKey, List<Connection> } ,uniqueKey 默认为 ip:port;包含 ConnectionSelectStrategy,从 pool 中选择 Connection
    • ConnectionEventHandler 和 ConnectionEventListener:事件处理器和监听器
    • ConnectionManager 连接管理器:是对外的门面,包含所有与 Connection 相关的对外的接口操作
    • Scanner 扫描器:Bolt 提供的一个统一的扫描器,用于执行一些后台任务

    一、Connection 元数据

    /**
     * An abstraction of socket channel.
     */
    public class Connection {
        // netty channel
        private Channel channel;
        // {requestId, InvokeFuture}
        private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);
        private ProtocolCode protocolCode;
        // protocolVersion
        private byte version = RpcProtocolV2.PROTOCOL_VERSION_1;
        // 数据总线
        private Url url;
        // {id, poolKey}
        private final ConcurrentHashMap<Integer, String> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);
        private Set<String> poolKeys = new ConcurrentHashSet<String>();
    
        public Connection(Channel channel) {
            this.channel = channel;
            // 将当前的 Connection 设置到对应的 netty channel 的附属属性中
            this.channel.attr(CONNECTION).set(this);
        }
    
        public Connection(Channel channel, Url url) {
            this(channel);
            this.url = url;
            // url.getUniqueKey() 默认为 ip:port
            this.poolKeys.add(url.getUniqueKey());
        }
    
        public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url url) {
            this(channel, url);
            this.protocolCode = protocolCode;
            this.version = version;
            this.init();
        }
    
        private void init() {
            // 添加一系列的附属属性:PROTOCOL 和 VERSION 会用在 Codec 编解码中;HEARTBEAT_COUNT 和 HEARTBEAT_SWITCH 会用在心跳
            this.channel.attr(HEARTBEAT_COUNT).set(new Integer(0));
            this.channel.attr(PROTOCOL).set(this.protocolCode);
            this.channel.attr(VERSION).set(this.version);
            this.channel.attr(HEARTBEAT_SWITCH).set(true);
        }
    
        public boolean isFine() {
            // channel 可用
            return this.channel != null && this.channel.isActive();
        }
    
        public InvokeFuture getInvokeFuture(int id) {
            return this.invokeFutureMap.get(id);
        }
    
        public InvokeFuture addInvokeFuture(InvokeFuture future) {
            return this.invokeFutureMap.putIfAbsent(future.invokeId(), future);
        }
    
        public InvokeFuture removeInvokeFuture(int id) {
            return this.invokeFutureMap.remove(id);
        }
    
        /**
         * Do something when closing.
         */
        public void onClose() {
            // 遍历 invokeFutureMap,对每一个 InvokeFuture
            // 1. createConnectionClosedResponse 并设置到 InvokeFuture中,唤醒阻塞线程
            // 2. 取消超时任务
            // 3. 异步执行回调
            Iterator<Entry<Integer, InvokeFuture>> iter = invokeFutureMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<Integer, InvokeFuture> entry = iter.next();
                iter.remove();
                InvokeFuture future = entry.getValue();
                if (future != null) {
                    future.putResponse(future.createConnectionClosedResponse(this.getRemoteAddress()));
                    future.cancelTimeout();
                    future.tryAsyncExecuteInvokeCallbackAbnormally();
                }
            }
        }
    
        /**
         * Close the connection.
         */
        public void close() {
            this.getChannel().close();
        }
    
        /**
        * Whether invokeFutures is completed
        */
        public boolean isInvokeFutureMapFinish() {
            return invokeFutureMap.isEmpty();
        }
    }
    
    

    二、ConnectionFactory 连接工厂

    ================================= ConnectionFactory =================================
    /**
     * Factory that creates connections.
     */
    public interface ConnectionFactory {
    
        /**
         * Initialize the factory.
         */
        void init(ConnectionEventHandler connectionEventHandler);
    
        /**
         * Create a connection use #BoltUrl
         */
        Connection createConnection(Url url) throws Exception;
    
        /**
         * Create a connection according to the IP and port.
         * Note: The default protocol is RpcProtocol.
         */
        Connection createConnection(String targetIP, int targetPort, int connectTimeout) throws Exception;
    
        /**
         * Create a connection according to the IP and port.
         * Note: The default protocol is RpcProtocolV2, and you can specify the version
         */
        Connection createConnection(String targetIP, int targetPort, byte version, int connectTimeout) throws Exception;
    }
    
    ================================= ConnectionFactory =================================
    public abstract class AbstractConnectionFactory implements ConnectionFactory {
        private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1, new NamedThreadFactory("bolt-netty-client-worker", true));
        // RpcServer or RpcClient
        private final ConfigurableInstance  confInstance;
        private final Codec                 codec;
        private final ChannelHandler        heartbeatHandler;
        // 业务逻辑处理器
        private final ChannelHandler        handler;
        protected Bootstrap                 bootstrap;
    
        @Override
        public void init(final ConnectionEventHandler connectionEventHandler) {
             // 初始化 netty 一系列配置
        }
    
        @Override
        public Connection createConnection(Url url) throws Exception {
            // 创建 netty channel
            Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
            // 创建 Connection
            Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
            // 发布 ConnectionEventType.CONNECT 事件
            channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
            return conn;
        }
    
        protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout) throws Exception {
            // prevent unreasonable value, at least 1000
            connectTimeout = Math.max(connectTimeout, 1000);
            String address = targetIP + ":" + targetPort;
            // 设置连接超时时间
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
            // 发起连接
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
            future.awaitUninterruptibly();
            ...
            return future.channel();
        }
    }
    
    ================================= DefaultConnectionFactory =================================
    public class DefaultConnectionFactory extends AbstractConnectionFactory {
        public DefaultConnectionFactory(Codec codec, ChannelHandler heartbeatHandler, ChannelHandler handler, ConfigurableInstance configInstance) {
            super(codec, heartbeatHandler, handler, configInstance);
        }
    }
    
    ================================= ConnectionFactory =================================
    public class RpcConnectionFactory extends DefaultConnectionFactory {
        public RpcConnectionFactory(ConcurrentHashMap<String, UserProcessor<?>> userProcessors, ConfigurableInstance configInstance) {
            // 创建 RpcCodec 编解码器工厂类
            // 创建心跳处理器
            // 创建业务逻辑处理器
            super(new RpcCodec(), new HeartbeatHandler(), new RpcHandler(userProcessors), configInstance);
        }
    }
    

    三、ConnectionSelectStrategy 连接选择器

    ================================= ConnectionSelectStrategy =================================
    /**
     * Select strategy from connection pool
     */
    public interface ConnectionSelectStrategy {
        Connection select(List<Connection> conns);
    }
    
    ================================= RandomSelectStrategy =================================
    /**
     * Select a connection randomly
     */
    public class RandomSelectStrategy implements ConnectionSelectStrategy {
        /** max retry times */
        private static final int    MAX_TIMES = 5;
        private final Random        random    = new Random();
        private GlobalSwitch        globalSwitch;
    
        @Override
        public Connection select(List<Connection> conns) {
            Connection result = null;
            // 如果开启了 连接监控,则从状态为健康的连接中获取连接;否则直接从 conns 全连接中随机获取
            if (null != this.globalSwitch && this.globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
                ...
                result = randomGet(serviceStatusOnConns);
            } else {
                result = randomGet(conns);
            }
            return result;
        }
    
        private Connection randomGet(List<Connection> conns) {
            ...
            int tries = 0;
            Connection result = null;
            // 不断重试获取状态为可用的 Connection
            while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
                result = conns.get(this.random.nextInt(size));
            }
    
            if (result != null && !result.isFine()) {
                result = null;
            }
            return result;
        }
    }
    
    

    四、ConnectionPool 连接池

    ================================= ConnectionPool =================================
    public class ConnectionPool implements Scannable {
        /** connections */
        private CopyOnWriteArrayList<Connection> conns = new CopyOnWriteArrayList<Connection>();
        /** strategy */
        private ConnectionSelectStrategy strategy;
    
        public void add(Connection connection) {
            boolean res = this.conns.addIfAbsent(connection);
        }
    
        public void removeAndTryClose(Connection connection) {
            boolean res = this.conns.remove(connection);
            // 如果该连接没有引用了,则 close 连接
            if (connection.noRef()) {
                connection.close();
            }
        }
    
        public Connection get() {
            List<Connection> snapshot = new ArrayList<Connection>(this.conns);
            // 使用 ConnectionSelectStrategy 从 List<Connection> 选一个 Connection
            return this.strategy.select(snapshot);
        }
    }
    

    五、ConnectionManager 连接管理器

    ================================= ConnectionManager =================================
    /**
     * Connection manager of connection pool
     */
    public interface ConnectionManager extends Scannable {
        // 初始化操作
        void init();
        // 添加 Connection 到 ConnectionPool(根据poolKey)
        void add(Connection connection);
        // 根据 poolKey 从 ConnectionPool 中获取 Connection
        Connection get(String poolKey);
        // 根据 poolKey 从 ConnectionPool 中删除 Connection 并 close
        void remove(Connection connection);
        // 检测 connection 是否可用
        void check(Connection connection) throws RemotingException;
        // 根据 url 从 ConnectionPool 中获取 Connection,如果 ConnectionPool 为null,则创建 ConnectionPool,并创建 Connection 到 ConnectionPool
        // 创建的 Connection 的数量由 Url#getConnNum() 指定,也可以直接在 addr 上拼接 "_CONNECTIONNUM=10"
        Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException;
        // 创建连接
        Connection create(Url url) throws RemotingException;
    }
    
    ================================= DefaultConnectionManager =================================
    public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager, Scannable {
        private Executor asyncCreateConnectionExecutor;
        private GlobalSwitch globalSwitch;
        // connection pool initialize tasks
        protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;
        protected ConnectionSelectStrategy connectionSelectStrategy;
        protected ConnectionFactory connectionFactory;
    
        protected ConnectionEventHandler connectionEventHandler;
        protected ConnectionEventListener connectionEventListener;
    
        public DefaultConnectionManager() {
            this.connTasks = new ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>>();
            this.connectionSelectStrategy = new RandomSelectStrategy(globalSwitch);
        }
    
        @Override
        public void init() {
            this.connectionEventHandler.setConnectionManager(this);
            this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
            this.connectionFactory.init(connectionEventHandler);
        }
    
        @Override
        public void add(Connection connection) {
            Set<String> poolKeys = connection.getPoolKeys();
            for (String poolKey : poolKeys) {
                this.add(connection, poolKey);
            }
        }
    
        @Override
        public void add(Connection connection, String poolKey) {
            // 获取或者创建一个 ConnectionPool
            ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(poolKey, new ConnectionPoolCall());
            // 将 connection 放入 ConnectionPool
            pool.add(connection);
        }
    
        @Override
        public Connection get(String poolKey) {
            // 获取 ConnectionPool
            ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
            // 从 ConnectionPool 获取 Connection
            return null == pool ? null : pool.get();
        }
    
        @Override
        public void check(Connection connection) throws RemotingException {
            if (connection == null || connection.getChannel() == null || !connection.getChannel().isActive() || !connection.getChannel().isWritable()) {
                throw e;
            } 
        }
    
        // If no task cached, create one and initialize the connections.
        @Override
        public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
            // get and create a connection pool with initialized connections.
            ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(), new ConnectionPoolCall(url));
            // 从 ConnectionPool 获取 Connection
            return pool.get();
        }
    
        // 直接使用 connectionFactory 创建 Connection
        @Override
        public Connection create(Url url) throws RemotingException {
            Connection conn = this.connectionFactory.createConnection(url);
            return conn;
        }
    
        /**
         * @see com.alipay.remoting.ConnectionHeartbeatManager#disableHeartbeat(com.alipay.remoting.Connection)
         */
        @Override
        public void disableHeartbeat(Connection connection) {
            if (null != connection) {
                connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(false);
            }
        }
    
        /**
         * @see com.alipay.remoting.ConnectionHeartbeatManager#enableHeartbeat(com.alipay.remoting.Connection)
         */
        @Override
        public void enableHeartbeat(Connection connection) {
            if (null != connection) {
                connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(true);
            }
        }
    
        // get connection pool from future task
        private ConnectionPool getConnectionPool(RunStateRecordedFutureTask<ConnectionPool> task) {
            return FutureTaskUtil.getFutureTaskResult(task, logger);
        }
    
        private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey, Callable<ConnectionPool> callable) {
            RunStateRecordedFutureTask<ConnectionPool> initialTask = null;
            ConnectionPool pool = null;
    
            for (int i = 0; (i < retry) && (pool == null); ++i) {
                // 1. 根据 poolKey 从 connTasks 获取 RunStateRecordedFutureTask 实例
                initialTask = this.connTasks.get(poolKey);
                // 2. 如果为 null,创建一个 RunStateRecordedFutureTask 实例,并设置 {poolKey, RunStateRecordedFutureTask 实例} 到 connTasks 中
                if (null == initialTask) {
                    initialTask = new RunStateRecordedFutureTask<ConnectionPool>(callable);
                    initialTask = this.connTasks.putIfAbsent(poolKey, initialTask);
                    // 注意:这里为什么做二次判断?
                    // 在高并发的情况下,有可能同一个 poolKey 下的两个 RpcClient 同时走到这里(我们无法预判用户会怎样使用 Bolt),那么在 putIfAbsent 的时候只有一个可以成功(否则就会创建双倍的预期连接数),
                    // 则先成功的返回 null,后成功的返回旧值,也就是前边插入的 initialTask 实例,一定不为 null
                    if (null == initialTask) {
                        initialTask = this.connTasks.get(poolKey);
                        // 3. 直接运行 RunStateRecordedFutureTask 实例
                        initialTask.run();
                    }
                }
                // 从 RunStateRecordedFutureTask 实例中获取 ConnectionPool
                pool = initialTask.get();
            }
            return pool;
        }
    
        // a callable definition for initialize ConnectionPool
        private class ConnectionPoolCall implements Callable<ConnectionPool> {
            @Override
            public ConnectionPool call() throws Exception {
                // 创建一个 ConnectionPool
                final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
                // 创建 Connection 并添加到 ConnectionPool
                doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
                // 返回 ConnectionPool
                return pool;
            }
        }
    
        /**
         * do create connections
         * @param syncCreateNumWhenNotWarmup 指定了同步创建的个数,默认为1,即需要同步创建一个 Connection,其他的都异步创建
         */
        private void doCreate(Url url, ConnectionPool pool, String taskName, int syncCreateNumWhenNotWarmup) {
            // 池中已有连接数
            final int actualNum = pool.size();
            // 期盼总共的连接数
            final int expectNum = url.getConnNum();
            if (actualNum < expectNum) {
                // 是否配置了连接预热(即需要同步创建好所有的 Connection)
                if (url.isConnWarmup()) {
                    for (int i = actualNum; i < expectNum; ++i) {
                        // 创建 Connection
                        Connection connection = create(url);
                        // 将 Connection 塞入 ConnectionPool
                        pool.add(connection);
                    }
                // 没有配置连接预热,默认同步创建一个 Connection,剩余的 Connection 异步创建
                } else {
                    // 同步创建 Connection,syncCreateNumWhenNotWarmup 指定了同步创建的个数
                    if (syncCreateNumWhenNotWarmup > 0) {
                        for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
                            Connection connection = create(url);
                            pool.add(connection);
                        }
                        if (syncCreateNumWhenNotWarmup == url.getConnNum()) {
                            return;
                        }
                    }
                    // 创建异步创建 Connection 的连接池
                    initializeExecutor();
                    pool.markAsyncCreationStart();// mark the start of async
                        this.asyncCreateConnectionExecutor.execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    for (int i = pool.size(); i < url.getConnNum(); ++i) {
                                        Connection conn =  create(url);
                                        pool.add(conn);
                                    }
                                } finally {
                                    pool.markAsyncCreationDone();// mark the end of async
                                }
                            }
                        });
                    } 
                } // end of NOT warm up
            } // end of if
        }
    
        /**
         * initialize executor
         */
        private void initializeExecutor() {
            if (!this.executorInitialized) {
                this.executorInitialized = true;
                this.asyncCreateConnectionExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
                    keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize),
                    new NamedThreadFactory("Bolt-conn-warmup-executor", true));
            }
        }
    }
    
    ================================= RunStateRecordedFutureTask =================================
    public class RunStateRecordedFutureTask<V> extends FutureTask<V> {
        private AtomicBoolean hasRun = new AtomicBoolean();
    
        public RunStateRecordedFutureTask(Callable<V> callable) {
            super(callable);
        }
    
        @Override
        public void run() {
            this.hasRun.set(true);
            super.run();
        }
    
        public V getAfterRun() throws InterruptedException, ExecutionException,
                              FutureTaskNotRunYetException {
            if (!hasRun.get()) {
                throw new FutureTaskNotRunYetException();
            }
            return super.get();
        }
    }
    
    ================================= FutureTask =================================
        // 结果值
        private Object outcome;
        public V get() throws InterruptedException, ExecutionException {
            // 如果没有完成,等待完成
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            // 完成之后,返回结果值
            return report(s);
        }
    
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    关于多连接:通常来说,点对点的直连通信,客户端和服务端,一个 IP 一个连接对象就够用了。不管是吞吐能力还是并发度,都能满足一般业务的通信需求。而有一些场景,比如不是点对点直连通信,而是经过了 LVS VIP,或者 F5 设备的连接,此时,为了负载均衡和容错,会针对一个 URL 地址建立多个连接。

    以上内容摘自:SOFABolt 用户手册

    六、客户端

    public class RpcClient extends AbstractConfigurableInstance {
        private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
        private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(switches());
        private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory, connectionEventHandler, connectionEventListener, switches());
    
        public void init() {
            this.connectionManager.init();
            // 开启 连接监听 开关
            if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
                ...
            }
            // 开启 重连 开关
            if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
                ...
            }
        }
    
        public void oneway(String addr, Object request)  {
            this.rpcRemoting.oneway(addr, request, null);
        }
    
        public Connection createStandaloneConnection(String ip, int port, int connectTimeout) {
            return this.connectionManager.create(ip, port, connectTimeout);
        }
    
        public void closeStandaloneConnection(Connection conn) {
            if (null != conn) {
                conn.close();
            }
        }
     
        public Connection getConnection(Url url, int connectTimeout) {
            url.setConnectTimeout(connectTimeout);
            return this.connectionManager.getAndCreateIfAbsent(url);
        }
       
        public void enableConnHeartbeat(String addr) {
            Url url = this.addressParser.parse(addr);
            this.enableConnHeartbeat(url);
        }
    
        // enable connection reconnect switch on
        public void enableReconnectSwitch() {
            this.switches().turnOn(GlobalSwitch.CONN_RECONNECT_SWITCH);
        }
    
        // enable connection monitor switch on
        public void enableConnectionMonitorSwitch() {
            this.switches().turnOn(GlobalSwitch.CONN_MONITOR_SWITCH);
        }
    

    客户端真正创建连接的时候,是在发起第一次调用的时候。

        public void oneway(Url url, Object request, InvokeContext invokeContext) {
            // 获取或者创建 Connection
            Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
            // 检测连接
            this.connectionManager.check(conn);
            this.oneway(conn, request, invokeContext);
        }
    
        protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
            long start = System.currentTimeMillis();
            Connection conn;
            try {
                // 使用 connectionManager 获取或者创建连接
                conn = this.connectionManager.getAndCreateIfAbsent(url);
            } finally {
                // 记录连接获取或者创建的时间消耗
                if (null != invokeContext) {
                    invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start));
                }
            }
            return conn;
        }
    

    七、服务端

    public class RpcServer extends AbstractRemotingServer {
        private ChannelFuture                               channelFuture;
        private DefaultConnectionManager                    connectionManager;
        
    
        /**
         * You can enable connection management feature by specify @param manageConnection true.
         * 1. When connection management feature enabled, you can use all invoke methods with params {@link String}, {@link Url}, {@link Connection} methods.
         * 2. When connection management feature disabled, you can only use invoke methods with params {@link Connection}, otherwise {@link UnsupportedOperationException} will be thrown.
         */
        public RpcServer(String ip, int port, boolean manageConnection) {
            super(ip, port);
            // 是否开启服务端连接管理功能
            if (manageConnection) {
                this.switches().turnOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH);
            }
        }
    
        @Override
        protected void doInit() {
            // 服务端是否开启连接管理功能
            if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                this.connectionEventHandler = new RpcConnectionEventHandler(switches());
                this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
                this.connectionEventHandler.setConnectionManager(this.connectionManager);
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            } else {
                this.connectionEventHandler = new ConnectionEventHandler(switches());
                this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
            }
            
            ...
            this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel channel) {
                    ...
                    createConnection(channel);
                }
    
                private void createConnection(SocketChannel channel) {
                    Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                    // 是否开启了服务端连接管理功能
                    if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                        // 如果开启了,创建 Connection,并添加到 connectionManager
                        connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                    } else {
                        // 否则,直接创建 Connection
                        new Connection(channel, url);
                    }
                    // 发布 ConnectionEventType.CONNECT 事件
                    channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
                }
            });
        }
    
        public void oneway(final String addr, final Object request) {
            // 1. 查看是否启动了服务端连接管理功能,如果没有,直接抛出异常
            check();
            this.rpcRemoting.oneway(addr, request, null);
        }
        
        // 如果没有开启服务端连接管理功能,只能通过 Connection 对客户端发起调用,无法通过 Url 或 addr 等发起调用
        public void oneway(final Connection conn, final Object request) {
            this.rpcRemoting.oneway(conn, request, null);
        }
    
        public boolean isConnected(Url url) {
            Connection conn = this.rpcRemoting.connectionManager.get(url.getUniqueKey());
            if (null != conn) {
                return conn.isFine();
            }
            return false;
        }
        // 查看是否启动了服务端连接管理功能,如果没有,直接抛出异常
        private void check() {
            if (!this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                throw new UnsupportedOperationException(
                    "Please enable connection manage feature of Rpc Server before call this method! See comments in constructor RpcServer(int port, boolean manageConnection) to find how to enable!");
            }
        }
    }
    

    服务端创建 Connection 只有一个时机:netty 连接刚刚建立时

    特别注意:服务端主动发起请求时(注意,这里是不会创建连接的,而客户端 RpcClientRemoting 发起请求是会获取或者创建连接的)

        public void oneway(Url url, Object request, InvokeContext invokeContext) {
            // 从服务端连接池获取 Connection
            Connection conn = this.connectionManager.get(url.getUniqueKey());
            // 检查连接
            this.connectionManager.check(conn);
            // 发起调用
            this.oneway(conn, request, invokeContext);
        }
    

    八、连接预热、连接超时与期望创建的连接数的指定

    如下两种方式:

    --------------------------String addr------------------------------
    String addr = "127.0.0.1:8888?_CONNECTIONNUM=10&_CONNECTIONWARMUP=true&_CONNECTTIMEOUT=3000";
    String res = (String) client.invokeSync(addr, req, 3000);
    
    --------------------------Url url------------------------------
    Url url = new Url(ip, port);
    url.setProtocol(RpcProtocolV2.PROTOCOL_VERSION_2);
    url.setVersion(RpcProtocolV2.PROTOCOL_VERSION_2);
    url.setConnNum(10); // 期望连接数
    url.setConnWarmup(true); // 是否预热
    url.setConnectTimeout(3000); // 连接超时,3000 ms
    String res = (String) client.invokeSync(url, req, 3000);
    

    相关文章

      网友评论

        本文标题:SOFABolt 源码分析12 - Connection 连接管

        本文链接:https://www.haomeiwen.com/subject/eqsnoftx.html