美文网首页手写RPC框架Java 杂谈
手写一个RPC框架,看看100个线程同时调用情况如何

手写一个RPC框架,看看100个线程同时调用情况如何

作者: jwfy | 来源:发表于2019-06-29 13:12 被阅读83次

    本人微信公众号(jwfy)欢迎关注

    简单的介绍RPC是什么,RPC整个调用流程是什么,包含了什么组件。然后实际编写一个RPC实例,模拟100个线程调用以验证RPC的可用性,稳定性等。最后总结自己编写的RPC框架存在哪些问题,可以去完善的,一个优秀的RPC框架应该必备的功能点。

    什么是RPC

    RPC(Remote Procedure Call),远程过程调用,可通过网络调用其他机器的服务请求。RPC是一种规范,和TCP、UDP都没有关系,RCP可以采用TCP协议完成数据传输,甚至可以使用HTTP应用协议。RCP是C端模式,包含了服务端(服务提供方)、客户端(服务使用方),采用特定的网络传输协议,把数据按照特定的协议包装后进行传输操作等操作。先来了解下一个具体的RPC调用请求的执行过程

    image

    本图来自网络

    • 1、服务调用方(Client)调用本地调用的方式调用本地代理对象
    • 2、代理对象将类名称、方法、参数等请求数据按照请求协议组装成Request
    • 3、通过Request数据从服务治理获取有效的服务端信息
    • 4、将Request数据按照序列化协议序列化后,使用网络传输协议通过网络发送到服务端中
    • 5、服务端接收到序列化后到数据,利用序列号协议反序列化操作生成Request数据
    • 6、通过Request数据找到具体的服务提供方,并调用执行特定的方法,计算出执行结果
    • 7、执行结果包装成Response,按照原路返回至客户端
    • 8、客户端解析Response,得到对应的执行结果,又或者是具体的错误信息

    这就是一个完整的RPC调用过程,对使用方而言就只暴露了本地代理对象,剩下的数据解析、运输等都被包装了,从服务提供方的角度看还有服务暴露,如下图DUBBO的架构图。

    image

    RPC 实践

    学习写RPC之前必须先了解动态代理反射这两个知识点,如不了解先自行了解,本学习笔记不涉及到此内容的介绍。

    文件夹目录

    image

    Request对象

    // lombok 
    @Data
    public class MethodParameter {
    
        String className;
        String methodName;
        Object[] arguments;
        Class<?>[] parameterTypes;
    
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    
        public static MethodParameter convert(InputStream inputStream) {
    
            try {
                ObjectInputStream input = new ObjectInputStream(inputStream);
                String className = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                Object[] arguments = (Object[])input.readObject();
    
                MethodParameter methodParameter = new MethodParameter();
                methodParameter.setClassName(className);
                methodParameter.setMethodName(methodName);
                methodParameter.setArguments(arguments);
                methodParameter.setParameterTypes(parameterTypes);
    
                return methodParameter;
            } catch (Exception e) {
                throw new RuntimeException("解析请求错误:" + e.getMessage());
            }
        }
    
    }
    

    可以很清楚的看到convert方法就是从一个输入流中读取出类名称、方法名等数据,组成一个MethodParameter对象,也就是上面所说的Request

    服务端 - 服务暴露

    public class RpcExploreService {
    
        private Map<String, Object> objectMap = new HashMap<>();
    
        public void explore(String className, Object object) {
            objectMap.put(className, object);
        }
    
        public Object invoke(MethodParameter methodParameter) {
            Object object = objectMap.get(methodParameter.getClassName());
            if (object == null) {
                throw new RuntimeException("无对应执行类:" + methodParameter.getClassName());
            }
            Method method = null;
            try {
                method = object.getClass().getMethod(methodParameter.getMethodName(), methodParameter.getParameterTypes());
            } catch (NoSuchMethodException e) {
                throw new RuntimeException("无对应执行方法:" + methodParameter.getClassName() + ", 方法:" + methodParameter.getMethodName());
            }
    
            try {
                Object result = method.invoke(object, methodParameter.getArguments());
    
                System.out.println(methodParameter);
    
                return result;
            } catch (Exception e) {
                throw new RuntimeException("invoke方法执行失败:" + e.getMessage());
            }
        }
    
    }
    

    服务暴露存储了一个Map<String, Object> objectMap对象,所有可对外提供服务的都必须添加到该容器中,以便于收到网络数据后能找到对应的服务,然后采用反射invoke调用,返回得到的结果。

    服务端 - 网络数据处理

    public class IOService implements Runnable{
    
        private int port;
        private ServerSocket serverSocket;
        private RpcExploreService rpcExploreService;
        private volatile boolean flag;
    
        public IOService(RpcExploreService rpcExploreService, int port) throws IOException {
            this.rpcExploreService = rpcExploreService;
            this.port = port;
            this.serverSocket = new ServerSocket(port);
            this.flag = true;
            System.out.println("服务端启动了");
    
            // 优雅关闭
            Runtime.getRuntime().addShutdownHook(new Thread() {
    
                @Override
                public void run() {
                    flag = false;
                    System.out.println("服务端关闭了");
                }
            });
        }
    
        @Override
        public void run() {
            while (flag) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                } catch (IOException e) {
                }
                if (socket == null) {
                    continue;
                }
                new Thread(new ServerSocketRunnable(socket)).start();
            }
        }
    
        class ServerSocketRunnable implements Runnable {
            private Socket socket;
            public ServerSocketRunnable(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run() {
                try {
                    InputStream inputStream = socket.getInputStream();
                    OutputStream outputStream = socket.getOutputStream();
                    MethodParameter methodParameter = MethodParameter.convert(inputStream);
                    Object result = rpcExploreService.invoke(methodParameter);
                    ObjectOutputStream output = new ObjectOutputStream(outputStream);
                    output.writeObject(result);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    

    简单的BIO模型,开启了一个ServerSocket后,接收到数据后就把套接字丢给一个新的线程处理,ServerSocketRunnable接受一个socket后,解析出MethodParameter这个请求对象,然后调用服务暴露的invoke方法,再写回到socket传输给客户端

    客户端 - 服务订阅

    public class RpcUsedService {
    
        private Map<String, Object> proxyObjectMap = new HashMap<>();
        private Map<String, Class> classMap = new HashMap<>();
        private IOClient ioClient;
    
        public void setIoClient(IOClient ioClient) {
            this.ioClient = ioClient;
        }
    
        public void register(Class clazz) {
            String className = clazz.getName();
            classMap.put(className, clazz);
            if (!clazz.isInterface()) {
                throw new RuntimeException("暂时只支持接口类型的");
            }
    
            try {
                RpcInvocationHandler handler = new RpcInvocationHandler();
                handler.setClazz(clazz);
                Object proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, handler);
                proxyObjectMap.put(className, proxyInstance);
                // 然后需要包装起来
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public <T> T get(Class<T> clazz) {
            String className = clazz.getName();
            return (T) proxyObjectMap.get(className);
        }
    
        class RpcInvocationHandler implements InvocationHandler {
    
            private Class clazz;
            public void setClazz(Class clazz) {
                this.clazz = clazz;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 实际上proxy没啥用处,不需要真正的反invoke射
                MethodParameter methodParameter = new MethodParameter();
    
                methodParameter.setClassName(clazz.getName());
                methodParameter.setMethodName(method.getName());
                methodParameter.setArguments(args);
                methodParameter.setParameterTypes(method.getParameterTypes());
    
                return ioClient.invoke(methodParameter);
            }
        }
    }
    

    服务使用方需要使用register进行服务的注册,会生成对应的本地代理对象,后续只需要通过本地代理对象。

    客户端 - 网络处理

    public class IOClient {
    
        private String ip;
        private int port;
        public IOClient(String ip, int port) throws IOException {
            this.ip = ip;
            this.port = port;
        }
    
        public Object invoke(MethodParameter methodParameter) {
            Socket socket = null;
            try {
                socket = new Socket(ip, port);
                OutputStream outputStream = socket.getOutputStream();
                ObjectOutputStream ouput = new ObjectOutputStream(outputStream);
    
                ouput.writeUTF(methodParameter.getClassName());
                ouput.writeUTF(methodParameter.getMethodName());
                ouput.writeObject(methodParameter.getParameterTypes());
                ouput.writeObject(methodParameter.getArguments());
    
                InputStream inputStream = socket.getInputStream();
                ObjectInputStream input = new ObjectInputStream(inputStream);
                return input.readObject();
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
            return null;
        }
    }
    

    代理对象被调用后生成一个MethodParameter对象,通过此IOClient把数据传输到服务端,并且返回对应的数据。

    实践

    服务端

    public class Service {
    
        public static void main(String[] args) {
            RpcExploreService rpcExploreService = new RpcExploreService();
            // 传入的字符串是接口的全名称
            rpcExploreService.explore("new2019.rpc.rpc_v1.expore.Helloworld", new HelloWorldImpl());
    
            try {
                Runnable ioService = new IOService(rpcExploreService, 10001);
                new Thread(ioService).start();
                // 开启了端口为10001的服务监听
            } catch (IOException e) {
            }
        }
    }
    

    客户端

    public class Client {
    
        public static void main(String[] args) {
            RpcUsedService rpcUsedService = new RpcUsedService();
            rpcUsedService.register(Helloworld.class);
    
            try {
                IOClient ioClient = new IOClient("127.0.0.1", 10001);
                // 网络套接字链接 同上是10001端口
                rpcUsedService.setIoClient(ioClient);
    
                Helloworld helloworld = rpcUsedService.get(Helloworld.class);
                // 生成的本地代理对象 proxy
    
                for(int i=0; i< 100; i++) {
                    // 开启了100个县城
                    new Thread(() -> {
                        long start = System.currentTimeMillis();
                        int a = new Random().nextInt(100);
                        int b = new Random().nextInt(100);
                        int c = helloworld.add(a, b);
                        // .add 操作就是屏蔽了所有的细节,提供给客户端使用的方法
                        System.out.println("a: " + a + ", b:" + b + ", c=" + c + ", 耗时:" + (System.currentTimeMillis() - start));
                    }).start();
                }
    
            } catch (IOException e) {
            }
        }
    }
    

    测试服务

    // Helloworld 接口
    public interface Helloworld {
        String hi();
        int add(int a, int b);
    }
    
    // Helloworld 接口 实现类
    public class HelloWorldImpl implements Helloworld {
    
        @Override
        public String hi() {
            return "ok";
        }
    
        @Override
        public int add(int a, int b) {
            long start = System.currentTimeMillis();
            try {
                Thread.sleep(new Random().nextInt(10000));
                // 故意添加了耗时操作,以便于模拟真实的调用操作
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int c = a + b;
            System.out.println(Thread.currentThread().getName() + " 耗时:" + (System.currentTimeMillis() - start));
            return c;
        }
    }
    

    运行效果

    image image

    总结 & 思考

    这只是一个非常简单的RPC实践,包含了服务暴露、服务注册(Proxy生成)、BIO模型进行网络传输,java默认的序列化方法,对RPC有一个初步的认识和了解,知道RPC必须包含的模块

    不过还是有很多需要优化的点以改进。

    • IO模型:使用的是BIO模型,可以改进换成NIO模型,引入netty
    • 池化:不要随意新建线程,所有的线程都应有线程池统一管理
    • 服务发现:本地模拟的小demo,并没有服务发现,可以采用zk管理
    • 序列化:java本身自带的序列化效率很低,可以换成Hessian(DUBBO默认采用其作为序列化工具)、Protobuf(Protobuf是由Google提出的一种支持多语言的跨平台的序列化框架)等

    还有例如服务统计、优雅下线、负载均衡等也都是一个成熟的RPC框架必须要考虑到的点。

    本人微信公众号(搜索jwfy)欢迎关注

    微信公众号

    相关文章

      网友评论

        本文标题:手写一个RPC框架,看看100个线程同时调用情况如何

        本文链接:https://www.haomeiwen.com/subject/vvqpcctx.html