美文网首页sofabolt
SOFABolt 源码分析9 - UserProcessor 自

SOFABolt 源码分析9 - UserProcessor 自

作者: 原水寒 | 来源:发表于2018-10-06 16:51 被阅读96次
image.png

如上图所示,整个 UserProcessor 自定义处理器从两个角度进行分类:

  • interest(感兴趣的请求数据类型)是单个还是多个
  • UserProcessor 是异步还是同步
    两种分类组合成四种底层抽象类,由用户选择实现。其中 AbstractUserProcessor 和 AbstractMultiInterestUserProcessor 可以看做是接口的适配器(为接口的每一个方法做了默认实现,使得最底层的四个抽象仅仅实现自己需要实现的方法就可以),所以个人更倾向于取名为 UserProcessorAdapter 和 MultiInterestUserProcessorAdapter。

一、同步用户处理器

public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
    @Override
    public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
        MyResponse response = new MyResponse();
        if (request != null) {
            System.out.println(request);
            response.setResp("from server -> " + request.getReq());
        }
        return response;
    }

    @Override
    public String interest() {
        return MyRequest.class.getName();
    }
}

二、异步用户处理器

public class MyAsyncServerUserProcessor extends AsyncUserProcessor<MyRequest> {
    private static final Executor executor = Executors.newCachedThreadPool();

    @Override
    public void handleRequest(BizContext bizCtx, final AsyncContext asyncCtx, MyRequest request) {
        executor.execute(new AsyncTask(bizCtx, asyncCtx, request));
    }

    @Override
    public String interest() {
        return MyRequest.class.getName();
    }

    class AsyncTask implements Runnable {
        private BizContext   bizCtx;
        private AsyncContext asyncCtx;
        private MyRequest request;

        public AsyncTask(BizContext bizCtx, AsyncContext asyncCtx, MyRequest request) {
            this.bizCtx = bizCtx;
            this.asyncCtx = asyncCtx;
            this.request = request;
        }

        @Override
        public void run() {
            MyResponse response = new MyResponse();
            if (request != null) {
                System.out.println(bizCtx.getRemoteAddress() + " == " + request);
                response.setResp("from server -> " + request.getReq());
            }
            asyncCtx.sendResponse(response);
        }
    }
}

异步处理器中提供了一个自定义的线程池 executor
然后将业务逻辑的执行封装为一个 AsyncTask 任务
之后使用自定义线程池执行该任务,执行完毕之后,使用 AsyncContext 返回响应。

看一下 RpcAsyncContext

public class RpcAsyncContext implements AsyncContext {
    private RemotingContext     ctx;
    private RpcRequestCommand   cmd;
    private RpcRequestProcessor processor;

    public RpcAsyncContext(final RemotingContext ctx, final RpcRequestCommand cmd,
                           final RpcRequestProcessor processor) {
        this.ctx = ctx;
        this.cmd = cmd;
        this.processor = processor;
    }

    public void sendResponse(Object responseObject) {
        processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
    }
}

RpcAsyncContext#sendResponse 依旧是使用 RpcRequestProcessor#sendResponseIfNecessary 方法进行响应的发送。

通过 SOFABolt 源码分析8 - RemotingCommand 命令协议的设计我们知道最终请求会执行到 RpcRequestProcessor#dispatchToUserProcessor 方法,该方法中调用了 UserProcessor 自定义处理器。

    private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
        final int id = cmd.getId();
        final byte type = cmd.getType();
        UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
        // 异步自定义处理器
        if (processor instanceof AsyncUserProcessor) {
            processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
            ...
        } else {  // 同步自定义处理器
            Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
            sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
            ...
        }
    }

    public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
                                        final RemotingCommand response) {
        final int id = response.getId();
        if (type != RpcCommandType.REQUEST_ONEWAY) {
            RemotingCommand serializedResponse = response;
            response.serialize();

            ctx.writeAndFlush(serializedResponse);
        } 
    }
  • 先进行预处理:将 RemotingContext 封装到 DefaultBizContext 中,避免用户直接操作 RemotingContext,用户后续可以操作 BizContext。preHandleRequest在 MyAsyncServerUserProcessor 中做没有覆盖和异步处理,所以是同步执行;
    public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
        return new DefaultBizContext(remotingCtx);
    }
  • 进行业务逻辑处理:这一步在 MyAsyncServerUserProcessor 中异步处理;
  • 如果是异步处理,到此结束;如果是同步处理,此处还需要等待业务逻辑处理完成之后,执行发送响应代码

三、多 interest 用户处理器

public class MyMultiInterestServerUserProcessor extends SyncMutiInterestUserProcessor {
    @Override
    public Object handleRequest(BizContext bizCtx, Object request) throws Exception {
        if (request instanceof MyRequest) {
            System.out.println("MyRequest: " + request);
            return newResponse("from server - MyRequest: " + request);
        }
        if (request instanceof String) {
            System.out.println("String: " + request);
            return newResponse("from server - String: " + request);
        }
        return null;
    }

    @Override
    public List<String> multiInterest() {
        List<String> interestList = new ArrayList<String>(2);
        interestList.add(MyRequest.class.getName());
        interestList.add(String.class.getName());
        return interestList;
    }

    private MyResponse newResponse(String resp) {
        MyResponse response = new MyResponse();
        response.setResp(resp);
        return response;
    }
}

更加实用的例子见 SOFABolt github

多 interest 用户处理器只在注册的时候有所不同(同一个 MyMultiInterestServerUserProcessor 实例会注册到两个 multiInterest中的每一个 key 上)。注册代码大致轮廓。

    public static void registerUserProcessor(UserProcessor<?> processor,
                                             ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
        if (processor instanceof MultiInterestUserProcessor) {
            registerUserProcessor((MultiInterestUserProcessor) processor, userProcessors);
        } else {
            // interest 不能 blank
            if (StringUtils.isBlank(processor.interest())) {
                throw new RuntimeException("Processor interest should not be blank!");
            }
            UserProcessor<?> preProcessor = userProcessors.putIfAbsent(processor.interest(), processor);
            // 不能为同一个 interest 注册两个 processor + 不能重复注册
            if (preProcessor != null) {
                String errMsg = "Processor with interest key ["
                                + processor.interest()
                                + "] has already been registered to rpc server, can not register again!";
                throw new RuntimeException(errMsg);
            }
        }
    }

    private static void registerUserProcessor(MultiInterestUserProcessor<?> processor, ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
        // // multiInterest 不能 epmty
        if (null == processor.multiInterest() || processor.multiInterest().isEmpty()) {
            throw new RuntimeException("Processor interest should not be blank!");
        }
        for (String interest : processor.multiInterest()) {
            UserProcessor<?> preProcessor = userProcessors.putIfAbsent(interest, processor);
             // 不能为同一个 interest 注册两个 processor + 不能重复注册
            if (preProcessor != null) {
                String errMsg = "Processor with interest key ["
                                + interest
                                + "] has already been registered to rpc server, can not register again!";
                throw new RuntimeException(errMsg);
            }
        }
    }
}

相关文章

网友评论

    本文标题:SOFABolt 源码分析9 - UserProcessor 自

    本文链接:https://www.haomeiwen.com/subject/opdhaftx.html