上图中仅列出主要的几个类,连接管理包含以下部分:
- Connection 连接元数据:包裹了 Netty channel 实例
- ConnectionFactory 连接工厂:创建连接、检测连接等
- ConnectionPool 连接池:存储 { uniqueKey, List<Connection> } ,uniqueKey 默认为 ip:port;包含 ConnectionSelectStrategy,从 pool 中选择 Connection
- ConnectionEventHandler 和 ConnectionEventListener:事件处理器和监听器
- ConnectionManager 连接管理器:是对外的门面,包含所有与 Connection 相关的对外的接口操作
- Scanner 扫描器:Bolt 提供的一个统一的扫描器,用于执行一些后台任务
一、Connection 元数据
/**
 * An abstraction of socket channel.
 *
 * <p>Wraps a Netty {@code Channel}, registers itself as an attribute of that channel and keeps
 * track of the invocations that are still pending on it.
 *
 * <p>NOTE(review): this is an excerpt; members referenced here but not declared (the channel
 * attribute keys, {@code getChannel()}, {@code getRemoteAddress()}, ...) live in omitted code.
 */
public class Connection {
    /** The underlying Netty channel. */
    private Channel channel;

    /** Pending invocations: {requestId, InvokeFuture}. */
    private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);

    private ProtocolCode protocolCode;

    /** Protocol version. */
    private byte version = RpcProtocolV2.PROTOCOL_VERSION_1;

    /** Url this connection was created for (called the "data bus" in the original notes). */
    private Url url;

    /** {id, poolKey} */
    private final ConcurrentHashMap<Integer, String> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);

    /** Pool keys registered for this connection; the Url's unique key is added on construction. */
    private Set<String> poolKeys = new ConcurrentHashSet<String>();

    /**
     * Wraps {@code channel} and stores this connection in the channel's {@code CONNECTION}
     * attribute.
     *
     * @param channel the Netty channel to wrap
     */
    public Connection(Channel channel) {
        this.channel = channel;
        // Attach this Connection to the Netty channel as one of its attributes.
        this.channel.attr(CONNECTION).set(this);
    }

    /**
     * Wraps {@code channel} and registers the url's unique key as a pool key.
     *
     * @param channel the Netty channel to wrap
     * @param url     the url of this connection
     */
    public Connection(Channel channel, Url url) {
        this(channel);
        this.url = url;
        // url.getUniqueKey() defaults to ip:port (per the original notes).
        this.poolKeys.add(url.getUniqueKey());
    }

    /**
     * Wraps {@code channel}, records protocol code and version, and initializes the
     * protocol/heartbeat channel attributes.
     *
     * @param channel      the Netty channel to wrap
     * @param protocolCode protocol code stored in the {@code PROTOCOL} attribute
     * @param version      protocol version stored in the {@code VERSION} attribute
     * @param url          the url of this connection
     */
    public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url url) {
        this(channel, url);
        this.protocolCode = protocolCode;
        this.version = version;
        this.init();
    }

    /**
     * Sets the channel attributes. Per the original notes, PROTOCOL and VERSION are used by the
     * codec, HEARTBEAT_COUNT and HEARTBEAT_SWITCH by the heartbeat logic.
     */
    private void init() {
        // Integer.valueOf replaces the deprecated new Integer(int) boxing constructor.
        this.channel.attr(HEARTBEAT_COUNT).set(Integer.valueOf(0));
        this.channel.attr(PROTOCOL).set(this.protocolCode);
        this.channel.attr(VERSION).set(this.version);
        this.channel.attr(HEARTBEAT_SWITCH).set(true);
    }

    /**
     * @return {@code true} when the channel is non-null and active
     */
    public boolean isFine() {
        return this.channel != null && this.channel.isActive();
    }

    /**
     * @param id the invoke id
     * @return the pending future registered under {@code id}, or {@code null}
     */
    public InvokeFuture getInvokeFuture(int id) {
        return this.invokeFutureMap.get(id);
    }

    /**
     * Registers {@code future} under its invoke id unless one is already registered.
     *
     * @param future the future to register
     * @return the previously registered future, or {@code null} if there was none
     */
    public InvokeFuture addInvokeFuture(InvokeFuture future) {
        return this.invokeFutureMap.putIfAbsent(future.invokeId(), future);
    }

    /**
     * @param id the invoke id
     * @return the future that was removed, or {@code null}
     */
    public InvokeFuture removeInvokeFuture(int id) {
        return this.invokeFutureMap.remove(id);
    }

    /**
     * Do something when closing.
     *
     * <p>Drains {@code invokeFutureMap}; for every pending InvokeFuture it
     * <ol>
     *   <li>puts a connection-closed response into the future (per the original notes this
     *       wakes up the blocked caller),</li>
     *   <li>cancels the timeout task,</li>
     *   <li>tries to execute the invoke callback asynchronously.</li>
     * </ol>
     */
    public void onClose() {
        Iterator<Entry<Integer, InvokeFuture>> iter = invokeFutureMap.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<Integer, InvokeFuture> entry = iter.next();
            iter.remove();
            InvokeFuture future = entry.getValue();
            if (future != null) {
                future.putResponse(future.createConnectionClosedResponse(this.getRemoteAddress()));
                future.cancelTimeout();
                future.tryAsyncExecuteInvokeCallbackAbnormally();
            }
        }
    }

    /**
     * Close the connection.
     */
    public void close() {
        this.getChannel().close();
    }

    /**
     * Whether invokeFutures is completed
     *
     * @return {@code true} when no invocation is pending on this connection
     */
    public boolean isInvokeFutureMapFinish() {
        return invokeFutureMap.isEmpty();
    }
}
二、ConnectionFactory 连接工厂
================================= ConnectionFactory =================================
/**
* Factory that creates connections.
* Declares one initialization hook and three ways of creating a {@link Connection}:
* from a {@link Url}, from ip/port, or from ip/port plus an explicit protocol version.
*/
public interface ConnectionFactory {
/**
* Initialize the factory.
*
* @param connectionEventHandler handler for connection events, handed to the factory at init time
*/
void init(ConnectionEventHandler connectionEventHandler);
/**
* Create a connection using a {@link Url}.
*
* @param url target of the connection
* @return the created connection
* @throws Exception if the connection cannot be created
*/
Connection createConnection(Url url) throws Exception;
/**
* Create a connection according to the IP and port.
* Note: The default protocol is RpcProtocol.
*
* @param targetIP target ip
* @param targetPort target port
* @param connectTimeout connect timeout (presumably milliseconds; confirm against the implementation)
* @return the created connection
* @throws Exception if the connection cannot be created
*/
Connection createConnection(String targetIP, int targetPort, int connectTimeout) throws Exception;
/**
* Create a connection according to the IP and port.
* Note: The default protocol is RpcProtocolV2, and you can specify the version
*
* @param targetIP target ip
* @param targetPort target port
* @param version protocol version to use
* @param connectTimeout connect timeout (presumably milliseconds; confirm against the implementation)
* @return the created connection
* @throws Exception if the connection cannot be created
*/
Connection createConnection(String targetIP, int targetPort, byte version, int connectTimeout) throws Exception;
}
================================= AbstractConnectionFactory =================================
/**
* Base {@link ConnectionFactory}: connects through a Netty {@code Bootstrap} and wraps the
* resulting channel in a {@link Connection}.
* NOTE(review): excerpt only - the constructor that assigns the final fields, the body of
* init() and part of doCreateConnection() are omitted.
*/
public abstract class AbstractConnectionFactory implements ConnectionFactory {
// Worker event loop group, sized availableProcessors() + 1, declared static.
private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() + 1, new NamedThreadFactory("bolt-netty-client-worker", true));
// RpcServer or RpcClient
private final ConfigurableInstance confInstance;
private final Codec codec;
private final ChannelHandler heartbeatHandler;
// business logic handler
private final ChannelHandler handler;
protected Bootstrap bootstrap;
@Override
public void init(final ConnectionEventHandler connectionEventHandler) {
// initialize the Netty configuration (body omitted in this excerpt)
}
/**
* Creates the Netty channel for url, wraps it in a Connection carrying the url's protocol
* code and version, then fires ConnectionEventType.CONNECT on the channel pipeline.
*/
@Override
public Connection createConnection(Url url) throws Exception {
// create the Netty channel
Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
// create the Connection
Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
// publish the ConnectionEventType.CONNECT event
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
return conn;
}
/**
* Connects to targetIP:targetPort and returns the resulting channel.
* The connect timeout is raised to at least 1000 and applied as CONNECT_TIMEOUT_MILLIS.
*/
protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout) throws Exception {
// prevent unreasonable value, at least 1000
connectTimeout = Math.max(connectTimeout, 1000);
// NOTE(review): address is not used in the visible code; presumably used by the omitted part below
String address = targetIP + ":" + targetPort;
// set the connect timeout
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
// start connecting
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
// wait for the connect attempt to finish
future.awaitUninterruptibly();
...
return future.channel();
}
}
================================= DefaultConnectionFactory =================================
/**
 * Default connection factory. Adds no behavior of its own: every collaborator is passed
 * unchanged to {@link AbstractConnectionFactory}.
 */
public class DefaultConnectionFactory extends AbstractConnectionFactory {

    /**
     * @param codec            codec handed to the parent factory
     * @param heartbeatHandler heartbeat handler handed to the parent factory
     * @param handler          business handler handed to the parent factory
     * @param configInstance   configurable instance handed to the parent factory
     */
    public DefaultConnectionFactory(Codec codec,
                                    ChannelHandler heartbeatHandler,
                                    ChannelHandler handler,
                                    ConfigurableInstance configInstance) {
        super(codec, heartbeatHandler, handler, configInstance);
    }
}
================================= RpcConnectionFactory =================================
/**
 * Connection factory for the RPC stack: wires an {@code RpcCodec}, a {@code HeartbeatHandler}
 * and an {@code RpcHandler} built from the given user processors into the default factory.
 */
public class RpcConnectionFactory extends DefaultConnectionFactory {

    /**
     * @param userProcessors user processors passed to the {@code RpcHandler}
     * @param configInstance configurable instance passed through to the parent factory
     */
    public RpcConnectionFactory(ConcurrentHashMap<String, UserProcessor<?>> userProcessors,
                                ConfigurableInstance configInstance) {
        super(new RpcCodec(),                  // codec
              new HeartbeatHandler(),          // heartbeat handler
              new RpcHandler(userProcessors),  // business logic handler
              configInstance);
    }
}
三、ConnectionSelectStrategy 连接选择器
================================= ConnectionSelectStrategy =================================
/**
* Select strategy from connection pool
*/
public interface ConnectionSelectStrategy {
/**
* Picks one connection out of the given candidates.
*
* @param conns candidate connections
* @return the selected connection (the RandomSelectStrategy in this file may return null)
*/
Connection select(List<Connection> conns);
}
================================= RandomSelectStrategy =================================
/**
* Select a connection randomly
* NOTE(review): excerpt only - the constructors and the parts marked "..." are omitted;
* "serviceStatusOnConns" and "size" are defined in that omitted code.
*/
public class RandomSelectStrategy implements ConnectionSelectStrategy {
/** max retry times */
private static final int MAX_TIMES = 5;
private final Random random = new Random();
private GlobalSwitch globalSwitch;
/**
* Chooses the candidate list according to the connection-monitor switch and picks randomly
* from it.
*/
@Override
public Connection select(List<Connection> conns) {
Connection result = null;
// If connection monitoring is switched on, pick from the connections whose status is healthy;
// otherwise pick randomly from the full conns list
if (null != this.globalSwitch && this.globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
...
result = randomGet(serviceStatusOnConns);
} else {
result = randomGet(conns);
}
return result;
}
/**
* Draws random elements until one satisfies isFine() or MAX_TIMES draws have been made.
* Returns null when the last drawn connection is still not fine.
*/
private Connection randomGet(List<Connection> conns) {
...
int tries = 0;
Connection result = null;
// keep retrying until a usable Connection is drawn (bounded by MAX_TIMES)
while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
result = conns.get(this.random.nextInt(size));
}
// give up: never hand out a connection that is not fine
if (result != null && !result.isFine()) {
result = null;
}
return result;
}
}
四、ConnectionPool 连接池
================================= ConnectionPool =================================
/**
 * A set of connections plus the strategy used to pick one of them.
 *
 * <p>NOTE(review): this is an excerpt; the constructor taking the strategy, {@code size()},
 * the async-creation markers and the {@code Scannable} implementation are omitted.
 */
public class ConnectionPool implements Scannable {
    /** connections */
    private CopyOnWriteArrayList<Connection> conns = new CopyOnWriteArrayList<Connection>();

    /** strategy */
    private ConnectionSelectStrategy strategy;

    /**
     * Adds {@code connection} unless it is already in the pool. A {@code null} argument is
     * ignored so that no null element can end up in the pool.
     *
     * @param connection the connection to add
     */
    public void add(Connection connection) {
        if (connection == null) {
            return;
        }
        this.conns.addIfAbsent(connection);
    }

    /**
     * Removes {@code connection} from the pool and closes it when it is no longer referenced.
     * A {@code null} argument is ignored.
     *
     * @param connection the connection to remove
     */
    public void removeAndTryClose(Connection connection) {
        if (connection == null) {
            return;
        }
        this.conns.remove(connection);
        // Close the connection once nothing references it any more.
        if (connection.noRef()) {
            connection.close();
        }
    }

    /**
     * Selects one connection through the {@link ConnectionSelectStrategy}.
     *
     * @return the selected connection, or {@code null} when the pool is empty or the strategy
     *         selects nothing
     */
    public Connection get() {
        // Work on a snapshot so the strategy sees a stable list.
        List<Connection> snapshot = new ArrayList<Connection>(this.conns);
        if (snapshot.isEmpty()) {
            // Nothing to choose from; do not hand an empty list to the strategy.
            return null;
        }
        return this.strategy.select(snapshot);
    }
}
五、ConnectionManager 连接管理器
================================= ConnectionManager =================================
/**
* Connection manager of connection pool
* NOTE(review): excerpt only - DefaultConnectionManager in this file also overrides
* add(Connection, String), which is not listed here.
*/
public interface ConnectionManager extends Scannable {
// Initialization.
void init();
// Add the Connection to a ConnectionPool (looked up by poolKey).
void add(Connection connection);
// Get a Connection from the ConnectionPool identified by poolKey.
Connection get(String poolKey);
// Remove the Connection from its ConnectionPool (looked up by poolKey) and close it.
void remove(Connection connection);
// Check whether the connection is usable.
void check(Connection connection) throws RemotingException;
// Get a Connection from the ConnectionPool for url; if the ConnectionPool is null, create the ConnectionPool and create Connections into it.
// The number of Connections created is given by Url#getConnNum(); it can also be set by appending "_CONNECTIONNUM=10" to the addr.
Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException;
// Create a connection.
Connection create(Url url) throws RemotingException;
}
================================= DefaultConnectionManager =================================
public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager, Scannable {
private Executor asyncCreateConnectionExecutor;
private GlobalSwitch globalSwitch;
// connection pool initialize tasks
protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;
protected ConnectionSelectStrategy connectionSelectStrategy;
protected ConnectionFactory connectionFactory;
protected ConnectionEventHandler connectionEventHandler;
protected ConnectionEventListener connectionEventListener;
public DefaultConnectionManager() {
this.connTasks = new ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>>();
this.connectionSelectStrategy = new RandomSelectStrategy(globalSwitch);
}
@Override
public void init() {
this.connectionEventHandler.setConnectionManager(this);
this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
this.connectionFactory.init(connectionEventHandler);
}
@Override
public void add(Connection connection) {
Set<String> poolKeys = connection.getPoolKeys();
for (String poolKey : poolKeys) {
this.add(connection, poolKey);
}
}
@Override
public void add(Connection connection, String poolKey) {
// 获取或者创建一个 ConnectionPool
ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(poolKey, new ConnectionPoolCall());
// 将 connection 放入 ConnectionPool
pool.add(connection);
}
@Override
public Connection get(String poolKey) {
// 获取 ConnectionPool
ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
// 从 ConnectionPool 获取 Connection
return null == pool ? null : pool.get();
}
@Override
public void check(Connection connection) throws RemotingException {
if (connection == null || connection.getChannel() == null || !connection.getChannel().isActive() || !connection.getChannel().isWritable()) {
throw e;
}
}
// If no task cached, create one and initialize the connections.
@Override
public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
// get and create a connection pool with initialized connections.
ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(), new ConnectionPoolCall(url));
// 从 ConnectionPool 获取 Connection
return pool.get();
}
// 直接使用 connectionFactory 创建 Connection
@Override
public Connection create(Url url) throws RemotingException {
Connection conn = this.connectionFactory.createConnection(url);
return conn;
}
/**
* @see com.alipay.remoting.ConnectionHeartbeatManager#disableHeartbeat(com.alipay.remoting.Connection)
*/
@Override
public void disableHeartbeat(Connection connection) {
if (null != connection) {
connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(false);
}
}
/**
* @see com.alipay.remoting.ConnectionHeartbeatManager#enableHeartbeat(com.alipay.remoting.Connection)
*/
@Override
public void enableHeartbeat(Connection connection) {
if (null != connection) {
connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(true);
}
}
// get connection pool from future task
private ConnectionPool getConnectionPool(RunStateRecordedFutureTask<ConnectionPool> task) {
return FutureTaskUtil.getFutureTaskResult(task, logger);
}
private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey, Callable<ConnectionPool> callable) {
RunStateRecordedFutureTask<ConnectionPool> initialTask = null;
ConnectionPool pool = null;
for (int i = 0; (i < retry) && (pool == null); ++i) {
// 1. 根据 poolKey 从 connTasks 获取 RunStateRecordedFutureTask 实例
initialTask = this.connTasks.get(poolKey);
// 2. 如果为 null,创建一个 RunStateRecordedFutureTask 实例,并设置 {poolKey, RunStateRecordedFutureTask 实例} 到 connTasks 中
if (null == initialTask) {
initialTask = new RunStateRecordedFutureTask<ConnectionPool>(callable);
initialTask = this.connTasks.putIfAbsent(poolKey, initialTask);
// 注意:这里为什么做二次判断?
// 在高并发的情况下,有可能同一个 poolKey 下的两个 RpcClient 同时走到这里(我们无法预判用户会怎样使用 Bolt),那么在 putIfAbsent 的时候只有一个可以成功(否则就会创建双倍的预期连接数),
// 则先成功的返回 null,后成功的返回旧值,也就是前边插入的 initialTask 实例,一定不为 null
if (null == initialTask) {
initialTask = this.connTasks.get(poolKey);
// 3. 直接运行 RunStateRecordedFutureTask 实例
initialTask.run();
}
}
// 从 RunStateRecordedFutureTask 实例中获取 ConnectionPool
pool = initialTask.get();
}
return pool;
}
// a callable definition for initialize ConnectionPool
private class ConnectionPoolCall implements Callable<ConnectionPool> {
@Override
public ConnectionPool call() throws Exception {
// 创建一个 ConnectionPool
final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
// 创建 Connection 并添加到 ConnectionPool
doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
// 返回 ConnectionPool
return pool;
}
}
/**
* do create connections
* @param syncCreateNumWhenNotWarmup 指定了同步创建的个数,默认为1,即需要同步创建一个 Connection,其他的都异步创建
*/
private void doCreate(Url url, ConnectionPool pool, String taskName, int syncCreateNumWhenNotWarmup) {
// 池中已有连接数
final int actualNum = pool.size();
// 期盼总共的连接数
final int expectNum = url.getConnNum();
if (actualNum < expectNum) {
// 是否配置了连接预热(即需要同步创建好所有的 Connection)
if (url.isConnWarmup()) {
for (int i = actualNum; i < expectNum; ++i) {
// 创建 Connection
Connection connection = create(url);
// 将 Connection 塞入 ConnectionPool
pool.add(connection);
}
// 没有配置连接预热,默认同步创建一个 Connection,剩余的 Connection 异步创建
} else {
// 同步创建 Connection,syncCreateNumWhenNotWarmup 指定了同步创建的个数
if (syncCreateNumWhenNotWarmup > 0) {
for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
Connection connection = create(url);
pool.add(connection);
}
if (syncCreateNumWhenNotWarmup == url.getConnNum()) {
return;
}
}
// 创建异步创建 Connection 的连接池
initializeExecutor();
pool.markAsyncCreationStart();// mark the start of async
this.asyncCreateConnectionExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (int i = pool.size(); i < url.getConnNum(); ++i) {
Connection conn = create(url);
pool.add(conn);
}
} finally {
pool.markAsyncCreationDone();// mark the end of async
}
}
});
}
} // end of NOT warm up
} // end of if
}
/**
* initialize executor
*/
private void initializeExecutor() {
if (!this.executorInitialized) {
this.executorInitialized = true;
this.asyncCreateConnectionExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize),
new NamedThreadFactory("Bolt-conn-warmup-executor", true));
}
}
}
================================= RunStateRecordedFutureTask =================================
/**
 * A {@link FutureTask} that records whether {@link #run()} has been called, so that callers
 * can refuse to wait on a task that was never started.
 */
public class RunStateRecordedFutureTask<V> extends FutureTask<V> {
    /** Set to {@code true} as soon as {@link #run()} is entered. */
    private AtomicBoolean started = new AtomicBoolean();

    /**
     * @param callable the computation to wrap
     */
    public RunStateRecordedFutureTask(Callable<V> callable) {
        super(callable);
    }

    /**
     * Marks the task as started, then runs it.
     */
    @Override
    public void run() {
        this.started.set(true);
        super.run();
    }

    /**
     * Returns the task result, but only if {@link #run()} has been called.
     *
     * @return the result of {@link FutureTask#get()}
     * @throws FutureTaskNotRunYetException if {@link #run()} has not been called yet
     * @throws InterruptedException         propagated from {@link FutureTask#get()}
     * @throws ExecutionException           propagated from {@link FutureTask#get()}
     */
    public V getAfterRun() throws InterruptedException, ExecutionException,
                          FutureTaskNotRunYetException {
        if (started.get()) {
            return super.get();
        }
        throw new FutureTaskNotRunYetException();
    }
}
================================= FutureTask =================================
// Excerpt of FutureTask: the stored outcome and how get() reports it.
// result value
private Object outcome;
public V get() throws InterruptedException, ExecutionException {
// if the task has not completed yet, wait for completion
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// once completed, report the result value
return report(s);
}
// Returns outcome when the state is NORMAL, throws CancellationException when the state is
// >= CANCELLED, and otherwise wraps outcome in an ExecutionException.
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
关于多连接:通常来说,点对点的直连通信,客户端和服务端,一个 IP 一个连接对象就够用了。不管是吞吐能力还是并发度,都能满足一般业务的通信需求。而有一些场景,比如不是点对点直连通信,而是经过了 LVS VIP,或者 F5 设备的连接,此时,为了负载均衡和容错,会针对一个 URL 地址建立多个连接。
以上内容摘自:SOFABolt 用户手册
六、客户端
/**
* Client entry point: owns the connection factory, select strategy and connection manager.
* NOTE(review): excerpt only - several referenced members (userProcessors, rpcRemoting,
* addressParser, enableConnHeartbeat(Url), ...) and the parts marked "..." are omitted, and
* the class body is not closed in this excerpt.
*/
public class RpcClient extends AbstractConfigurableInstance {
private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(switches());
private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory, connectionEventHandler, connectionEventListener, switches());
// Initializes the connection manager, then the optional monitor / reconnect features.
public void init() {
this.connectionManager.init();
// runs when the connection-monitor switch is on
if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
...
}
// runs when the reconnect switch is on
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
...
}
}
// One-way invocation addressed by an address string; delegates to rpcRemoting with a null invoke context.
public void oneway(String addr, Object request) {
this.rpcRemoting.oneway(addr, request, null);
}
// Creates a connection through the connection manager; the closeStandaloneConnection method below closes one.
public Connection createStandaloneConnection(String ip, int port, int connectTimeout) {
return this.connectionManager.create(ip, port, connectTimeout);
}
// Closes a connection obtained from createStandaloneConnection; null is ignored.
public void closeStandaloneConnection(Connection conn) {
if (null != conn) {
conn.close();
}
}
// Sets the connect timeout on url, then gets (or creates) a connection through the connection manager.
public Connection getConnection(Url url, int connectTimeout) {
url.setConnectTimeout(connectTimeout);
return this.connectionManager.getAndCreateIfAbsent(url);
}
// Parses addr into a Url and delegates to enableConnHeartbeat(Url).
public void enableConnHeartbeat(String addr) {
Url url = this.addressParser.parse(addr);
this.enableConnHeartbeat(url);
}
// enable connection reconnect switch on
public void enableReconnectSwitch() {
this.switches().turnOn(GlobalSwitch.CONN_RECONNECT_SWITCH);
}
// enable connection monitor switch on
public void enableConnectionMonitorSwitch() {
this.switches().turnOn(GlobalSwitch.CONN_MONITOR_SWITCH);
}
客户端真正创建连接的时候,是在发起第一次调用的时候。
/**
 * One-way invocation addressed by {@code url}: obtains (or creates) a connection for the url,
 * checks it, then delegates to the connection-based {@code oneway}.
 *
 * @param url           target of the invocation
 * @param request       request object to send
 * @param invokeContext invoke context, passed through to the connection lookup and the call
 */
public void oneway(Url url, Object request, InvokeContext invokeContext) {
    // Get an existing Connection or create a new one.
    final Connection connection = getConnectionAndInitInvokeContext(url, invokeContext);
    // Check the connection before using it.
    this.connectionManager.check(connection);
    this.oneway(connection, request, invokeContext);
}
/**
 * Gets or creates a connection for {@code url} through the connection manager and, when an
 * invoke context is supplied, records how long that took under
 * {@code InvokeContext.CLIENT_CONN_CREATETIME} (only if no value is present yet).
 *
 * @param url           target of the connection
 * @param invokeContext context receiving the elapsed time; may be {@code null}
 * @return the connection returned by the connection manager
 */
protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext) {
    final long begin = System.currentTimeMillis();
    try {
        // Get or create the connection through the connectionManager.
        return this.connectionManager.getAndCreateIfAbsent(url);
    } finally {
        // Record the time spent, whether or not the lookup succeeded.
        if (invokeContext != null) {
            final long elapsed = System.currentTimeMillis() - begin;
            invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME, elapsed);
        }
    }
}
七、服务端
/**
* Server entry point; connection management on the server side is optional and controlled by
* GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH.
* NOTE(review): excerpt only - members such as rpcRemoting, addressParser, bootstrap and the
* parts marked "..." are omitted.
*/
public class RpcServer extends AbstractRemotingServer {
private ChannelFuture channelFuture;
private DefaultConnectionManager connectionManager;
/**
* You can enable connection management feature by specify @param manageConnection true.
* 1. When connection management feature enabled, you can use all invoke methods with params {@link String}, {@link Url}, {@link Connection} methods.
* 2. When connection management feature disabled, you can only use invoke methods with params {@link Connection}, otherwise {@link UnsupportedOperationException} will be thrown.
*/
public RpcServer(String ip, int port, boolean manageConnection) {
super(ip, port);
// turn on the server-side connection management feature when requested
if (manageConnection) {
this.switches().turnOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH);
}
}
/**
* Builds the connection event handler (plus a connection manager when connection management
* is on) and installs a channel initializer that wraps every accepted channel in a Connection.
*/
@Override
protected void doInit() {
// is the server-side connection management feature on?
if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
this.connectionEventHandler = new RpcConnectionEventHandler(switches());
this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
this.connectionEventHandler.setConnectionManager(this.connectionManager);
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
} else {
this.connectionEventHandler = new ConnectionEventHandler(switches());
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
}
...
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
createConnection(channel);
}
// Wraps the accepted channel in a Connection and fires the CONNECT event.
private void createConnection(SocketChannel channel) {
Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
// is the server-side connection management feature on?
if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
// if so, create the Connection and add it to the connectionManager
connectionManager.add(new Connection(channel, url), url.getUniqueKey());
} else {
// otherwise just create the Connection (its constructor attaches it to the channel)
new Connection(channel, url);
}
// publish the ConnectionEventType.CONNECT event
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
}
});
}
// One-way invocation addressed by an address string; requires connection management.
public void oneway(final String addr, final Object request) {
// 1. verify that server-side connection management is on; throws otherwise
check();
this.rpcRemoting.oneway(addr, request, null);
}
// Without server-side connection management, clients can only be invoked through a Connection, not through a Url or addr
public void oneway(final Connection conn, final Object request) {
this.rpcRemoting.oneway(conn, request, null);
}
// Returns true when a connection for url is found in the connection manager and isFine().
public boolean isConnected(Url url) {
Connection conn = this.rpcRemoting.connectionManager.get(url.getUniqueKey());
if (null != conn) {
return conn.isFine();
}
return false;
}
// Verifies that server-side connection management is on; throws UnsupportedOperationException otherwise
private void check() {
if (!this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
throw new UnsupportedOperationException(
"Please enable connection manage feature of Rpc Server before call this method! See comments in constructor RpcServer(int port, boolean manageConnection) to find how to enable!");
}
}
}
服务端创建 Connection 只有一个时机:netty 连接刚刚建立时
特别注意:服务端主动发起请求时,只会从连接池中获取已有的连接,不会创建新连接;而客户端 RpcClientRemoting 发起请求时,会获取或者创建连接。
/**
 * Server-side one-way invocation addressed by {@code url}: looks up an existing connection in
 * the connection manager (none is created here), checks it, then delegates to the
 * connection-based {@code oneway}.
 *
 * @param url           identifies the target; its unique key is the pool key
 * @param request       request object to send
 * @param invokeContext invoke context passed through to the call
 */
public void oneway(Url url, Object request, InvokeContext invokeContext) {
    // Look up a Connection in the server-side connection pool.
    final String poolKey = url.getUniqueKey();
    final Connection pooled = this.connectionManager.get(poolKey);
    // Check the connection.
    this.connectionManager.check(pooled);
    // Perform the invocation.
    this.oneway(pooled, request, invokeContext);
}
八、连接预热、连接超时与期望创建的连接数的指定
如下两种方式:
--------------------------String addr------------------------------
// Connection count, warm-up flag and connect timeout are given as query parameters of the address string.
String addr = "127.0.0.1:8888?_CONNECTIONNUM=10&_CONNECTIONWARMUP=true&_CONNECTTIMEOUT=3000";
String res = (String) client.invokeSync(addr, req, 3000);
--------------------------Url url------------------------------
Url url = new Url(ip, port);
// setProtocol takes the protocol code (it is read back through ProtocolCode.fromBytes), not a protocol version
url.setProtocol(RpcProtocolV2.PROTOCOL_CODE);
url.setVersion(RpcProtocolV2.PROTOCOL_VERSION_2);
url.setConnNum(10); // expected number of connections
url.setConnWarmup(true); // whether to warm up (create all connections synchronously)
url.setConnectTimeout(3000); // connect timeout, 3000 ms
String res = (String) client.invokeSync(url, req, 3000);
网友评论