美文网首页
打造自己的通信框架四——NettyServer搭建

打造自己的通信框架四——NettyServer搭建

作者: alonwang | 来源:发表于2020-08-26 19:42 被阅读0次

    前言

    从客户端发出一条消息到服务端接收并处理这条消息,大概可以分成下面的流程

    image.png

    黄色部分为客户端逻辑,蓝色为网络传输,红色为服务端逻辑,本文关注的是服务端逻辑。

    正文

    将二进制解码为特定格式,将protobuf封装为自定义格式都是这个处理链的一个单元。
    在Netty中,ChannelHandler充当了单元,ChannelPipeline充当处理链。处理链的构造如下

            ChannelPipeline pipeline = ch.pipeline();
            //闲置链接监听
            pipeline.addLast(new IdleStateHandler(60, 60, 0));
            //闲置链接处理单元
            pipeline.addLast(idleEventHandler);
            //1 protobuf解码单元
            pipeline.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
            pipeline.addLast(protobufDecoder);
            //2 protobuf封装为自定义协议Request单元
            pipeline.addLast(protobufRequestDecoder);
            //3 Request分发执行单元
            pipeline.addLast(requestDispatchHandler);
            //two protobuuf编码单元
            pipeline.addLast(new LengthFieldPrepender(4));
            pipeline.addLast(protobufEncoder);
            //one Response转换为protobuf单元
            pipeline.addLast(responseEncoder);
    

    标号1,2,3为我们需要关注的逻辑: 1.将二进制数据解码为protobuf -> 2.将protobuf编码为Request -> 3.Request分发执行
    在具体实现上对应下面三点

    1. 从二进制数据转换为proto生成的java类Base.Request
    2. 从Base.Request转换为AbstractRequest的具体实现类
    3. 根据AbstractRequest找到对应的方法,通过反射调用

    下面我们详解这三个过程

    将二进制数据解码为protobuf

    在这一步我们将二进制数据转换成之前定义的Base.Request
    这里使用的两个ChannelHandler都是Netty提供的,不过多叙述

    • io.netty.handler.codec.LengthFieldBasedFrameDecoder
    • io.netty.handler.codec.protobuf.ProtobufDecoder

    将protobuf编码为Request

    在这一步,MessageFactory.createRequest()根据moduleId和commandId找出具体的Request,并用proto对象去填充数据。再将创建的AbstractMessage传递给下一层
    MessageRegistry稍后解释)。

    public class ProtobufRequestDecoder extends MessageToMessageDecoder<Request> {
    
    
        @Override
        protected void decode(ChannelHandlerContext ctx, Request msg, List<Object> out) throws Exception {
            int moduleId = msg.getModuleId();
            int commandId = msg.getCommandId();
            AbstractRequest request = MessageFactory.createRequest(moduleId, commandId, msg.getData());
            out.add(request);
        }
    }
    
    public static <T extends AbstractRequest> T createRequest(int moduleId, int commandId, Object body) {
            Class<? extends AbstractMessage> messageClazz = Context.getMessageRegistry().getMessage(moduleId, commandId);
            Preconditions.checkNotNull(messageClazz, "moduleId({}),commandId({}) no relate Message", moduleId, commandId);
            Preconditions.checkArgument(AbstractRequest.class.isAssignableFrom(messageClazz));
            try {
                Constructor<? extends AbstractMessage> constructor = messageClazz.getConstructor();
                AbstractMessage abstractMessage = constructor.newInstance();
                AbstractRequest abstractRequest = (AbstractRequest) abstractMessage;
                MessageHeader header = new RequestHeader(moduleId, commandId);
                abstractRequest.setHeader(header);
                abstractRequest.setBody(body);
                abstractRequest.decode();
                return (T) abstractRequest;
            } catch (Exception e) {
                log.error("create request error", e);
            }
            return null;
        }
    

    Request分发执行

    这一步将Channel转换为User,将转发的职责交给RequestDispatchService
    channel2SessionMap记录了Channel和User的对应关系。User是客户端的唯一标识,里面记录了客户端的信息,并承载了异步串行无锁化的功能,具体实现后文会详述。

    public class RequestDispatchHandler extends SimpleChannelInboundHandler<AbstractRequest> {
        private static Map<Channel, User> channel2SessionMap = new ConcurrentHashMap<>();
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, AbstractRequest msg) throws Exception {
            User user = channel2SessionMap.compute(ctx.channel(), (c, s) -> {
                if (null == s) {
                    User createUser = new User();
                    createUser.setChannel(c);
                    return createUser;
                } else {
                    return s;
                }
    
            });
    
            Context.getRequestDispatchService().dispatch(user, msg);
        }
    }
    

    RequestDispatchService也很简单,找到AbstractRequest对应的方法,再通过反射调用去执行。
    user.execute(...)将这条消息放到用户的消息队列去执行

    @Component
    public class RequestDispatchService {
        @Autowired
        private MessageRegistry messageRegistry;
    
        public void dispatch(User user, AbstractRequest request) {
            MethodWrapper wrapper = messageRegistry.getWrapper(request.header().getModuleId(), request.header().getCommandId());
            Preconditions.checkNotNull(wrapper);
            user.execute(new MessageTask(wrapper, request));
    
        }
    }
    

    至此,服务端逻辑结束,下面来看一下这个过程中最核心的MessageRegistry

    MessageRegistry

    MessageRegistry有两个功能

    • 计算并存储moduleId+commandId到AbstractMessage的映射
    • 计算并存储AbstractMessage到具体方法的映射
      这样就从消息映射到了方法执行,处理消息 ~= 使用数据执行特定方法

    moduleId+commandId到AbstractMessage的映射

    举个例子
    所有AbstractMessage的具体实现,都会带有@MessageWrapper注解,以此将具体的Message和moduleId+commandId关联起来,例如

     @MessageWrapper(moduleId = 1, commandId = 1)
     public class HelloRequest extends AbstractRequest {
        
     }
    

    MessageRegistry会扫描类信息,将其关系记录下来。

            Reflections reflections = new Reflections(TransportStarter.class.getPackage().getName());
            //记录所有的Message
            Map<String, Class<? extends AbstractMessage>> tempMessageMap = new HashMap<>();
            Set<Class<?>> wrappersClasses = reflections.getTypesAnnotatedWith(MessageWrapper.class);
            for (Class<?> wrapperClazz : wrappersClasses) {
                Preconditions.checkArgument(!Modifier.isAbstract(wrapperClazz.getModifiers()), "{} illegal,@MessageWrapper annotated class can't be abstract", wrapperClazz.getSimpleName());
                Preconditions.checkArgument(AbstractMessage.class.isAssignableFrom(wrapperClazz), "{} illegal,@MessageWrapper annotated class must be sub type of AbstractCSMessage", wrapperClazz.getSimpleName());
                MessageWrapper wrapperAnnotation = wrapperClazz.getAnnotation(MessageWrapper.class);
                Preconditions.checkArgument(wrapperAnnotation.moduleId() > 0 && wrapperAnnotation.commandId() > 0);
                String key = getKey(wrapperAnnotation.moduleId(), wrapperAnnotation.commandId());
                if (tempMessageMap.containsKey(key)) {
                    Class<? extends AbstractMessage> old = tempMessageMap.get(key);
                    throw new IllegalArgumentException(old.getSimpleName() + " and " + wrapperClazz.getSimpleName() + " conflict,please check @MessageWrapper's moduleId and commandId");
                }
                tempMessageMap.put(key, (Class<? extends AbstractMessage>) wrapperClazz);
            }
            messages = Collections.unmodifiableMap(tempMessageMap);
    
    

    计算并存储AbstractMessage到具体方法的映射

    一个消息必定有一个对应的方法,@MessageHandler注解标识某个接口中的某些方法是和消息相关联的

    @MessageHandler
    public interface HelloService {
        void hello(User user, HelloRequest request);
    
    }
    

    MessageRegistry再将这个方法封装一下,和消息关联起来

            //解析所有message对应的方法
            Map<Class<? extends AbstractMessage>, MethodWrapper> tempMethodWrappers = new HashMap<>();
            Set<Class<?>> handlerClasses = reflections.getTypesAnnotatedWith(MessageHandler.class, true);
            for (Class<?> handlerClazz : handlerClasses) {
                Preconditions.checkArgument(Modifier.isInterface(handlerClazz.getModifiers()), "{} illegal,@MessageHandler annotated class must be interface");
                Object bean = applicationContext.getBean(handlerClazz);
                Preconditions.checkNotNull(bean, "{} annotated with @MessageHandler but no instance");
                var methods = handlerClazz.getDeclaredMethods();
                for (Method method : methods) {
                    var parameterTypes = method.getParameterTypes();
                    List<Class<?>> satisfyParameters = Arrays.stream(parameterTypes).filter(AbstractMessage.class::isAssignableFrom).filter(type -> messages.containsValue(type)).collect(Collectors.toList());
                    if (satisfyParameters.isEmpty()) {
                        continue;
                    }
                    Preconditions.checkArgument(satisfyParameters.size() == 1, "method {} signature illegal,parameters should only contain exactly one AbstractCSMessage", method);
                    Class<? extends AbstractMessage> messageClazz = (Class<? extends AbstractMessage>) satisfyParameters.get(0);
                    Preconditions.checkArgument(!tempMethodWrappers.containsKey(messageClazz), "parameter illegal,{} appear in different methods", messageClazz);
                    MethodWrapper methodWrapper = new MethodWrapper(method, bean);
                    tempMethodWrappers.put(messageClazz, methodWrapper);
                }
            }
            messageMethods = Collections.unmodifiableMap(tempMethodWrappers);
    

    后记

    通过这套机制,打通了从消息接收到执行的一整套逻辑,开发者不需要关注消息是如何处理的,只要定义好必备的东西就好。

    相关文章

      网友评论

          本文标题:打造自己的通信框架四——NettyServer搭建

          本文链接:https://www.haomeiwen.com/subject/vhvfsktx.html