
如上图所示,整个 UserProcessor 自定义处理器从两个角度进行分类:
- interest(感兴趣的请求数据类型)是单个还是多个
- UserProcessor 是异步还是同步
两种分类组合成四种底层抽象类,由用户选择实现。其中 AbstractUserProcessor 和 AbstractMultiInterestUserProcessor 可以看做是接口的适配器(为接口的每一个方法做了默认实现,使得最底层的四个抽象仅仅实现自己需要实现的方法就可以),所以个人更倾向于取名为 UserProcessorAdapter 和 MultiInterestUserProcessorAdapter。
一、同步用户处理器
public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
@Override
public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
MyResponse response = new MyResponse();
if (request != null) {
System.out.println(request);
response.setResp("from server -> " + request.getReq());
}
return response;
}
@Override
public String interest() {
return MyRequest.class.getName();
}
}
二、异步用户处理器
public class MyAsyncServerUserProcessor extends AsyncUserProcessor<MyRequest> {
private static final Executor executor = Executors.newCachedThreadPool();
@Override
public void handleRequest(BizContext bizCtx, final AsyncContext asyncCtx, MyRequest request) {
executor.execute(new AsyncTask(bizCtx, asyncCtx, request));
}
@Override
public String interest() {
return MyRequest.class.getName();
}
class AsyncTask implements Runnable {
private BizContext bizCtx;
private AsyncContext asyncCtx;
private MyRequest request;
public AsyncTask(BizContext bizCtx, AsyncContext asyncCtx, MyRequest request) {
this.bizCtx = bizCtx;
this.asyncCtx = asyncCtx;
this.request = request;
}
@Override
public void run() {
MyResponse response = new MyResponse();
if (request != null) {
System.out.println(bizCtx.getRemoteAddress() + " == " + request);
response.setResp("from server -> " + request.getReq());
}
asyncCtx.sendResponse(response);
}
}
}
异步处理器中提供了一个自定义的线程池 executor
然后将业务逻辑的执行封装为一个 AsyncTask 任务
之后使用自定义线程池执行该任务,执行完毕之后,使用AsyncContext
返回响应。
看一下 RpcAsyncContext
public class RpcAsyncContext implements AsyncContext {
private RemotingContext ctx;
private RpcRequestCommand cmd;
private RpcRequestProcessor processor;
public RpcAsyncContext(final RemotingContext ctx, final RpcRequestCommand cmd,
final RpcRequestProcessor processor) {
this.ctx = ctx;
this.cmd = cmd;
this.processor = processor;
}
public void sendResponse(Object responseObject) {
processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
}
}
RpcAsyncContext#sendResponse 依旧是使用 RpcRequestProcessor#sendResponseIfNecessary 方法进行响应的发送。
通过 SOFABolt 源码分析8 - RemotingCommand 命令协议的设计我们知道最终请求会执行到 RpcRequestProcessor#dispatchToUserProcessor 方法,该方法中调用了 UserProcessor 自定义处理器。
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
final int id = cmd.getId();
final byte type = cmd.getType();
UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
// 异步自定义处理器
if (processor instanceof AsyncUserProcessor) {
processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
...
} else { // 同步自定义处理器
Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
...
}
}
public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
final RemotingCommand response) {
final int id = response.getId();
if (type != RpcCommandType.REQUEST_ONEWAY) {
RemotingCommand serializedResponse = response;
response.serialize();
ctx.writeAndFlush(serializedResponse);
}
}
- 先进行预处理:将 RemotingContext 封装到 DefaultBizContext 中,避免用户直接操作 RemotingContext,用户后续可以操作 BizContext。preHandleRequest在 MyAsyncServerUserProcessor 中做没有覆盖和异步处理,所以是同步执行;
public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
return new DefaultBizContext(remotingCtx);
}
- 进行业务逻辑处理:这一步在 MyAsyncServerUserProcessor 中异步处理;
- 如果是异步处理,到此结束;如果是同步处理,此处还需要等待业务逻辑处理完成之后,执行发送响应代码
三、多 interest 用户处理器
public class MyMultiInterestServerUserProcessor extends SyncMutiInterestUserProcessor {
@Override
public Object handleRequest(BizContext bizCtx, Object request) throws Exception {
if (request instanceof MyRequest) {
System.out.println("MyRequest: " + request);
return newResponse("from server - MyRequest: " + request);
}
if (request instanceof String) {
System.out.println("String: " + request);
return newResponse("from server - String: " + request);
}
return null;
}
@Override
public List<String> multiInterest() {
List<String> interestList = new ArrayList<String>(2);
interestList.add(MyRequest.class.getName());
interestList.add(String.class.getName());
return interestList;
}
private MyResponse newResponse(String resp) {
MyResponse response = new MyResponse();
response.setResp(resp);
return response;
}
}
更加实用的例子见 SOFABolt github
多 interest 用户处理器只在注册的时候有所不同(同一个 MyMultiInterestServerUserProcessor 实例会注册到两个 multiInterest中的每一个 key 上)。注册代码大致轮廓。
public static void registerUserProcessor(UserProcessor<?> processor,
ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
if (processor instanceof MultiInterestUserProcessor) {
registerUserProcessor((MultiInterestUserProcessor) processor, userProcessors);
} else {
// interest 不能 blank
if (StringUtils.isBlank(processor.interest())) {
throw new RuntimeException("Processor interest should not be blank!");
}
UserProcessor<?> preProcessor = userProcessors.putIfAbsent(processor.interest(), processor);
// 不能为同一个 interest 注册两个 processor + 不能重复注册
if (preProcessor != null) {
String errMsg = "Processor with interest key ["
+ processor.interest()
+ "] has already been registered to rpc server, can not register again!";
throw new RuntimeException(errMsg);
}
}
}
private static void registerUserProcessor(MultiInterestUserProcessor<?> processor, ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
// // multiInterest 不能 epmty
if (null == processor.multiInterest() || processor.multiInterest().isEmpty()) {
throw new RuntimeException("Processor interest should not be blank!");
}
for (String interest : processor.multiInterest()) {
UserProcessor<?> preProcessor = userProcessors.putIfAbsent(interest, processor);
// 不能为同一个 interest 注册两个 processor + 不能重复注册
if (preProcessor != null) {
String errMsg = "Processor with interest key ["
+ interest
+ "] has already been registered to rpc server, can not register again!";
throw new RuntimeException(errMsg);
}
}
}
}
网友评论