美文网首页sofabolt
SOFABolt 源码分析9 - UserProcessor 自

SOFABolt 源码分析9 - UserProcessor 自

作者: 原水寒 | 来源:发表于2018-10-06 16:51 被阅读96次
    image.png

    如上图所示,整个 UserProcessor 自定义处理器从两个角度进行分类:

    • interest(感兴趣的请求数据类型)是单个还是多个
    • UserProcessor 是异步还是同步
      两种分类组合成四种底层抽象类,由用户选择实现。其中 AbstractUserProcessor 和 AbstractMultiInterestUserProcessor 可以看做是接口的适配器(为接口的每一个方法做了默认实现,使得最底层的四个抽象仅仅实现自己需要实现的方法就可以),所以个人更倾向于取名为 UserProcessorAdapter 和 MultiInterestUserProcessorAdapter。

    一、同步用户处理器

    public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
        @Override
        public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
            MyResponse response = new MyResponse();
            if (request != null) {
                System.out.println(request);
                response.setResp("from server -> " + request.getReq());
            }
            return response;
        }
    
        @Override
        public String interest() {
            return MyRequest.class.getName();
        }
    }
    

    二、异步用户处理器

    public class MyAsyncServerUserProcessor extends AsyncUserProcessor<MyRequest> {
        private static final Executor executor = Executors.newCachedThreadPool();
    
        @Override
        public void handleRequest(BizContext bizCtx, final AsyncContext asyncCtx, MyRequest request) {
            executor.execute(new AsyncTask(bizCtx, asyncCtx, request));
        }
    
        @Override
        public String interest() {
            return MyRequest.class.getName();
        }
    
        class AsyncTask implements Runnable {
            private BizContext   bizCtx;
            private AsyncContext asyncCtx;
            private MyRequest request;
    
            public AsyncTask(BizContext bizCtx, AsyncContext asyncCtx, MyRequest request) {
                this.bizCtx = bizCtx;
                this.asyncCtx = asyncCtx;
                this.request = request;
            }
    
            @Override
            public void run() {
                MyResponse response = new MyResponse();
                if (request != null) {
                    System.out.println(bizCtx.getRemoteAddress() + " == " + request);
                    response.setResp("from server -> " + request.getReq());
                }
                asyncCtx.sendResponse(response);
            }
        }
    }
    

    异步处理器中提供了一个自定义的线程池 executor
    然后将业务逻辑的执行封装为一个 AsyncTask 任务
    之后使用自定义线程池执行该任务,执行完毕之后,使用 AsyncContext 返回响应。

    看一下 RpcAsyncContext

    public class RpcAsyncContext implements AsyncContext {
        private RemotingContext     ctx;
        private RpcRequestCommand   cmd;
        private RpcRequestProcessor processor;
    
        public RpcAsyncContext(final RemotingContext ctx, final RpcRequestCommand cmd,
                               final RpcRequestProcessor processor) {
            this.ctx = ctx;
            this.cmd = cmd;
            this.processor = processor;
        }
    
        public void sendResponse(Object responseObject) {
            processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
        }
    }
    

    RpcAsyncContext#sendResponse 依旧是使用 RpcRequestProcessor#sendResponseIfNecessary 方法进行响应的发送。

    通过 SOFABolt 源码分析8 - RemotingCommand 命令协议的设计我们知道最终请求会执行到 RpcRequestProcessor#dispatchToUserProcessor 方法,该方法中调用了 UserProcessor 自定义处理器。

        private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
            final int id = cmd.getId();
            final byte type = cmd.getType();
            UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
            // 异步自定义处理器
            if (processor instanceof AsyncUserProcessor) {
                processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
                ...
            } else {  // 同步自定义处理器
                Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
                sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
                ...
            }
        }
    
        public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
                                            final RemotingCommand response) {
            final int id = response.getId();
            if (type != RpcCommandType.REQUEST_ONEWAY) {
                RemotingCommand serializedResponse = response;
                response.serialize();
    
                ctx.writeAndFlush(serializedResponse);
            } 
        }
    
    • 先进行预处理:将 RemotingContext 封装到 DefaultBizContext 中,避免用户直接操作 RemotingContext,用户后续可以操作 BizContext。preHandleRequest在 MyAsyncServerUserProcessor 中做没有覆盖和异步处理,所以是同步执行;
        public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
            return new DefaultBizContext(remotingCtx);
        }
    
    • 进行业务逻辑处理:这一步在 MyAsyncServerUserProcessor 中异步处理;
    • 如果是异步处理,到此结束;如果是同步处理,此处还需要等待业务逻辑处理完成之后,执行发送响应代码

    三、多 interest 用户处理器

    public class MyMultiInterestServerUserProcessor extends SyncMutiInterestUserProcessor {
        @Override
        public Object handleRequest(BizContext bizCtx, Object request) throws Exception {
            if (request instanceof MyRequest) {
                System.out.println("MyRequest: " + request);
                return newResponse("from server - MyRequest: " + request);
            }
            if (request instanceof String) {
                System.out.println("String: " + request);
                return newResponse("from server - String: " + request);
            }
            return null;
        }
    
        @Override
        public List<String> multiInterest() {
            List<String> interestList = new ArrayList<String>(2);
            interestList.add(MyRequest.class.getName());
            interestList.add(String.class.getName());
            return interestList;
        }
    
        private MyResponse newResponse(String resp) {
            MyResponse response = new MyResponse();
            response.setResp(resp);
            return response;
        }
    }
    

    更加实用的例子见 SOFABolt github

    多 interest 用户处理器只在注册的时候有所不同(同一个 MyMultiInterestServerUserProcessor 实例会注册到两个 multiInterest中的每一个 key 上)。注册代码大致轮廓。

        public static void registerUserProcessor(UserProcessor<?> processor,
                                                 ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
            if (processor instanceof MultiInterestUserProcessor) {
                registerUserProcessor((MultiInterestUserProcessor) processor, userProcessors);
            } else {
                // interest 不能 blank
                if (StringUtils.isBlank(processor.interest())) {
                    throw new RuntimeException("Processor interest should not be blank!");
                }
                UserProcessor<?> preProcessor = userProcessors.putIfAbsent(processor.interest(), processor);
                // 不能为同一个 interest 注册两个 processor + 不能重复注册
                if (preProcessor != null) {
                    String errMsg = "Processor with interest key ["
                                    + processor.interest()
                                    + "] has already been registered to rpc server, can not register again!";
                    throw new RuntimeException(errMsg);
                }
            }
        }
    
        private static void registerUserProcessor(MultiInterestUserProcessor<?> processor, ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
            // // multiInterest 不能 epmty
            if (null == processor.multiInterest() || processor.multiInterest().isEmpty()) {
                throw new RuntimeException("Processor interest should not be blank!");
            }
            for (String interest : processor.multiInterest()) {
                UserProcessor<?> preProcessor = userProcessors.putIfAbsent(interest, processor);
                 // 不能为同一个 interest 注册两个 processor + 不能重复注册
                if (preProcessor != null) {
                    String errMsg = "Processor with interest key ["
                                    + interest
                                    + "] has already been registered to rpc server, can not register again!";
                    throw new RuntimeException(errMsg);
                }
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:SOFABolt 源码分析9 - UserProcessor 自

        本文链接:https://www.haomeiwen.com/subject/opdhaftx.html