美文网首页
分布式:RPC原理与实例实践

分布式:RPC原理与实例实践

作者: 五百一十七XX | 来源:发表于2019-11-07 19:30 被阅读0次

参考链接
RPC相关
https://blog.csdn.net/weixin_38327420/article/details/85064617
https://www.jianshu.com/p/32ca4fd5a7e2
https://www.jianshu.com/p/5b90a4e70783
https://www.cnblogs.com/baizhanshi/p/6053213.html
java序列化
https://blog.csdn.net/so_geili/article/details/78931742

目录

一、RPC的概念
二、RPC通信过程
三、RPC实例

一、RPC的概念

RPC:remote procedure call Protocol 远程过程调用
RPC 是一种通过网络从远程计算机上请求服务但不需要关心底层的网络通信细节的通信协议。简单来说,RPC 是一种通信协议。

RPC允许像调用本地服务一样调用远程服务,让调用者对网络通信细节透明。

RPC 的架构

通常为 C/S 模型。一个典型的 RPC 结构组成有如下几个部分:

    1. 通信模块

通信模块就是用于在网络上传输数据包的,它一般不会对数据包内容作任何处理。 RPC 的通信有同步和异步两种通信模式。

    1. Stub 程序

可以简单理解成是一个代理。它上面记载了所有支持的远程调用接口,服务端和客户端各自持有一份完全相同的 Stub 程序。我们的 RPC 的过程,其本质上就是去调用各自手里的这份 Stub 程序上的接口而已。

    1. 调度程序

调度器,运行在服务端。接收来自通信模块的请求,并根据请求中的标识符来选择对应的 Stub 程序去执行。

二、RPC通信过程

要让网络通信细节对使用者透明,我们需要对通信细节进行封装,一个RPC调用的流程涉及的通信细节如下:

RPC通信过程

1.服务消费方(client)调用以本地调用方式调用服务;
2.client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
3. client stub找到服务地址,并将消息发送到服务端;
4.server stub收到消息后进行解码;
5.server stub根据解码结果调用本地的服务;
6.本地服务执行并将结果返回给server stub;
7.server stub将返回结果打包成消息并发送至消费方;
8.client stub接收到消息,并进行解码;
9.服务消费方得到最终结果。

三、RPC实例

本文利用的实例代码,见 github
在本地模拟RPC运行原理,运行时先运行服务器(ProviderApp)端代码,再运行客户端(ComsumerApp)代码。

利用RPC框架简易实现1+2=3远程调用的例子。客户端发送两个参数,服务端返回两个数字的相加结果。接下来对代码的主要内容进行说明。

主要分为客户端、服务提供端,请求处理三个部分。

代码结构
1)服务端

服务端提供客户端所期待的服务,一般包括三个部分:服务接口,服务实现以及服务的注册暴露三部分

  • 服务接口
    定义Calculate接口
public interface Calculator {
    int add(int a, int b);
}
  • 服务实现
    实现具体的加法计算,传入两个参数,返回计算结果
public class CalculatorImpl implements Calculator {
    public int add(int a, int b) {
        return a + b;
    }
}
  • 服务暴露,服务暴露功能实现
    只有把服务暴露出来,才能让客户端进行调用,这是RPC框架功能之一。

RPC服务端逻辑 :首先创建ServerSocket负责监听特定端口并接收客户连接请求,然后使用Java原生的序列化/反序列化机制来解析得到请求,包括所调用方法的名称、参数列表和实参,最后反射调用服务端对服务接口的具体实现并将得到的结果回传至客户端。至此,一次简单PRC调用的服务端流程执行完毕。

//服务暴露

public class ProviderApp {
    private static Logger log = LoggerFactory.getLogger(ProviderApp.class);

    private Calculator calculator = new CalculatorImpl();

    public static void main(String[] args) throws IOException {
        new ProviderApp().run();
    }

//服务端暴露服务具体实现
    private void run() throws IOException {
        ServerSocket listener = new ServerSocket(9090);
        try {
            while (true) {
                Socket socket = listener.accept();
                try {
                    // 将请求反序列化
                    ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                    Object object = objectInputStream.readObject();

                    log.info("request is {}", object);

                    // 调用服务
                    int result = 0;
                    if (object instanceof CalculateRpcRequest) {
                        CalculateRpcRequest calculateRpcRequest = (CalculateRpcRequest) object;
                        if ("add".equals(calculateRpcRequest.getMethod())) {
                            result = calculator.add(calculateRpcRequest.getA(), calculateRpcRequest.getB());
                        } else {
                            throw new UnsupportedOperationException();
                        }
                    }

                    // 返回结果
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                    objectOutputStream.writeObject(new Integer(result));
                } catch (Exception e) {
                    log.error("fail", e);
                } finally {
                    socket.close();
                }
            }
        } finally {
            listener.close();
        }
    }

}
2)客户端

客户端调用服务端所提供的服务,包括服务接口、服务引用两个部分。

  • 服务接口:与服务端共享同一个服务接口
public interface Calculator {
    int add(int a, int b);
}
  • 服务引用
    消费端通过RPC框架进行远程调用
public class ComsumerApp {
    private static Logger log = LoggerFactory.getLogger(ComsumerApp.class);

    public static void main(String[] args) {
        Calculator calculator = new CalculatorRemoteImpl();
        int result = calculator.add(1, 2);
        log.info("result is {}", result);
    }
}
  • 服务引用实现

通过CalculatorRemoteImpl,封装客户端引用RPC的逻辑。

RPC客户端逻辑:首先创建Socket客户端并与服务端建立链接,然后使用Java原生的序列化/反序列化机制将调用请求发送给客户端,包括所调用方法的名称、参数列表将服务端的响应返回给用户即可。至此,一次简单PRC调用的客户端流程执行完毕。

分布式应用下,一个服务可能有多个实例,比如Service B,可能有ip地址为198.168.1.11和198.168.1.13的两个实例,lookupProviders其实就是在寻找要调用的服务的实例列表。在分布式应用下,通常会有一个服务注册中心,来提供查询实例列表的功能。查到实例列表之后利用chooseTarget选择具体调用哪个实例,内部是一个负载均衡策略。

代码仅实现了一个非常简单的RPC框架,不考虑查找实例和动态代理问题,这里将端口固定为9090,ip地址为本地地址127.0.0.1

public class CalculatorRemoteImpl implements Calculator {
    public static final int PORT = 9090;
    private static Logger log = LoggerFactory.getLogger(CalculatorRemoteImpl.class);

    public int add(int a, int b) {
        List<String> addressList = lookupProviders("Calculator.add");
        String address = chooseTarget(addressList);
        try {
            Socket socket = new Socket(address, PORT);

            // 将请求序列化
            CalculateRpcRequest calculateRpcRequest = generateRequest(a, b);
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

            // 将请求发给服务提供方
            objectOutputStream.writeObject(calculateRpcRequest);

            // 将响应体反序列化
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            Object response = objectInputStream.readObject();

            log.info("response is {}", response);

            if (response instanceof Integer) {
                return (Integer) response;
            } else {
                throw new InternalError();
            }

        } catch (Exception e) {
            log.error("fail", e);
            throw new InternalError();
        }
    }

    private CalculateRpcRequest generateRequest(int a, int b) {
        CalculateRpcRequest calculateRpcRequest = new CalculateRpcRequest();
        calculateRpcRequest.setA(a);
        calculateRpcRequest.setB(b);
        calculateRpcRequest.setMethod("add");
        return calculateRpcRequest;
    }

    private String chooseTarget(List<String> providers) {
        if (null == providers || providers.size() == 0) {
            throw new IllegalArgumentException();
        }
        return providers.get(0);
    }

    public static List<String> lookupProviders(String name) {
        List<String> strings = new ArrayList();
        strings.add("127.0.0.1");
        return strings;
    }
}
request

CalculateRpcRequest类里是序列化定义相关内容,实现Serializable接口

public class CalculateRpcRequest implements Serializable {

    private static final long serialVersionUID = 7503710091945320739L;

    private String method;
    private int a;
    private int b;

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }
      
    public int getA() {
        return a;
    }

    public void setA(int a) {
        this.a = a;
    }

    public int getB() {
        return b;
    }

    public void setB(int b) {
        this.b = b;
    }

    @Override
    public String toString() {
        return "CalculateRpcRequest{" +
                "method='" + method + '\'' +
                ", a=" + a +
                ", b=" + b +
                '}';
    }
}
运行结果

如下,最终得到1+2=3的结果。

1+2=3

相关文章

网友评论

      本文标题:分布式:RPC原理与实例实践

      本文链接:https://www.haomeiwen.com/subject/ixhpbctx.html