手写RPC框架

作者: 匠丶 | 来源:发表于2018-09-08 11:23 被阅读0次

    在分析RMI原理一文中,我们知道RMI是通过底层封装TCP网络通信实现。
    基于此思路本文从以下切入点实现一个简单的RPC框架,反之也促进了对RMI的理解,相辅相成。

    服务端

    服务端通过端口发布一个服务,监听端口请求,通过反射调用本地方法,并返回结果给调用方。
    1、定义要发布的接口和接口实现类

    public interface IHello {
        String say();
    }
    
    public class IHelloImpl implements IHello {
        @Override
        public String say() {
            return "Hello";
        }
    }
    
    public class ServerDemo {
        public static void main(String[] args) {
            IHello iHello = new IHelloImpl();
            RpcServer rpcServer = new RpcServer();
            rpcServer.publisher(iHello,8080);
        }
    }
    

    2、定义发布中心,通过线程池来监听请求

    public class RpcServer {
        //创建线程池
        private static final ExecutorService executorService=Executors.newCachedThreadPool();
    
        public void publisher(Object service,Integer port){
            try {
                ServerSocket serverSocket = new ServerSocket(port);
                //循环监听
                while (true){
                    Socket socket = serverSocket.accept();
                    executorService.submit(new ProcessorHandler(socket,service));
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    3、处理请求,通过反射得到结果,并将结果传递给客户端

    public class ProcessorHandler implements Runnable {
        private Socket socket;
    
        private Object service;
    
        public ProcessorHandler(Socket socket, Object service) {
            this.socket = socket;
            this.service = service;
        }
    
        @Override
        public void run(){
            try {
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
                Object result = invoke(rpcRequest);
    
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                objectOutputStream.writeObject(result);
                objectOutputStream.flush();
                objectOutputStream.close();
                objectInputStream.close();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
     private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
            Object[] args = rpcRequest.getParams();
            Class[] types = null;
            if(args != null){
                types = new Class[args.length];
                for(int i=0 ; i<args.length;i++){
                    types[i] = args[i].getClass();
                }
            }
    
            Method method = service.getClass().getMethod(rpcRequest.getMethod(),types);
            return method.invoke(service,args);
        }
    }
    

    客户端

    寻找服务,发送请求,得到结果。
    1、定义代理,通过代理调用服务端接口

    public class RpcClientProxy {
    
        public <T> T clientProxy(Class<T> target,String host,Integer port){
            return (T)Proxy.newProxyInstance(target.getClassLoader(),new Class[]{target},new RemoteInvocationHandler(host,port));
        }
    }
    
    public class RemoteInvocationHandler implements InvocationHandler {
        public String host;
    
        public Integer port;
    
        public RemoteInvocationHandler(String host, Integer port) {
            this.host = host;
            this.port = port;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
           RpcRequest rpcRequest = new RpcRequest();
           rpcRequest.setName(method.getDeclaringClass().getName());
           rpcRequest.setMethod(method.getName());
           rpcRequest.setParams(args);
           TCPTransport tcpTransport = new TCPTransport(host,port);
           return tcpTransport.send(rpcRequest);
        }
    

    2、封装网络通信过程

    public class TCPTransport {
        public String host;
    
        public Integer port;
    
        public TCPTransport(String host, Integer port) {
            this.host = host;
            this.port = port;
        }
        private Socket newSocket(){
            try {
                return new Socket(host,port);
            } catch (IOException e) {
                throw new RuntimeException("连接建立失败");
            }
        }
    
        public Object send(RpcRequest rpcRequest){
            Socket socket = newSocket();
            //发送请求到服务端
            try {
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                objectOutputStream.writeObject(rpcRequest);
                objectOutputStream.flush();
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                Object result = objectInputStream.readObject();
                objectInputStream.close();
                objectOutputStream.close();
                return result;
    
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("发起远程调用异常",e);
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    public class ClientDemo {
        public static void main(String[] args) {
            RpcClientProxy rpcClientProxy = new RpcClientProxy();
            IHello iHello =  rpcClientProxy.clientProxy(IHello.class,"localhost",8080);
            System.out.println(iHello.say());
    
        }
    }
    
    

    相关文章

      网友评论

        本文标题:手写RPC框架

        本文链接:https://www.haomeiwen.com/subject/ecykgftx.html