
SOFABolt 提供了双工通信能力,使得不仅客户端可以调用服务端,服务端也可以主动调用客户端(当然,客户端也就需要可以注册 UserProcessor 的功能)。
SOFABolt 四种调用模式:oneway / sync / future / callback
SOFABolt 三种调用链路:addr / url / connection,
注意:在整个调用过程中,调用链路会发生如下转化:addr -> url -> connection
如上图所示,整个调用辅助类包含:
- BaseRemoting:提供了四种基本调用模式的方法(基于 Connection 的,值得注意的是,Connection 也是三种调用链路最底层的),这四种调用模式也提供了基本的调用模板;
- RpcRemoting
- 实现了 RpcProtocol 的初始化
- 实现了基于 addr 链路的四种基本调用模式模板(内部实际调用基于 url 链路模式)
- 提供了将请求对象 Object 封装为 RemotingCommand 的方法
- RpcClientRemoting
- 实现了基于 url 链路的四种基本调用模式模板(内部实际调用基于 connection 链路模式,根据 url 调用建连接口创建 connection)
- 提供了建连操作
- RpcServerRemoting
实现了基于 url 链路的四种基本调用模式模板(内部实际调用基于 connection 链路模式,但是不会根据 url 创建 connection,只会从连接管理器根据 url 获取连接 - 所以
要想使用基于 url 链路的功能,必须开启服务端连接管理功能,而基于 addr 链路的方式底层又是转化为 url 链路方式,所以基于 addr 链路的功能,也必须开启服务端连接管理功能
)
注意:
- 客户端调用服务端会主动建连,服务端调用客户端不会主动建连
- 服务端想使用基于 url 链路或者基于 addr 链路的调用功能,必须开启服务端连接管理功能(实际上还需要保存 addr 链路地址,通常通过 UserProcessor.handleRequest 中的 BizContext 来获取 remoteAddr 并存储在 UserProcessor 中)
- 服务端如果没有开启服务端连接管理功能,只能通过 connection 链路进行调用,此时要保存好连接建立好的时候创建的 connection 对象(通常使用 ConnectionEventType.CONNECT 的连接事件处理器做这件事)
一、使用姿势
1.1、基于 addr 链路模式
服务端
public class MyServer {
public static void main(String[] args) throws RemotingException, InterruptedException {
RpcServer server= new RpcServer(8888);
MyServerUserProcessor serverUserProcessor = new MyServerUserProcessor();
server.registerUserProcessor(serverUserProcessor);
// 打开服务端连接管理功能
server.switches().turnOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH);
if (server.start()) {
System.out.println("server start success!");
// 模拟去其他事情
Thread.sleep(10000);
MyRequest request = new MyRequest();
request.setReq("hi, bolt-client");
// 向 serverUserProcessor 存储的 RemoteAddr 发起请求
MyResponse resp = (MyResponse)server.invokeSync(serverUserProcessor.getRemoteAddr(), request, 10000);
System.out.println(resp.getResp());
} else {
System.out.println("server start fail!");
}
}
}
=========================== 服务端业务逻辑处理器 ===========================
public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
// 存储 client 端地址,用于发起远程调用
private String remoteAddr;
@Override
public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
remoteAddr = bizCtx.getRemoteAddress(); // 此处也可以存储 Connection:bizCtx.getConnection();
MyResponse response = new MyResponse();
if (request != null) {
System.out.println(request);
response.setResp("from server -> " + request.getReq());
}
return response;
}
@Override
public String interest() {
return MyRequest.class.getName();
}
public String getRemoteAddr() {
return remoteAddr;
}
}
客户端
public class MyClient {
private static RpcClient client;
private static CountDownLatch latch = new CountDownLatch(1);
public static void start() {
client = new RpcClient();
// 注册业务逻辑处理器
client.registerUserProcessor(new MyClientUserProcessor());
client.init();
}
public static void main(String[] args) throws RemotingException, InterruptedException {
MyClient.start();
MyRequest request = new MyRequest();
request.setReq("hello, bolt-server");
MyResponse response = (MyResponse) client.invokeSync("127.0.0.1:8888", request, 300 * 1000);
System.out.println(response);
latch.await();
}
}
=========================== 客户端业务逻辑处理器 ===========================
public class MyClientUserProcessor extends SyncUserProcessor<MyRequest> {
@Override
public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
MyResponse response = new MyResponse();
if (request != null) {
System.out.println(request);
response.setResp("from client -> " + request.getReq());
}
return response;
}
@Override
public String interest() {
return MyRequest.class.getName();
}
}
1.2、基于 connection 链路模式
服务端
public class MyServer {
public static void main(String[] args) throws RemotingException, InterruptedException {
RpcServer server= new RpcServer(8888);
MyServerUserProcessor serverUserProcessor = new MyServerUserProcessor();
server.registerUserProcessor(serverUserProcessor);
// 创建并注册 ConnectionEventType.CONNECT 连接事件处理器
MyCONNECTEventProcessor connectEventProcessor = new MyCONNECTEventProcessor();
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, connectEventProcessor);
if (server.start()) {
System.out.println("server start success!");
// 模拟去其他事情
Thread.sleep(10000);
MyRequest request = new MyRequest();
request.setReq("hi, bolt-client");
// 向 connectEventProcessor 存储的 connection 发起请求
MyResponse resp = (MyResponse)server.invokeSync(connectEventProcessor.getConnection(), request, 10000);
System.out.println(resp.getResp());
} else {
System.out.println("server start fail!");
}
}
}
=========================== 连接事件处理器 ===========================
public class MyCONNECTEventProcessor implements ConnectionEventProcessor {
// 存储连接,用于服务端向客户端发起远程通信
private Connection connection;
@Override
public void onEvent(String remoteAddr, Connection conn) {
this.connection = conn;
System.out.println("hello, " + remoteAddr);
}
public Connection getConnection() {
return connection;
}
}
客户端
同 1.1
二、源码解析
建连
======================================== RpcServer ========================================
protected void doInit() {
// 开启服务端连接管理功能
if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
this.connectionEventHandler = new RpcConnectionEventHandler(switches());
// 与客户端一样,创建连接管理器,并设置到 connectionEventHandler 中
this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
this.connectionEventHandler.setConnectionManager(this.connectionManager);
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
} else {
this.connectionEventHandler = new ConnectionEventHandler(switches());
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
}
initRpcRemoting();
...
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
createConnection(channel);
}
private void createConnection(SocketChannel channel) {
Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
// 如果开启了连接管理功能,则新建 Connection 并加入 连接管理器;
// 如果没有开启,则直接新建 Connection
if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
connectionManager.add(new Connection(channel, url), url.getUniqueKey());
} else {
new Connection(channel, url);
}
// 发布建连事件,此时进行连接的保存
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
}
});
}
======================================== DefaultConnectionManager ========================================
public void add(Connection connection, String poolKey) {
ConnectionPool pool = null;
// 如果有 ConnectionPool 直接获取,没有就创建一个新的
// 与客户端不同的是,该 add 方法只创建一个空的 ConnectionPool,不会初始化连接;而客户端会初始化连接
// 连接是否初始化,取决于 ConnectionPoolCall 构造器参数
pool = this.getConnectionPoolAndCreateIfAbsent(poolKey, new ConnectionPoolCall());
// 连接加入连接池
pool.add(connection);
}
private class ConnectionPoolCall implements Callable<ConnectionPool> {
// 是否初始化连接
private boolean whetherInitConnection;
private Url url;
// 默认构造器:只创建 pool,不建连(服务端用的这个)
public ConnectionPoolCall() {
this.whetherInitConnection = false;
}
// 创建 pool + 建连(客户端用的这个)
public ConnectionPoolCall(Url url) {
this.whetherInitConnection = true;
this.url = url;
}
@Override
public ConnectionPool call() throws Exception {
// 创建 pool
final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
if (whetherInitConnection) {
// 建连
doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
}
return pool;
}
}
调用
======================================== RpcRemoting ========================================
public Object invokeSync(String addr, Object request, InvokeContext invokeContext, int timeoutMillis) {
// addr => url
Url url = this.addressParser.parse(addr);
// 以 url 链路方式进行调用
return this.invokeSync(url, request, invokeContext, timeoutMillis);
}
======================================== RpcServerRemoting ========================================
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 直接从 connectionManager 获取连接(仅仅是获取,不建连)
Connection conn = this.connectionManager.get(url.getUniqueKey());
// 检查连接
this.connectionManager.check(conn);
// 以 connection 链路方式进行调用
return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
public Connection DefaultConnectionManager#get(String poolKey) {
// 获取连接池,如果没有返回null,否则从连接池获取一个 connection。整个过程只是单纯的获取,不做建连操作
ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
return null == pool ? null : pool.get();
}
======================================== RpcClientRemoting ========================================
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 获取连接,如果没有就建连
final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
// 检查连接
this.connectionManager.check(conn);
// 以 connection 链路方式进行调用
return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
======================================== RpcRemoting ========================================
public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 构造请求统一体
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
// 预处理 invokeContext
preProcessInvokeContext(invokeContext, requestCommand, conn);
// 发起调用,返回响应统一体
ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
// 设置 invokeContext 到响应统一体
responseCommand.setInvokeContext(invokeContext);
// 从响应统一体解析出真正的响应消息
Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
return responseObject;
}
======================================== BaseRemoting ========================================
protected RemotingCommand invokeSync(Connection conn, RemotingCommand request, int timeoutMillis) {
// 创建 InvokeFuture
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
// 将 InvokeFuture 添加到 Connection 的 InvokeFutureMap 映射中
conn.addInvokeFuture(future);
// 发起 netty 请求
conn.getChannel().writeAndFlush(request);
// 阻塞等待响应结果
RemotingCommand response = future.waitResponse(timeoutMillis);
// 被唤醒后返回响应结果
return response;
}
网友评论