美文网首页sofabolt
SOFABolt 源码分析12 - Connection 连接管

SOFABolt 源码分析12 - Connection 连接管

作者: 原水寒 | 来源:发表于2018-10-15 17:11 被阅读127次
image.png

上图中仅列出主要的几个类,连接管理包含以下部分:

  • Connection 连接元数据:包裹了 Netty channel 实例
  • ConnectionFactory 连接工厂:创建连接、检测连接等
  • ConnectionPool 连接池:存储 { uniqueKey, List<Connection> } ,uniqueKey 默认为 ip:port;包含 ConnectionSelectStrategy,从 pool 中选择 Connection
  • ConnectionEventHandler 和 ConnectionEventListener:事件处理器和监听器
  • ConnectionManager 连接管理器:是对外的门面,包含所有与 Connection 相关的对外的接口操作
  • Scanner 扫描器:Bolt 提供的一个统一的扫描器,用于执行一些后台任务

一、Connection 元数据

/**
 * An abstraction of socket channel.
 */
public class Connection {
    // netty channel
    private Channel channel;
    // {requestId, InvokeFuture}
    private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);
    private ProtocolCode protocolCode;
    // protocolVersion
    private byte version = RpcProtocolV2.PROTOCOL_VERSION_1;
    // 数据总线
    private Url url;
    // {id, poolKey}
    private final ConcurrentHashMap<Integer, String> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);
    private Set<String> poolKeys = new ConcurrentHashSet<String>();

    public Connection(Channel channel) {
        this.channel = channel;
        // 将当前的 Connection 设置到对应的 netty channel 的附属属性中
        this.channel.attr(CONNECTION).set(this);
    }

    public Connection(Channel channel, Url url) {
        this(channel);
        this.url = url;
        // url.getUniqueKey() 默认为 ip:port
        this.poolKeys.add(url.getUniqueKey());
    }

    public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url url) {
        this(channel, url);
        this.protocolCode = protocolCode;
        this.version = version;
        this.init();
    }

    private void init() {
        // 添加一系列的附属属性:PROTOCOL 和 VERSION 会用在 Codec 编解码中;HEARTBEAT_COUNT 和 HEARTBEAT_SWITCH 会用在心跳
        this.channel.attr(HEARTBEAT_COUNT).set(new Integer(0));
        this.channel.attr(PROTOCOL).set(this.protocolCode);
        this.channel.attr(VERSION).set(this.version);
        this.channel.attr(HEARTBEAT_SWITCH).set(true);
    }

    public boolean isFine() {
        // channel 可用
        return this.channel != null && this.channel.isActive();
    }

    public InvokeFuture getInvokeFuture(int id) {
        return this.invokeFutureMap.get(id);
    }

    public InvokeFuture addInvokeFuture(InvokeFuture future) {
        return this.invokeFutureMap.putIfAbsent(future.invokeId(), future);
    }

    public InvokeFuture removeInvokeFuture(int id) {
        return this.invokeFutureMap.remove(id);
    }

    /**
     * Do something when closing.
     */
    public void onClose() {
        // 遍历 invokeFutureMap,对每一个 InvokeFuture
        // 1. createConnectionClosedResponse 并设置到 InvokeFuture中,唤醒阻塞线程
        // 2. 取消超时任务
        // 3. 异步执行回调
        Iterator<Entry<Integer, InvokeFuture>> iter = invokeFutureMap.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<Integer, InvokeFuture> entry = iter.next();
            iter.remove();
            InvokeFuture future = entry.getValue();
            if (future != null) {
                future.putResponse(future.createConnectionClosedResponse(this.getRemoteAddress()));
                future.cancelTimeout();
                future.tryAsyncExecuteInvokeCallbackAbnormally();
            }
        }
    }

    /**
     * Close the connection.
     */
    public void close() {
        this.getChannel().close();
    }

    /**
    * Whether invokeFutures is completed
    */
    public boolean isInvokeFutureMapFinish() {
        return invokeFutureMap.isEmpty();
    }
}

二、ConnectionFactory 连接工厂

================================= ConnectionFactory =================================
/**
 * Factory that creates connections.
 */
public interface ConnectionFactory {

    /**
     * Initialize the factory.
     */
    void init(ConnectionEventHandler connectionEventHandler);

    /**
     * Create a connection use #BoltUrl
     */
    Connection createConnection(Url url) throws Exception;

    /**
     * Create a connection according to the IP and port.
     * Note: The default protocol is RpcProtocol.
     */
    Connection createConnection(String targetIP, int targetPort, int connectTimeout) throws Exception;

    /**
     * Create a connection according to the IP and port.
     * Note: The default protocol is RpcProtocolV2, and you can specify the version
     */
    Connection createConnection(String targetIP, int targetPort, byte version, int connectTimeout) throws Exception;
}

================================= ConnectionFactory =================================
public abstract class AbstractConnectionFactory implements ConnectionFactory {
    private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1, new NamedThreadFactory("bolt-netty-client-worker", true));
    // RpcServer or RpcClient
    private final ConfigurableInstance  confInstance;
    private final Codec                 codec;
    private final ChannelHandler        heartbeatHandler;
    // 业务逻辑处理器
    private final ChannelHandler        handler;
    protected Bootstrap                 bootstrap;

    @Override
    public void init(final ConnectionEventHandler connectionEventHandler) {
         // 初始化 netty 一系列配置
    }

    @Override
    public Connection createConnection(Url url) throws Exception {
        // 创建 netty channel
        Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
        // 创建 Connection
        Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
        // 发布 ConnectionEventType.CONNECT 事件
        channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

    protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout) throws Exception {
        // prevent unreasonable value, at least 1000
        connectTimeout = Math.max(connectTimeout, 1000);
        String address = targetIP + ":" + targetPort;
        // 设置连接超时时间
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        // 发起连接
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
        future.awaitUninterruptibly();
        ...
        return future.channel();
    }
}

================================= DefaultConnectionFactory =================================
public class DefaultConnectionFactory extends AbstractConnectionFactory {
    public DefaultConnectionFactory(Codec codec, ChannelHandler heartbeatHandler, ChannelHandler handler, ConfigurableInstance configInstance) {
        super(codec, heartbeatHandler, handler, configInstance);
    }
}

================================= ConnectionFactory =================================
public class RpcConnectionFactory extends DefaultConnectionFactory {
    public RpcConnectionFactory(ConcurrentHashMap<String, UserProcessor<?>> userProcessors, ConfigurableInstance configInstance) {
        // 创建 RpcCodec 编解码器工厂类
        // 创建心跳处理器
        // 创建业务逻辑处理器
        super(new RpcCodec(), new HeartbeatHandler(), new RpcHandler(userProcessors), configInstance);
    }
}

三、ConnectionSelectStrategy 连接选择器

================================= ConnectionSelectStrategy =================================
/**
 * Select strategy from connection pool
 */
public interface ConnectionSelectStrategy {
    Connection select(List<Connection> conns);
}

================================= RandomSelectStrategy =================================
/**
 * Select a connection randomly
 */
public class RandomSelectStrategy implements ConnectionSelectStrategy {
    /** max retry times */
    private static final int    MAX_TIMES = 5;
    private final Random        random    = new Random();
    private GlobalSwitch        globalSwitch;

    @Override
    public Connection select(List<Connection> conns) {
        Connection result = null;
        // 如果开启了 连接监控,则从状态为健康的连接中获取连接;否则直接从 conns 全连接中随机获取
        if (null != this.globalSwitch && this.globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
            ...
            result = randomGet(serviceStatusOnConns);
        } else {
            result = randomGet(conns);
        }
        return result;
    }

    private Connection randomGet(List<Connection> conns) {
        ...
        int tries = 0;
        Connection result = null;
        // 不断重试获取状态为可用的 Connection
        while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
            result = conns.get(this.random.nextInt(size));
        }

        if (result != null && !result.isFine()) {
            result = null;
        }
        return result;
    }
}

四、ConnectionPool 连接池

================================= ConnectionPool =================================
public class ConnectionPool implements Scannable {
    /** connections */
    private CopyOnWriteArrayList<Connection> conns = new CopyOnWriteArrayList<Connection>();
    /** strategy */
    private ConnectionSelectStrategy strategy;

    public void add(Connection connection) {
        boolean res = this.conns.addIfAbsent(connection);
    }

    public void removeAndTryClose(Connection connection) {
        boolean res = this.conns.remove(connection);
        // 如果该连接没有引用了,则 close 连接
        if (connection.noRef()) {
            connection.close();
        }
    }

    public Connection get() {
        List<Connection> snapshot = new ArrayList<Connection>(this.conns);
        // 使用 ConnectionSelectStrategy 从 List<Connection> 选一个 Connection
        return this.strategy.select(snapshot);
    }
}

五、ConnectionManager 连接管理器

================================= ConnectionManager =================================
/**
 * Connection manager of connection pool
 */
public interface ConnectionManager extends Scannable {
    // 初始化操作
    void init();
    // 添加 Connection 到 ConnectionPool(根据poolKey)
    void add(Connection connection);
    // 根据 poolKey 从 ConnectionPool 中获取 Connection
    Connection get(String poolKey);
    // 根据 poolKey 从 ConnectionPool 中删除 Connection 并 close
    void remove(Connection connection);
    // 检测 connection 是否可用
    void check(Connection connection) throws RemotingException;
    // 根据 url 从 ConnectionPool 中获取 Connection,如果 ConnectionPool 为null,则创建 ConnectionPool,并创建 Connection 到 ConnectionPool
    // 创建的 Connection 的数量由 Url#getConnNum() 指定,也可以直接在 addr 上拼接 "_CONNECTIONNUM=10"
    Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException;
    // 创建连接
    Connection create(Url url) throws RemotingException;
}

================================= DefaultConnectionManager =================================
public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager, Scannable {
    private Executor asyncCreateConnectionExecutor;
    private GlobalSwitch globalSwitch;
    // connection pool initialize tasks
    protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;
    protected ConnectionSelectStrategy connectionSelectStrategy;
    protected ConnectionFactory connectionFactory;

    protected ConnectionEventHandler connectionEventHandler;
    protected ConnectionEventListener connectionEventListener;

    public DefaultConnectionManager() {
        this.connTasks = new ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>>();
        this.connectionSelectStrategy = new RandomSelectStrategy(globalSwitch);
    }

    @Override
    public void init() {
        this.connectionEventHandler.setConnectionManager(this);
        this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
        this.connectionFactory.init(connectionEventHandler);
    }

    @Override
    public void add(Connection connection) {
        Set<String> poolKeys = connection.getPoolKeys();
        for (String poolKey : poolKeys) {
            this.add(connection, poolKey);
        }
    }

    @Override
    public void add(Connection connection, String poolKey) {
        // 获取或者创建一个 ConnectionPool
        ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(poolKey, new ConnectionPoolCall());
        // 将 connection 放入 ConnectionPool
        pool.add(connection);
    }

    @Override
    public Connection get(String poolKey) {
        // 获取 ConnectionPool
        ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
        // 从 ConnectionPool 获取 Connection
        return null == pool ? null : pool.get();
    }

    @Override
    public void check(Connection connection) throws RemotingException {
        if (connection == null || connection.getChannel() == null || !connection.getChannel().isActive() || !connection.getChannel().isWritable()) {
            throw e;
        } 
    }

    // If no task cached, create one and initialize the connections.
    @Override
    public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
        // get and create a connection pool with initialized connections.
        ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(), new ConnectionPoolCall(url));
        // 从 ConnectionPool 获取 Connection
        return pool.get();
    }

    // 直接使用 connectionFactory 创建 Connection
    @Override
    public Connection create(Url url) throws RemotingException {
        Connection conn = this.connectionFactory.createConnection(url);
        return conn;
    }

    /**
     * @see com.alipay.remoting.ConnectionHeartbeatManager#disableHeartbeat(com.alipay.remoting.Connection)
     */
    @Override
    public void disableHeartbeat(Connection connection) {
        if (null != connection) {
            connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(false);
        }
    }

    /**
     * @see com.alipay.remoting.ConnectionHeartbeatManager#enableHeartbeat(com.alipay.remoting.Connection)
     */
    @Override
    public void enableHeartbeat(Connection connection) {
        if (null != connection) {
            connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(true);
        }
    }

    // get connection pool from future task
    private ConnectionPool getConnectionPool(RunStateRecordedFutureTask<ConnectionPool> task) {
        return FutureTaskUtil.getFutureTaskResult(task, logger);
    }

    private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey, Callable<ConnectionPool> callable) {
        RunStateRecordedFutureTask<ConnectionPool> initialTask = null;
        ConnectionPool pool = null;

        for (int i = 0; (i < retry) && (pool == null); ++i) {
            // 1. 根据 poolKey 从 connTasks 获取 RunStateRecordedFutureTask 实例
            initialTask = this.connTasks.get(poolKey);
            // 2. 如果为 null,创建一个 RunStateRecordedFutureTask 实例,并设置 {poolKey, RunStateRecordedFutureTask 实例} 到 connTasks 中
            if (null == initialTask) {
                initialTask = new RunStateRecordedFutureTask<ConnectionPool>(callable);
                initialTask = this.connTasks.putIfAbsent(poolKey, initialTask);
                // 注意:这里为什么做二次判断?
                // 在高并发的情况下,有可能同一个 poolKey 下的两个 RpcClient 同时走到这里(我们无法预判用户会怎样使用 Bolt),那么在 putIfAbsent 的时候只有一个可以成功(否则就会创建双倍的预期连接数),
                // 则先成功的返回 null,后成功的返回旧值,也就是前边插入的 initialTask 实例,一定不为 null
                if (null == initialTask) {
                    initialTask = this.connTasks.get(poolKey);
                    // 3. 直接运行 RunStateRecordedFutureTask 实例
                    initialTask.run();
                }
            }
            // 从 RunStateRecordedFutureTask 实例中获取 ConnectionPool
            pool = initialTask.get();
        }
        return pool;
    }

    // a callable definition for initialize ConnectionPool
    private class ConnectionPoolCall implements Callable<ConnectionPool> {
        @Override
        public ConnectionPool call() throws Exception {
            // 创建一个 ConnectionPool
            final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
            // 创建 Connection 并添加到 ConnectionPool
            doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
            // 返回 ConnectionPool
            return pool;
        }
    }

    /**
     * do create connections
     * @param syncCreateNumWhenNotWarmup 指定了同步创建的个数,默认为1,即需要同步创建一个 Connection,其他的都异步创建
     */
    private void doCreate(Url url, ConnectionPool pool, String taskName, int syncCreateNumWhenNotWarmup) {
        // 池中已有连接数
        final int actualNum = pool.size();
        // 期盼总共的连接数
        final int expectNum = url.getConnNum();
        if (actualNum < expectNum) {
            // 是否配置了连接预热(即需要同步创建好所有的 Connection)
            if (url.isConnWarmup()) {
                for (int i = actualNum; i < expectNum; ++i) {
                    // 创建 Connection
                    Connection connection = create(url);
                    // 将 Connection 塞入 ConnectionPool
                    pool.add(connection);
                }
            // 没有配置连接预热,默认同步创建一个 Connection,剩余的 Connection 异步创建
            } else {
                // 同步创建 Connection,syncCreateNumWhenNotWarmup 指定了同步创建的个数
                if (syncCreateNumWhenNotWarmup > 0) {
                    for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
                        Connection connection = create(url);
                        pool.add(connection);
                    }
                    if (syncCreateNumWhenNotWarmup == url.getConnNum()) {
                        return;
                    }
                }
                // 创建异步创建 Connection 的连接池
                initializeExecutor();
                pool.markAsyncCreationStart();// mark the start of async
                    this.asyncCreateConnectionExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                for (int i = pool.size(); i < url.getConnNum(); ++i) {
                                    Connection conn =  create(url);
                                    pool.add(conn);
                                }
                            } finally {
                                pool.markAsyncCreationDone();// mark the end of async
                            }
                        }
                    });
                } 
            } // end of NOT warm up
        } // end of if
    }

    /**
     * initialize executor
     */
    private void initializeExecutor() {
        if (!this.executorInitialized) {
            this.executorInitialized = true;
            this.asyncCreateConnectionExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize),
                new NamedThreadFactory("Bolt-conn-warmup-executor", true));
        }
    }
}

================================= RunStateRecordedFutureTask =================================
public class RunStateRecordedFutureTask<V> extends FutureTask<V> {
    private AtomicBoolean hasRun = new AtomicBoolean();

    public RunStateRecordedFutureTask(Callable<V> callable) {
        super(callable);
    }

    @Override
    public void run() {
        this.hasRun.set(true);
        super.run();
    }

    public V getAfterRun() throws InterruptedException, ExecutionException,
                          FutureTaskNotRunYetException {
        if (!hasRun.get()) {
            throw new FutureTaskNotRunYetException();
        }
        return super.get();
    }
}

================================= FutureTask =================================
    // 结果值
    private Object outcome;
    public V get() throws InterruptedException, ExecutionException {
        // 如果没有完成,等待完成
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        // 完成之后,返回结果值
        return report(s);
    }

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

关于多连接:通常来说,点对点的直连通信,客户端和服务端,一个 IP 一个连接对象就够用了。不管是吞吐能力还是并发度,都能满足一般业务的通信需求。而有一些场景,比如不是点对点直连通信,而是经过了 LVS VIP,或者 F5 设备的连接,此时,为了负载均衡和容错,会针对一个 URL 地址建立多个连接。

以上内容摘自:SOFABolt 用户手册

六、客户端

public class RpcClient extends AbstractConfigurableInstance {
    private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
    private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(switches());
    private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory, connectionEventHandler, connectionEventListener, switches());

    public void init() {
        this.connectionManager.init();
        // 开启 连接监听 开关
        if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
            ...
        }
        // 开启 重连 开关
        if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
            ...
        }
    }

    public void oneway(String addr, Object request)  {
        this.rpcRemoting.oneway(addr, request, null);
    }

    public Connection createStandaloneConnection(String ip, int port, int connectTimeout) {
        return this.connectionManager.create(ip, port, connectTimeout);
    }

    public void closeStandaloneConnection(Connection conn) {
        if (null != conn) {
            conn.close();
        }
    }
 
    public Connection getConnection(Url url, int connectTimeout) {
        url.setConnectTimeout(connectTimeout);
        return this.connectionManager.getAndCreateIfAbsent(url);
    }
   
    public void enableConnHeartbeat(String addr) {
        Url url = this.addressParser.parse(addr);
        this.enableConnHeartbeat(url);
    }

    // enable connection reconnect switch on
    public void enableReconnectSwitch() {
        this.switches().turnOn(GlobalSwitch.CONN_RECONNECT_SWITCH);
    }

    // enable connection monitor switch on
    public void enableConnectionMonitorSwitch() {
        this.switches().turnOn(GlobalSwitch.CONN_MONITOR_SWITCH);
    }

客户端真正创建连接的时候,是在发起第一次调用的时候。

    public void oneway(Url url, Object request, InvokeContext invokeContext) {
        // 获取或者创建 Connection
        Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
        // 检测连接
        this.connectionManager.check(conn);
        this.oneway(conn, request, invokeContext);
    }

    protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
        long start = System.currentTimeMillis();
        Connection conn;
        try {
            // 使用 connectionManager 获取或者创建连接
            conn = this.connectionManager.getAndCreateIfAbsent(url);
        } finally {
            // 记录连接获取或者创建的时间消耗
            if (null != invokeContext) {
                invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, (System.currentTimeMillis() - start));
            }
        }
        return conn;
    }

七、服务端

public class RpcServer extends AbstractRemotingServer {
    private ChannelFuture                               channelFuture;
    private DefaultConnectionManager                    connectionManager;
    

    /**
     * You can enable connection management feature by specify @param manageConnection true.
     * 1. When connection management feature enabled, you can use all invoke methods with params {@link String}, {@link Url}, {@link Connection} methods.
     * 2. When connection management feature disabled, you can only use invoke methods with params {@link Connection}, otherwise {@link UnsupportedOperationException} will be thrown.
     */
    public RpcServer(String ip, int port, boolean manageConnection) {
        super(ip, port);
        // 是否开启服务端连接管理功能
        if (manageConnection) {
            this.switches().turnOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH);
        }
    }

    @Override
    protected void doInit() {
        // 服务端是否开启连接管理功能
        if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
            this.connectionEventHandler = new RpcConnectionEventHandler(switches());
            this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
            this.connectionEventHandler.setConnectionManager(this.connectionManager);
            this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
        } else {
            this.connectionEventHandler = new ConnectionEventHandler(switches());
            this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
        }
        
        ...
        this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel channel) {
                ...
                createConnection(channel);
            }

            private void createConnection(SocketChannel channel) {
                Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                // 是否开启了服务端连接管理功能
                if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                    // 如果开启了,创建 Connection,并添加到 connectionManager
                    connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                } else {
                    // 否则,直接创建 Connection
                    new Connection(channel, url);
                }
                // 发布 ConnectionEventType.CONNECT 事件
                channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
            }
        });
    }

    public void oneway(final String addr, final Object request) {
        // 1. 查看是否启动了服务端连接管理功能,如果没有,直接抛出异常
        check();
        this.rpcRemoting.oneway(addr, request, null);
    }
    
    // 如果没有开启服务端连接管理功能,只能通过 Connection 对客户端发起调用,无法通过 Url 或 addr 等发起调用
    public void oneway(final Connection conn, final Object request) {
        this.rpcRemoting.oneway(conn, request, null);
    }

    public boolean isConnected(Url url) {
        Connection conn = this.rpcRemoting.connectionManager.get(url.getUniqueKey());
        if (null != conn) {
            return conn.isFine();
        }
        return false;
    }
    // 查看是否启动了服务端连接管理功能,如果没有,直接抛出异常
    private void check() {
        if (!this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
            throw new UnsupportedOperationException(
                "Please enable connection manage feature of Rpc Server before call this method! See comments in constructor RpcServer(int port, boolean manageConnection) to find how to enable!");
        }
    }
}

服务端创建 Connection 只有一个时机:netty 连接刚刚建立时

特别注意:服务端主动发起请求时(注意,这里是不会创建连接的,而客户端 RpcClientRemoting 发起请求是会获取或者创建连接的)

    public void oneway(Url url, Object request, InvokeContext invokeContext) {
        // 从服务端连接池获取 Connection
        Connection conn = this.connectionManager.get(url.getUniqueKey());
        // 检查连接
        this.connectionManager.check(conn);
        // 发起调用
        this.oneway(conn, request, invokeContext);
    }

八、连接预热、连接超时与期望创建的连接数的指定

如下两种方式:

--------------------------String addr------------------------------
String addr = "127.0.0.1:8888?_CONNECTIONNUM=10&_CONNECTIONWARMUP=true&_CONNECTTIMEOUT=3000";
String res = (String) client.invokeSync(addr, req, 3000);

--------------------------Url url------------------------------
Url url = new Url(ip, port);
url.setProtocol(RpcProtocolV2.PROTOCOL_VERSION_2);
url.setVersion(RpcProtocolV2.PROTOCOL_VERSION_2);
url.setConnNum(10); // 期望连接数
url.setConnWarmup(true); // 是否预热
url.setConnectTimeout(3000); // 连接超时,3000 ms
String res = (String) client.invokeSync(url, req, 3000);

相关文章

网友评论

    本文标题:SOFABolt 源码分析12 - Connection 连接管

    本文链接:https://www.haomeiwen.com/subject/eqsnoftx.html