美文网首页
浅析RPC框架的构建

浅析RPC框架的构建

作者: 谜碌小孩 | 来源:发表于2016-12-12 23:02 被阅读915次

    RPC框架远程调用的实现方式在原理上是比较简单的,即将调用的方法(接口名、方法名、参数类型、参数)序列化之后发送到远端,在远端反序列化之后调用接口的实现类的方法(接口主要是为了使用动态代理)。
    所以我们在实现RPC框架的时候需要选择合适的序列化与反序列化方式

    常见的序列化与反序列化协议

    1. Java内置的序列化
      生成的字节数组较大、不能跨语言、效率低下,不建议使用
    2. Hessian
      简单高效特点而使用广泛
    3. protobuf或者protostuff
      google开源的protobuf采用更为紧凑的二进制数组,表现更加优异,不过其使用方法极其繁琐,需要编写proto文件,然后使用protobuf的编译工具生成pojo类
      protostuff基于protobuf的序列化,在运行只需要添加模式schema即可序列化对象,schema可由RuntimeSchema生成,然后调用ProtostuffIOUtil的toByteArray()方法;反序列化也需要添加模式schema,然后调用ProtostuffIOUtil的mergeFrom()方法即可将字节还原成对象
    4. Kryo
      速度快,序列化后体积小,跨语言支持较复杂。spark推荐的序列化方式,需要注册对象
    5. Avro
      Avro 是属于 Hadoop 的一个子项目
    6. Thrift
      和protobuf一样不支持动态特性

    其中Hessian和Kryo使用最简单,protobuf/protostuff应该是效率最高的,Thrift应该是跨语言支持最好的,目前我知道的在大型开源项目中使用Avro只有Hadoop,感觉应该和Kyro类似。

    基于Netty开发简单的RPC框架

    首先说明,此框架不是我写的,这是Github上某user的思考成果。不偷不抢不吹牛逼,看大神代码,一起成长
    版权请见https://github.com/tang-jie/NettyRPC

    消息体MessageRequest与MessageResponse

    /**
     * @filename 
     * @Description  RPC请求体
     * @Author 
     * @Date 
     */
    public class MessageRequest implements Serializable {
        
        private String messageId;   //唯一ID
        private String className;   //接口名
        private String methodName;  //方法名
        private Class<?>[] typeParameters;  //参数类型
        private Object[] parametersVal;     //参数
    
        public String getMessageId() {
            return messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        public Class<?>[] getTypeParameters() {
            return typeParameters;
        }
    
        public void setTypeParameters(Class<?>[] typeParameters) {
            this.typeParameters = typeParameters;
        }
    
        public Object[] getParameters() {
            return parametersVal;
        }
    
        public void setParameters(Object[] parametersVal) {
            this.parametersVal = parametersVal;
        }
    
        public String toString() {
            return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"});
        }
    }
    
    /**
     * @filename
     * @Description  RPC响应体
     * @Author
     * @Date
     */
    public class MessageResponse implements Serializable {
    
        private String messageId;   //唯一ID
        private String error;       //错误消息
        private Object resultDesc;  //方法调用结果
    
        public String getMessageId() {
            return messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getError() {
            return error;
        }
    
        public void setError(String error) {
            this.error = error;
        }
    
        public Object getResult() {
            return resultDesc;
        }
    
        public void setResult(Object resultDesc) {
            this.resultDesc = resultDesc;
        }
    
        public String toString() {
            return ReflectionToStringBuilder.toString(this);
        }
    }
    
    /**
     * @filename
     * @Description  RPC服务映射容器
     * @Author
     * @Date
     */
    public class MessageKeyVal {
    
        private Map<String, Object> messageKeyVal;
    
        public void setMessageKeyVal(Map<String, Object> messageKeyVal) {
            this.messageKeyVal = messageKeyVal;
        }
    
        public Map<String, Object> getMessageKeyVal() {
            return messageKeyVal;
        }
    }
    

    自定义线程池和线程工厂

    作者可能觉得java内置的线程池和线程工厂不满足自己的需求,所以实现了自己的线程池和线程工厂,将调度任务放在特定的线程池去工作。
    自定义线程池其实还比较简单,我在http://www.jianshu.com/p/4f368625294b有过简单的介绍,线程池异常策略有AbortPolicy(默认的,直接抛出一个RejectedExecutionException异常)、DiscardPolicy(rejectedExecution直接是空方法,什么也不干,如果队列满了,后续的任务都抛弃掉)、DiscardOldestPolicy(将等待队列里最旧的任务踢走,让新任务得以执行)、CallerRunsPolicy(既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务)。

    /**
     * @filename
     * @Description  自定义线程工厂
     * @Author
     * @Date
     */
    public class NamedThreadFactory implements ThreadFactory {
    
        private static final AtomicInteger threadNumber = new AtomicInteger(1);
    
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
    
        private final String prefix;
    
        private final boolean daemoThread;
    
        private final ThreadGroup threadGroup;
    
        public NamedThreadFactory() {
            this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
        }
    
        public NamedThreadFactory(String prefix) {
            this(prefix, false);
        }
    
        public NamedThreadFactory(String prefix, boolean daemo) {
            this.prefix = prefix + "-thread-";
            daemoThread = daemo;
            SecurityManager s = System.getSecurityManager();
            threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
        }
    
        public Thread newThread(Runnable runnable) {
            String name = prefix + mThreadNum.getAndIncrement();
            Thread ret = new Thread(threadGroup, runnable, name, 0);
            ret.setDaemon(daemoThread);
            return ret;
        }
    
        public ThreadGroup getThreadGroup() {
            return threadGroup;
        }
    }
    
    /**
     * @filename
     * @Description  自定义线程池
     * @Author
     * @Date
     */
    public class RpcThreadPool {
        public static Executor getExecutor(int threads, int queues) {
            String name = "RpcThreadPool";
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>()
                            : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
            
        }
    }
    
    /**
     * @filename
     * @Description  线程池异常策略
     * @Author
     * @Date
     */
    public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    
        private final String threadName;
    
        public AbortPolicyWithReport(String threadName) {
            this.threadName = threadName;
        }
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String msg = String.format("RpcServer["
                    + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
                    + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",
                    threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
            System.out.println(msg);
            throw new RejectedExecutionException(msg);
        }
    }
    

    序列化与反序列化接口

    /**
     * @filename
     * @Description  RPC消息序列化/反序列化接口定义
     * @Author
     * @Date
     */
    public interface RpcSerialize {
    
        void serialize(OutputStream output, Object object) throws IOException;
    
        Object deserialize(InputStream input) throws IOException;
    }
    
    /**
     * @filename
     * @Description  RPC消息序序列化协议选择器接口
     * @Author
     * @Date
     */
    public interface RpcSerializeFrame {
    
        void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline);
    }
    
    /**
     * @filename
     * @Description  消息编码器
     * @Author
     * @Date
     */
    public class MessageEncoder extends MessageToByteEncoder<Object> {
    
        private MessageCodecUtil util = null;
    
        public MessageEncoder(final MessageCodecUtil util) {
            this.util = util;
        }
    
        protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
            util.encode(out, msg);
        }
    }
    
    /**
     * @filename
     * @Description  消息解码器
     * @Author
     * @Date
     */
    public class MessageDecoder extends ByteToMessageDecoder {
    
        final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH;
        private MessageCodecUtil util = null;
    
        public MessageDecoder(final MessageCodecUtil util) {
            this.util = util;
        }
    
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            if (in.readableBytes() < MessageDecoder.MESSAGE_LENGTH) {
                return;
            }
    
            in.markReaderIndex();
            int messageLength = in.readInt();
            
            if (messageLength < 0) {
                ctx.close();
            }
    
            if (in.readableBytes() < messageLength) {
                in.resetReaderIndex();
            } else {
                byte[] messageBody = new byte[messageLength];
                in.readBytes(messageBody);
    
                try {
                    Object obj = util.decode(messageBody);
                    out.add(obj);
                } catch (IOException ex) {
                    Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }
    
    /**
     * @filename
     * @Description  RPC消息序序列化协议类型
     * @Author
     * @Date
     */
    public enum RpcSerializeProtocol {
    
        JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");
    
        private String serializeProtocol;
    
        RpcSerializeProtocol(String serializeProtocol) {
            this.serializeProtocol = serializeProtocol;
        }
    
        public String toString() {
            ReflectionToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
            return ReflectionToStringBuilder.toString(this);
        }
    
        public String getProtocol() {
            return serializeProtocol;
        }
    }
    

    Hessian序列化与反序列化的具体实现

    1. Hessian序列化/反序列化
    /**
     * @filename
     * @Description  Hessian序列化/反序列化实现
     * @Author
     * @Date
     */
    public class HessianSerialize implements RpcSerialize {
    
        public void serialize(OutputStream output, Object object) {
            Hessian2Output ho = new Hessian2Output(output);
            try {
                ho.startMessage();
                ho.writeObject(object);
                ho.completeMessage();
                ho.close();
                output.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public Object deserialize(InputStream input) {
            Object result = null;
            try {
                Hessian2Input hi = new Hessian2Input(input);
                hi.startMessage();
                result = hi.readObject();
                hi.completeMessage();
                hi.close();
                input.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return result;
        }
    }
    
    1. Hessian编解码工具类
    /**
     * @filename
     * @Description  Hessian编解码工具类
     * @Author
     * @Date
     */
    public class HessianCodecUtil implements MessageCodecUtil {
    
        HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance();
        private static Closer closer = Closer.create();
    
        public HessianCodecUtil() {
    
        }
    
        public void encode(final ByteBuf out, final Object message) throws IOException {
            try {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                closer.register(byteArrayOutputStream);
                HessianSerialize hessianSerialization = pool.borrow();
                hessianSerialization.serialize(byteArrayOutputStream, message);
                byte[] body = byteArrayOutputStream.toByteArray();
                int dataLength = body.length;
                out.writeInt(dataLength);
                out.writeBytes(body);
                pool.restore(hessianSerialization);
            } finally {
                closer.close();
            }
        }
    
        public Object decode(byte[] body) throws IOException {
            try {
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
                closer.register(byteArrayInputStream);
                HessianSerialize hessianSerialization = pool.borrow();
                Object object = hessianSerialization.deserialize(byteArrayInputStream);
                pool.restore(hessianSerialization);
                return object;
            } finally {
                closer.close();
            }
        }
    }
    
    1. 对象的池化
      这里并不是将Hessian对象池化,而是将序列化/反序列化工具对象池化,池化用的是commons.pool2,具体使用看这里http://ju.outofmemory.cn/entry/75642
    /**
     * @filename
     * @Description  Hessian序列化/反序列化对象工厂池
     * @Author
     * @Date
     */
    public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> {
    
        public HessianSerialize create() throws Exception {
            return createHessian();
        }
    
        public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) {
            return new DefaultPooledObject<>(hessian);
        }
    
        private HessianSerialize createHessian() {
            return new HessianSerialize();
        }
    }
    
    /**
     * @filename
     * @Description  Hessian序列化/反序列化池
     * @Author
     * @Date
     */
    public class HessianSerializePool {
    
        private GenericObjectPool<HessianSerialize> hessianPool;
        private static HessianSerializePool poolFactory = null;
    
        private HessianSerializePool() {
            hessianPool = new GenericObjectPool<>(new HessianSerializeFactory());
        }
    
        public static HessianSerializePool getHessianPoolInstance() {
            if (poolFactory == null) {
                synchronized (HessianSerializePool.class) {
                    if (poolFactory == null) {
                        poolFactory = new HessianSerializePool();
                    }
                }
            }
            return poolFactory;
        }
    
        public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
            hessianPool = new GenericObjectPool<>(new HessianSerializeFactory());
            
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            
            config.setMaxTotal(maxTotal);
            config.setMinIdle(minIdle);
            config.setMaxWaitMillis(maxWaitMillis);
            config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
            
            hessianPool.setConfig(config);
        }
    
        public HessianSerialize borrow() {
            try {
                return getHessianPool().borrowObject();
            } catch (final Exception ex) {
                ex.printStackTrace();
                return null;
            }
        }
    
        public void restore(final HessianSerialize object) {
            getHessianPool().returnObject(object);
        }
    
        public GenericObjectPool<HessianSerialize> getHessianPool() {
            return hessianPool;
        }
    }
    
    1. Hessian编码器
    /**
     * @filename
     * @Description  Hessian编码器
     * @Author
     * @Date
     */
    public class HessianEncoder extends MessageEncoder {
    
        public HessianEncoder(MessageCodecUtil util) {
            super(util);
        }
    }
    
    1. Hessian解码器
    /**
     * @filename
     * @Description  Hessian解码器
     * @Author
     * @Date
     */
    public class HessianDecoder extends MessageDecoder {
    
        public HessianDecoder(MessageCodecUtil util) {
            super(util);
        }
    }
    

    ** Kryo的编解码与Hessian的几乎一样,不同之处是Kyro有自己的池化KryoPool **

    RPC服务端的实现

    1. Rpc服务器执行模块(依赖于Spring)
    /**
     * @filename
     * @Description  Rpc服务器执行模块
     * @Author
     * @Date
     */
    public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {
    
        private String serverAddress;
        private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;
    
        private final static String DELIMITER = ":";
    
        private Map<String, Object> handlerMap = new ConcurrentHashMap<>();
    
        private static ListeningExecutorService threadPoolExecutor;
    
        public MessageRecvExecutor(String serverAddress, String serializeProtocol) {
            this.serverAddress = serverAddress;
            this.serializeProtocol = Enum.valueOf(RpcSerializeProtocol.class, serializeProtocol);
        }
    
        /**
         * 异步执行,并将结果response返回给调用方
         * @param task
         * @param ctx
         * @param request
         * @param response
         */
        public static void submit(Callable<Boolean> task, final ChannelHandlerContext ctx, final MessageRequest request, final MessageResponse response) {
            if (threadPoolExecutor == null) {
                synchronized (MessageRecvExecutor.class) {
                    if (threadPoolExecutor == null) {
                        threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
                    }
                }
            }
    
            ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(task);
            Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
                public void onSuccess(Boolean result) {
                    ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            System.out.println("RPC Server Send message-id respone:" + request.getMessageId());
                        }
                    });
                }
    
                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            }, threadPoolExecutor);
        }
    
        /**
         * 获取接口与实现类的对应
         * @param ctx
         * @throws BeansException
         */
        public void setApplicationContext(ApplicationContext ctx) throws BeansException {
            try {
                MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("org.microframe.rpc.model.MessageKeyVal"));
                Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();
    
                Set s = rpcServiceObject.entrySet();
                Iterator<Map.Entry<String, Object>> it = s.iterator();
                Map.Entry<String, Object> entry;
    
                while (it.hasNext()) {
                    entry = it.next();
                    handlerMap.put(entry.getKey(), entry.getValue());
                }
            } catch (ClassNotFoundException ex) {
                java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    
        /**
         * 经典的netty服务端套路
         * @throws Exception
         */
        public void afterPropertiesSet() throws Exception {
            ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
    
            int parallel = Runtime.getRuntime().availableProcessors() * 2;
    
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());
    
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                        .childHandler(new MessageRecvChannelInitializer(handlerMap).buildRpcSerializeProtocol(serializeProtocol))   //通过serializeProtocol选择MessageRecvHandler
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);
    
                if (ipAddr.length == 2) {
                    String host = ipAddr[0];
                    int port = Integer.parseInt(ipAddr[1]);
                    ChannelFuture future = bootstrap.bind(host, port).sync();
                    System.out.printf("Netty RPC Server start success!\nip:%s\nport:%d\nprotocol:%s\n\n", host, port, serializeProtocol);
                    future.channel().closeFuture().sync();
                } else {
                    System.out.printf("Netty RPC Server start fail!\n");
                }
            } finally {
                worker.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
    }
    
    1. Rpc服务端管道初始化
    /**
     * @filename
     * @Description  Rpc服务器执行模块
     * @Author
     * @Date
     */
    public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        private RpcSerializeProtocol protocol;
        private RpcRecvSerializeFrame frame = null;
    
        MessageRecvChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
            this.protocol = protocol;
            return this;
        }
    
        MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
            frame = new RpcRecvSerializeFrame(handlerMap);
        }
        
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            frame.select(protocol, pipeline);
        }
    }
    
    1. RPC服务端消息序列化协议框架
    /**
     * @filename
     * @Description  RPC服务端消息序列化协议框架
     * @Author
     * @Date
     */
    public class RpcRecvSerializeFrame implements RpcSerializeFrame {
    
        private Map<String, Object> handlerMap = null;
    
        public RpcRecvSerializeFrame(Map<String, Object> handlerMap) {
            this.handlerMap = handlerMap;
        }
    
        public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
            switch (protocol) {
                case JDKSERIALIZE: {
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                    pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                    pipeline.addLast(new ObjectEncoder());
                    pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                    pipeline.addLast(new MessageRecvHandler(handlerMap));
                    break;
                }
                case KRYOSERIALIZE: {
                    KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                    pipeline.addLast(new KryoEncoder(util));
                    pipeline.addLast(new KryoDecoder(util));
                    pipeline.addLast(new MessageRecvHandler(handlerMap));
                    break;
                }
                case HESSIANSERIALIZE: {
                    HessianCodecUtil util = new HessianCodecUtil();
                    pipeline.addLast(new HessianEncoder(util));
                    pipeline.addLast(new HessianDecoder(util));
                    pipeline.addLast(new MessageRecvHandler(handlerMap));
                    break;
                }
            }
        }
    }
    
    1. Rpc服务器消息处理
    /**
     * @filename
     * @Description  Rpc服务器消息处理
     * @Author
     * @Date
     */
    public class MessageRecvHandler extends ChannelInboundHandlerAdapter {
    
        private final Map<String, Object> handlerMap;
    
        public MessageRecvHandler(Map<String, Object> handlerMap) {
            this.handlerMap = handlerMap;
        }
    
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MessageRequest request = (MessageRequest) msg;
            MessageResponse response = new MessageResponse();
            MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap);
            MessageRecvExecutor.submit(recvTask, ctx, request, response);
        }
    
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.close();
        }
    }
    
    1. Rpc服务器消息线程任务处理
    /**
     * @filename
     * @Description  Rpc服务器消息线程任务处理
     * @Author
     * @Date
     */
    public class MessageRecvInitializeTask implements Callable<Boolean> {
    
        private MessageRequest request = null;
        private MessageResponse response = null;
        private Map<String, Object> handlerMap = null;
        private ChannelHandlerContext ctx = null;
    
        public MessageResponse getResponse() {
            return response;
        }
    
        public MessageRequest getRequest() {
            return request;
        }
    
        public void setRequest(MessageRequest request) {
            this.request = request;
        }
    
        MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap) {
            this.request = request;
            this.response = response;
            this.handlerMap = handlerMap;
            this.ctx = ctx;
        }
    
        public Boolean call() {
            response.setMessageId(request.getMessageId());
            try {
                Object result = reflect(request);
                response.setResult(result);
                return Boolean.TRUE;
            } catch (Throwable t) {
                response.setError(t.toString());
                t.printStackTrace();
                System.err.printf("RPC Server invoke error!\n");
                return Boolean.FALSE;
            }
        }
    
        /**
         * 服务端调用本地方法
         * @param request
         * @return
         * @throws Throwable
         */
        private Object reflect(MessageRequest request) throws Throwable {
            String className = request.getClassName();
            Object serviceBean = handlerMap.get(className);
            String methodName = request.getMethodName();
            Object[] parameters = request.getParameters();
            return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
        }
    }
    

    RPC客户端的实现

    1. Rpc客户端执行模块 实际上都把任务交给了RpcServerLoader
    /**
     * @filename
     * @Description  Rpc客户端执行模块  
     * @Author
     * @Date
     */
    public class MessageSendExecutor {
    
        private RpcServerLoader loader = RpcServerLoader.getInstance();
    
        public MessageSendExecutor() {
        }
    
        public MessageSendExecutor(String serverAddress, RpcSerializeProtocol serializeProtocol) {
            loader.load(serverAddress, serializeProtocol);
        }
    
        /**
         * 启动客户端netty
         * @param serverAddress
         * @param serializeProtocol
         */
        public void setRpcServerLoader(String serverAddress, RpcSerializeProtocol serializeProtocol) {
            loader.load(serverAddress, serializeProtocol);
        }
    
        public void stop() {
            loader.unLoad();
        }
    
        /**
         * 动态代理
         * @param rpcInterface   必须是一个接口类
         * @param <T>
         * @return
         */
        public static <T> T execute(Class<T> rpcInterface) {
            return Reflection.newProxy(rpcInterface, new MessageSendProxy());
        }
    }
    
    1. rpc客户端配置加载
    /**
     * @filename
     * @Description  rpc服务器配置加载
     * @Author
     * @Date
     */
    public class RpcServerLoader {
    
        private volatile static RpcServerLoader rpcServerLoader;
        private final static String DELIMITER = ":";
        private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;
    
        private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
        private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
        private static ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
        private MessageSendHandler messageSendHandler = null;
    
        private Lock lock = new ReentrantLock();
        private Condition connectStatus = lock.newCondition();
        private Condition handlerStatus = lock.newCondition();
    
        private RpcServerLoader() {
        }
    
        public static RpcServerLoader getInstance() {
            if (rpcServerLoader == null) {
                synchronized (RpcServerLoader.class) {
                    if (rpcServerLoader == null) {
                        rpcServerLoader = new RpcServerLoader();
                    }
                }
            }
            return rpcServerLoader;
        }
    
        /**
         * 客户端启动
         * @param serverAddress
         * @param serializeProtocol
         */
        public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {
            String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
    
                ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, serializeProtocol));
    
                Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
                    public void onSuccess(Boolean result) {
                        try {
                            lock.lock();
    
                            if (messageSendHandler == null) {
                                handlerStatus.await();
                            }
    
                            if (result == Boolean.TRUE && messageSendHandler != null) {
                                connectStatus.signalAll();
                            }
                        } catch (InterruptedException ex) {
                            Logger.getLogger(RpcServerLoader.class.getName()).log(Level.SEVERE, null, ex);
                        } finally {
                            lock.unlock();
                        }
                    }
    
                    public void onFailure(Throwable t) {
                        t.printStackTrace();
                    }
                }, threadPoolExecutor);
            }
        }
    
        public void setMessageSendHandler(MessageSendHandler messageInHandler) {
            try {
                lock.lock();
                this.messageSendHandler = messageInHandler;
                handlerStatus.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public MessageSendHandler getMessageSendHandler() throws InterruptedException {
            try {
                lock.lock();
                if (messageSendHandler == null) {
                    connectStatus.await();
                }
                return messageSendHandler;
            } finally {
                lock.unlock();
            }
        }
    
        public void unLoad() {
            messageSendHandler.close();
            threadPoolExecutor.shutdown();
            eventLoopGroup.shutdownGracefully();
        }
    
        public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {
            this.serializeProtocol = serializeProtocol;
        }
    }
    
    1. Rpc客户端线程任务处理
    /**
     * @filename
     * @Description  Rpc客户端线程任务处理
     * @Author
     * @Date
     */
    public class MessageSendInitializeTask implements Callable<Boolean> {
    
        private EventLoopGroup eventLoopGroup = null;
        private InetSocketAddress serverAddress = null;
        private RpcSerializeProtocol protocol;
    
        MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcSerializeProtocol protocol) {
            this.eventLoopGroup = eventLoopGroup;
            this.serverAddress = serverAddress;
            this.protocol = protocol;
        }
    
        @Override
        public Boolean call() {
            Bootstrap b = new Bootstrap();
            b.group(eventLoopGroup)
                    .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new MessageSendChannelInitializer().buildRpcSerializeProtocol(protocol));
    
            ChannelFuture channelFuture = b.connect(serverAddress);
            channelFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                        RpcServerLoader.getInstance().setMessageSendHandler(handler);
                    }
                }
            });
            return Boolean.TRUE;
        }
    }
    
    1. Rpc客户端管道初始化
    /**
     * @filename
     * @Description  Rpc客户端管道初始化
     * @Author
     * @Date
     */
    public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        private RpcSerializeProtocol protocol;
        private RpcSendSerializeFrame frame = new RpcSendSerializeFrame();
    
        MessageSendChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
            this.protocol = protocol;
            return this;
        }
    
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            frame.select(protocol, pipeline);
        }
    }
    
    1. RPC客户端消息序列化协议
    /**
     * @filename
     * @Description  RPC客户端消息序列化协议框架
     * @Author
     * @Date
     */
    public class RpcSendSerializeFrame implements RpcSerializeFrame {
    
        public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
            switch (protocol) {
                case JDKSERIALIZE: {
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                    pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                    pipeline.addLast(new ObjectEncoder());
                    pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                    pipeline.addLast(new MessageSendHandler());
                    break;
                }
                case KRYOSERIALIZE: {
                    KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                    pipeline.addLast(new KryoEncoder(util));
                    pipeline.addLast(new KryoDecoder(util));
                    pipeline.addLast(new MessageSendHandler());
                    break;
                }
                case HESSIANSERIALIZE: {
                    HessianCodecUtil util = new HessianCodecUtil();
                    pipeline.addLast(new HessianEncoder(util));
                    pipeline.addLast(new HessianDecoder(util));
                    pipeline.addLast(new MessageSendHandler());
                    break;
                }
            }
        }
    }
    
    1. Rpc消息回调
    /**
     * @filename
     * @Description  Rpc消息回调
     * @Author
     * @Date
     */
    public class MessageCallBack {
    
        private MessageRequest request;
        private MessageResponse response;
        private Lock lock = new ReentrantLock();
        private Condition finish = lock.newCondition();
    
        public MessageCallBack(MessageRequest request) {
            this.request = request;
        }
    
        public Object start() throws InterruptedException {
            try {
                lock.lock();
                finish.await(10*1000, TimeUnit.MILLISECONDS);
                if (this.response != null) {
                    return this.response.getResult();
                } else {
                    return null;
                }
            } finally {
                lock.unlock();
            }
        }
    
        public void over(MessageResponse reponse) {
            try {
                lock.lock();
                finish.signal();
                this.response = reponse;
            } finally {
                lock.unlock();
            }
        }
    }
    
    1. Rpc客户端代理
      ** 然而,只有当代理开始执行方法时,客户端才会发送request,然后异步等待执行结果 **
    /**
     * @filename
     * @Description  Rpc客户端代理
     * @Author
     * @Date
     */
    public class MessageSendProxy extends AbstractInvocationHandler {
     
        public Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
            MessageRequest request = new MessageRequest();
            request.setMessageId(UUID.randomUUID().toString());
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setTypeParameters(method.getParameterTypes());
            request.setParameters(args);
    
            MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
            //将request序列化发送到netty server端并异步获取结果
            MessageCallBack callBack = handler.sendRequest(request);
            return callBack.start();
        }
    }
    

    相关文章

      网友评论

        本文标题:浅析RPC框架的构建

        本文链接:https://www.haomeiwen.com/subject/yydvmttx.html