美文网首页
RPC(五)

RPC(五)

作者: supremecsp | 来源:发表于2022-06-17 11:40 被阅读0次

在RPC框架中,服务暴露的api接口,并不会包含真实的业务逻辑,业务逻辑都在服务提供方应用里,但我们通过调用接口方法,确实拿到了想要的结果,是不是感觉有点神奇呢?

这里面用到的核心技术就是前面说的动态代理。RPC 会自动给接口生成一个代理类,当我们在项目中注入接口的时候,运行过程中实际绑定的是这个接口生成的代理类。这样在接口方法被调用的时候,它实际上是被生成代理类拦截到了,这样我们就可以在生成的代理类里面,加入远程调用逻辑。

既然动态代理是一种具体的技术框架,那就会涉及到选型。我们可以从这样三个角度去考虑:
因为代理类是在运行中生成的,那么代理框架生成代理类的速度、生成代理类的字节码大小等等,都会影响到其性能——生成的字节码越小,运行所占资源就越小。
还有就是我们生成的代理类,是用于接口方法请求拦截的,所以每次调用接口方法的时候,都会执行生成的代理类,这时生成的代理类的执行效率就需要很高效。
最后一个是从我们的使用角度出发的,我们肯定希望选择一个使用起来很方便的代理类框架,比如我们可以考虑:API 设计是否好理解、社区活跃度、还有就是依赖复杂度等等。

我们可以用Socket简单模拟连接请求
服务端:ServerSocket server = new ServerSocket(port)创建连接端口,server.accept();阻塞等待请求new ObjectInputStream(socket.getInputStream());new ObjectOutputStream(socket.getOutputStream())建立对象通讯;然后将ObjectInputStream::readObject读取到的数据通过反射到真正的方法进行执行,得到结构后ObjectOutputStream::writeObject输出回客户端

public void export(Object service, int port) throws Exception {
    if (service == null) {
      throw new IllegalArgumentException("service instance == null");
    }
    if (port <= 0 || port > 65535) {
      throw new IllegalArgumentException("Invalid port " + port);
    }

    ServerSocket server = new ServerSocket(port);
    while (true) {
      final Socket socket = server.accept();
      new Thread(() -> {
        try {
          ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
          ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
          try {
            String methodName = input.readUTF();
            Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
            Object[] arguments = (Object[]) input.readObject();
            Method method = service.getClass().getMethod(methodName, parameterTypes);
            Object result = method.invoke(service, arguments);
            output.writeObject(result);
          } catch (Throwable t) {
            output.writeObject(t);
          } finally {
            output.close();
            input.close();
            socket.close();
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }).start();
    }
  }

客户端:返回一个代理类实例,利用回调的方法,让方法执行的时候走通讯逻辑,同理有着socket ,input,output。

public <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {

    if (interfaceClass == null)
      throw new IllegalArgumentException("Interface class == null");
    if (! interfaceClass.isInterface())
      throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
    if (host == null || host.length() == 0)
      throw new IllegalArgumentException("Host == null!");
    if (port <= 0 || port > 65535)
      throw new IllegalArgumentException("Invalid port " + port);

    return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] {interfaceClass},
            (proxy, method, arguments) -> {
              Socket socket = new Socket(host, port);
              try {
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                try {
                  output.writeUTF(method.getName());
                  output.writeObject(method.getParameterTypes());
                  output.writeObject(arguments);
                  ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                  try {
                    Object result = input.readObject();
                    if (result instanceof Throwable) {
                      throw (Throwable) result;
                    }
                    return result;
                  } finally {
                    input.close();
                  }
                } finally {
                  output.close();
                }
              } finally {
                socket.close();
              }
            });
  }

Proxy.newProxyInstance不理解的话可以看看文章:对Proxy.newProxyInstance的一些理解

回调函数简单理解就是返回一个对象,在接下来的逻辑中才真正执行对象逻辑;在上面的例子就是丰富原先方法的功能(加上连接处理与数据返回)。
客户端和服务端的分别调用

public static void main(String[] args) throws Exception {
      RpcClient client = new RpcClient();
      CalculatorService service = client.refer(CalculatorService.class, "127.0.0.1", 1234);
      int result = service.add(2, 4);
      System.out.println("result:" + result);
    }

public static void main(String[] args) throws Exception {
      CalculatorService service = new CalculatorServiceImpl();
      RpcServer server = new RpcServer();
      server.export(service, 1234);
    }

之前使用过回调函数用户数据修复,由于生产环境和测试环境数据不同,修复逻辑也不同。为了复用代码和增强可读性,也用了回调函数。简单记录下用法。

创建函数式接口(用于回调)

    @FunctionalInterface
    interface CollectData{
        void collectUserData(A xxx, B xxx, C xxx);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        if(redisTemplate.hasKey(key)) return;
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(this::repairProMediaData);
    }

编写测试和生产环境各种的修复数据准备

     public void repairProMediaData(){
        CollectData collectData = (xxx, xxx, xxx) -> {
        //生产环境逻辑 xxx
        };
        this.repairMediaData(collectData, false);
    }

    public void repairFatMediaData(){
        CollectData collectData = (xxx, xxx, xxx) -> {
        //测试环境逻辑 xxx
        this.repairMediaData(collectData, true);
    }

数据修复流程,并保留方法方便写单元测试

 /**
    * @Description: 修复cos, mysql以及cos中数据
    * @Param: collectData 对用户当前文件夹中的数据进行收集
    * @Param: flag 如果当前文件夹属于用户,是否对文件夹中的数据进行判断;生产环境无需判断(视频功能还没上线,文件夹中的数据不会错乱);
                   测试环境需要判断
    * @Author: xxx
    * @Date: 2021/1/26
    */
    @VisibleForTesting
    protected void repairMediaData(CollectData collectData, Boolean flag){
        log.info("数据修复开始");
        //xxx
        this.deleteErrorFolder(mediaStroe, userData, collectData, flag);
        //xxx
        log.info("数据修复结束");
        redisTemplate.opsForValue().set(key, 1);
    }

修复数据逻辑

/**
    * @Description: cos删除多余文件夹并返回当前用户的空间与文件数目
    * @Param: [mediaStroe, userId]
    * @return: java.lang.Long
    * @Author: xxx
    * @Date: 2021/1/25
    */
    @VisibleForTesting
    @Transactional
    protected UserData deleteErrorFolder(MediaStroe mediaStroe, UserData userData, CollectData collectData, Boolean flag){
         //xxx
        collectData.collectUserData(userData, files, allRemoveFiles);
        //xxx
}

相关文章

网友评论

      本文标题:RPC(五)

      本文链接:https://www.haomeiwen.com/subject/lmtcvrtx.html