rpc理解

作者: 君子兰琚琚 | 来源:发表于2022-04-17 02:14 被阅读0次

    1. 什么是rpc?

      rpc,即 Remote Procedure Call,中文:远程过程调用。简单点说,就是跨进程、跨机器、基于网络来实现的方法调用,下面以代码来解释:

    import org.junit.Test;
    
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;
    
    /**
     * @Author juwm
     * rpc调用方,可采用任意支持socket编程的语言实现
     */
    public class RpcClient {
    
        @Test
        public void client() {
            try (Socket socket = new Socket("127.0.0.1", 6639);
                 OutputStream out = socket.getOutputStream();
                 InputStream in = socket.getInputStream()) {
                // 目标对象 、目标方法
                String protocol = "{\"method\":\"targetMethod\",\"classFullName\":\"a.b.TargetObject\"}]";
                out.write(protocol.getBytes(StandardCharsets.UTF_8));
                int len = 100;
                byte[] c = new byte[len];
                StringBuffer buffer = new StringBuffer(100);
                for (; ; ) {
                    int read = in.read(c);
                    buffer.append(new String(c));
                    if (read != len) // 如果数据长度是len的倍数,这里不会跳出循环
                        break;
                }
                String result = buffer.substring(0, buffer.lastIndexOf("]"));
                System.out.println(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    import com.alibaba.fastjson.JSON;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.lang.reflect.Method;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Author juwm
     * rpc被调方,可采用任意支持socket编程的语言实现
     */
    public class RpcServer {
    
        private static Queue<Socket> socketList = new ArrayBlockingQueue(10);
    
        private static ThreadPoolExecutor executor =
                new ThreadPoolExecutor(5,
                        5,
                        1000,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(10),
                        Executors.defaultThreadFactory(),
                        new ThreadPoolExecutor.DiscardPolicy());
    
        @Test
        public void server() throws IOException {
            // new a ServerSocket
            ServerSocket server = new ServerSocket(6639);
            while (true) {
                // listen connect
                Socket socket = server.accept();
                // connect line up
                socketList.offer(socket);
                // create a task to Thread Pool
                dealWithConnect();
            }
        }
    
        public static void dealWithConnect() {
            executor.execute(() -> {
                try (Socket socket = socketList.poll();
                     InputStream in = socket.getInputStream();
                     OutputStream out = socket.getOutputStream()) {
                    int len = 10;
                    byte[] c = new byte[len];
                    StringBuffer buffer = new StringBuffer(100);
                    for (; ; ) {
                        /*
                         * 当read()执行时如果没有数据让它读,它就会阻塞直到有数据到来,
                         * 这是Bio的特点。read()阻塞后它后边的代码在数据到来前都将不得执行。
                         * 所以如果有代码需要再read()阻塞前执行,就需要知道客户端发送的数据的长度。
                         * 计算出当次读取的最后一次read(),执行完最后一次read()后先执行想要执行的代码,
                         * 然后再read(),这样就可以在read()阻塞前把想执行的代码执行。
                         */
                        int read = in.read(c);
                        buffer.append(new String(c));
                        if(read != len) {
                            // 这里判断是否为最后一次读取的方案是:
                            // 读取出来的数据长度是否等于数组的长度。
                            // 如果读出来的数据不能填满数组,
                            // 说明读到头了,下一次读取read()将阻塞,
                            // 如果有代码需要在阻塞前执行,则此时执行,
                            // 这种设计的bug是如果数据的长度正好是10
                            // 或者10的倍数则不行
                            break;
                        }
                    }
                    String context = buffer.substring(0, buffer.lastIndexOf("]"));
                    Map map = JSON.parseObject(context, Map.class);
                    // client想调用的目标对象(全类名)
                    Class<?> objClass = Class.forName((String) map.get("classFullName"));
                    // client想调用的目标方法
                    Method me = objClass.getMethod((String) map.get("method"), null);
                    // 利用反射调用目标对象的方法,从而实现跨进程的方法调用:Remote Procedure Call
                    String result = me.invoke(objClass.newInstance(), null) + "]";
                    // 返回结果给client
                    out.write(result.getBytes(StandardCharsets.UTF_8));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    /**
     * @Author juwm
     * 目标对象
     */
    public class TargetObject {
        // 目标方法
        public String targetMethod() {
            return "hello Remote Procedure Call!";
        }
    }
    
    image.png
    image.png

    相关文章

      网友评论

          本文标题:rpc理解

          本文链接:https://www.haomeiwen.com/subject/nqpkertx.html