美文网首页
0528--RPC简易实现

0528--RPC简易实现

作者: 小明的爷爷的爸爸 | 来源:发表于2020-05-28 11:49 被阅读0次

    RPC 主要实现远程过程调用

    一、实现思路

    1. 通讯方式:先简单使用 Socket
    
    2. server端,
    
        RPCServer
    
          - start() 启动
    
          - register(interface,impl) 注册 接口 和 对应的实现
    
    3. RPCClient
    
          - getRemoteProxy(interface,addr) 获取到访问远程接口方法的代理对象
    
    4. ServiceProducer
    
          - sendData(String data) 远程的某一个调用接口
    

    二、简易实现

    RPCServer 接口

    
    public interface RPCServer {

        /**
         * Starts the server.
         *
         * @throws IOException declared so that implementations may report I/O
         *                     failures encountered while starting
         */
        void start() throws IOException;

        /**
         * Registers a service interface together with its implementation.
         *
         * @param serverInterface the service interface
         * @param impl            the class implementing {@code serverInterface}
         */
        void register(Class serverInterface, Class impl);

    }
    
    

    RPCServerImpl 实现

    
    /**
     * Socket-based {@link RPCServer}: accepts connections on a fixed port and
     * answers each one with the result of a reflective call on a registered
     * implementation class.
     */
    public class RPCServerImpl implements RPCServer {

        /** Port the server socket is bound to by {@link #start()}. */
        private final int port;

        /**
         * Service interface name (as returned by {@code Class.getName()}) mapped
         * to its implementation class. The map is static, so it is shared by
         * every server instance.
         * NOTE(review): this is a plain HashMap that worker threads read; it is
         * only safe if all register() calls happen before start() - confirm.
         */
        static final HashMap<String,Class> serviceImplMap = new HashMap<>();

        /** Worker pool, one thread per available processor, that runs the per-connection tasks. */
        private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        /**
         * @param port port to listen on once {@link #start()} is called
         */
        public RPCServerImpl(int port) {
            this.port = port;
        }

        /**
         * Binds the server socket and hands every accepted connection to the
         * worker pool. The accept loop never ends normally, so this call blocks
         * until an exception is thrown.
         *
         * @throws IOException if binding the port or accepting a connection fails
         */
        @Override
        public void start() throws IOException {
            // The socket is declared as a resource so that it is also released
            // when bind() itself fails, not only when the accept loop fails.
            try (ServerSocket server = new ServerSocket()) {
                server.bind(new InetSocketAddress(port));
                System.out.println("Server start...");
                while (true) {
                    // Each connection is processed on a pool thread.
                    executor.execute(new ServiceTask(server.accept()));
                }
            }
        }

        /**
         * Records which class implements the given service interface, keyed by
         * the interface's name.
         *
         * @param serverInterface the service interface
         * @param impl            the class implementing it
         */
        @Override
        public void register(Class serverInterface, Class impl) {
            serviceImplMap.put(serverInterface.getName(), impl);
        }

        /**
         * Handles a single client connection: reads one request, invokes the
         * requested method and writes the result back.
         */
        private static class ServiceTask implements Runnable {

            /** Connection to the client; closed when {@link #run()} finishes. */
            private final Socket socket;

            public ServiceTask(Socket accept) {
                this.socket = accept;
            }

            /**
             * Reads the request (interface name, method name, parameter types,
             * arguments), invokes the method on a new instance of the registered
             * implementation and writes the return value to the client. Any
             * failure is printed and the connection is closed without a reply.
             */
            @Override
            public void run() {
                // Closing the socket also closes its input and output streams,
                // so the socket is the only resource that has to be managed.
                try (Socket client = socket) {
                    // NOTE(security): Java-native deserialization of data read
                    // from the network; only expose this to trusted clients or
                    // install an ObjectInputFilter.
                    ObjectInputStream input = new ObjectInputStream(client.getInputStream());

                    // Request layout: service name, method name, parameter types, argument values.
                    String serviceInterfaceName = input.readUTF();
                    String methodName = input.readUTF();
                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                    Object[] arguments = (Object[]) input.readObject();

                    // Resolve the implementation registered for the requested interface.
                    Class<?> serviceImpl = serviceImplMap.get(serviceInterfaceName);
                    if (serviceImpl == null) {
                        throw new ClassNotFoundException(serviceInterfaceName + " not found!");
                    }

                    Method method = serviceImpl.getMethod(methodName, parameterTypes);
                    // getDeclaredConstructor().newInstance() replaces the deprecated
                    // Class.newInstance(), which lets constructor exceptions escape unwrapped.
                    Object target = serviceImpl.getDeclaredConstructor().newInstance();
                    Object invokeResult = method.invoke(target, arguments);

                    ObjectOutputStream output = new ObjectOutputStream(client.getOutputStream());
                    output.writeObject(invokeResult);
                    // Push the reply out before the socket is closed.
                    output.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }

    }
    
    

    RPCClient 接口

    
    /**
     * Client-side entry point for obtaining a proxy to a remote service.
     *
     * @param <T> type of the proxy handed back to the caller
     */
    public interface RPCClient<T> {

        /**
         * Returns a proxy object for calling the methods of a remote interface.
         *
         * @param serviceInterface the remote service interface
         * @param addr             address of the server
         * @return the proxy object
         */
        T getRemoteProxy(Class<?> serviceInterface, InetSocketAddress addr);

    }
    
    

    RPCClient 实现接口

    
    /**
     * {@link RPCClient} that builds a JDK dynamic proxy; every method call on
     * the proxy opens a socket to the server, sends the call description and
     * returns the object the server writes back.
     *
     * @param <T> type of the proxy handed back to the caller
     */
    public class RPCClientImpl<T> implements RPCClient<T> {

        /**
         * Creates a dynamic proxy for {@code serviceInterface} whose method
         * calls are forwarded to the server at {@code addr}.
         *
         * @param serviceInterface the remote service interface to proxy
         * @param addr             address of the server
         * @return a proxy implementing {@code serviceInterface}
         */
        @Override
        public T getRemoteProxy(Class<?> serviceInterface, InetSocketAddress addr) {
            // The proxy implements serviceInterface; the caller is responsible
            // for choosing a T that matches it, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            T remoteProxy = (T) Proxy.newProxyInstance(
                    serviceInterface.getClassLoader(),
                    new Class<?>[]{serviceInterface},
                    (self, method, args) -> invokeRemote(serviceInterface, addr, method, args));
            return remoteProxy;
        }

        /**
         * Performs one remote call over a fresh connection.
         *
         * @param serviceInterface interface whose name identifies the service
         * @param addr             address of the server
         * @param method           method invoked on the proxy
         * @param args             arguments of the call, possibly {@code null}
         * @return the object read from the server's reply
         * @throws java.io.IOException    if connecting, sending or receiving fails
         * @throws ClassNotFoundException if the reply's class cannot be resolved
         */
        private static Object invokeRemote(Class<?> serviceInterface, InetSocketAddress addr,
                                           Method method, Object[] args)
                throws java.io.IOException, ClassNotFoundException {
            // Closing the socket also closes both of its streams, so managing
            // the socket alone releases everything on every exit path and
            // never hides the original exception behind a close() failure.
            try (Socket socket = new Socket()) {
                socket.connect(addr);

                // Request layout: service name, method name, parameter types, argument values.
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                output.writeUTF(serviceInterface.getName());
                output.writeUTF(method.getName());
                output.writeObject(method.getParameterTypes());
                output.writeObject(args);
                // Make sure the whole request has left the buffer before waiting for the reply.
                output.flush();

                // The reply is the return value of the remotely executed method.
                ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                return input.readObject();
            }
        }

    }
    
    

    上面用到的某个远程调用实例

    
    /**
     * Sample service contract used as the remotely invoked interface in this
     * example.
     */
    public interface ServiceProducer {

        /**
         * Sends a piece of data to the service.
         *
         * @param data the data to send
         * @return the service's reply
         */
        String sendData(String data);

    }
    
    /**
     * {@link ServiceProducer} implementation that replies with a fixed
     * greeting followed by the data it was given.
     */
    public class ServiceProducerImpl implements ServiceProducer {

        /** Text placed in front of the received data in every reply. */
        private static final String REPLY_PREFIX = "I am service producer!!!, the data is ";

        /**
         * Builds the reply for one call.
         *
         * @param data the received data
         * @return the fixed prefix with {@code data} appended
         */
        @Override
        public String sendData(String data) {
            StringBuilder reply = new StringBuilder(REPLY_PREFIX);
            reply.append(data);
            return reply.toString();
        }

    }
    
    

    测试用例

    
    ublic class RPCTest {
    
        public static void main(String[] args) {
    
            int port =9999;
    
            //需要开启线程,不然会阻塞,由于有while(true)
    
            new Thread(){
    
                @Override
    
                public void run() {
    
                    //创建服务
    
                    RPCServer server = new RPCServerImpl(port);
    
                    //注册服务
    
                    server.register(ServiceProducer.class,ServiceProducerImpl.class);
    
                    //启动服务
    
                    try {
    
                        server.start();
    
                    } catch (IOException e) {
    
                        e.printStackTrace();
    
                    }
    
                }
    
            }.start();
    
            RPCClient<ServiceProducer> client = new RPCClientImpl<>();
    
            ServiceProducer serviceProducer = client.getRemoteProxy(ServiceProducer.class, new InetSocketAddress("localhost",port));
    
            System.out.println(serviceProducer.sendData("rpc test ..."));
    
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:0528--RPC简易实现

          本文链接:https://www.haomeiwen.com/subject/ueboahtx.html