前一篇文章简单介绍了下RPC的基本原理,同时附上了一个小的demo,但是这个小的demo并不能在生产上使用,因为生产上的RPC还需要考虑很多因素,比如接入注册中心、高性能的网络通信、高性能的序列化和反序列化、自动路由、容错处理等等。要实现生产上使用的先不谈,我们先来实现一个稍微复杂的RPC,借助这个RPC例子来更深刻的理解RPC原理,为后续Dubbo源码的分析做准备。
一、简单RPC架构设计
回顾下RPC原理图:
如果要自己设计实现稍微简单的一个rpc框架,应该需要考虑注册中心、网络通信、序列化等内容,因为可以设计出如下键略的RPC架构图:
二、项目起步说明
1、本RCP项目涉及的功能点
动态代理、反射、序列化、反序列化、网络通信、编解码、服务发现和注册、心跳与链路检测等
2、本小节内容说明
设计的技术点:动态代理技术、反射(有关动态代理可以看我的另一篇博文:代理模式)
实现的功能:简易的基于socket的rpc
3、本小节项目总体框架图
三、编码实现
1、api模块
创建用于测试的接口
public interface HelloService {
String sayHello(String name);
}
2、common模块
request请求实体类
public class Request implements Serializable {
private static final long serialVersionUID = 7929047349488932740L;
/**
* 请求表示id
*/
private String requestId;
/**
* 请求服务类型
*/
private String className;
/**
* 请求方法名称
*/
private String methodName;
/**
* 请求方法参数类型数组
*/
private Class<?>[] parameterTypes;
/**
* 请求参数列表
*/
private Object[] args;
......省略getter/setter
}
response响应实体类:
public class Response {
private static final long serialVersionUID = -1023480952777229650L;
private String requestId;
/**
* 响应状态吗
*/
private int code;
/**
* 响应消息说明
*/
private String msg;
/**
* 相应数据
*/
private Object data;
......省略getter/setter
3、provider模块
服务的暴露(包好服务的注册和服务的发布),服务端基本流程是
服务注册->服务发布->服务启动监听请求(socket)->处理请求
//rpc代理服务,用于暴露服务
public class RpcProxyServer {
/**
* 创建一个线程池
*/
ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 端口号
*/
private int port;
/**
* 1、服务注册
* @param serviceInterface
* @param impClass
* @return
*/
public RpcProxyServer register(Class serviceInterface, Class impClass) {
//注册服务(接口名:实现类名)
ProcessorHandler.register(serviceInterface, impClass);
return this;
}
public RpcProxyServer(int port) {
this.port = port;
}
/**
*2、 启动发布(启动)
*/
public void start() {
System.out.println("服务启动====");
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {//3、通过循环不断接受请求
Socket socket = serverSocket.accept();//监听客户端的请求
//4、每一个socket交给一个processorhandler处理,这里的target就是真正的业务类
executorService.execute(new ProcessorHandler(socket));//处理客户端的请求
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
具体处理请求的handler
//服务端接受请求处理线程
public class ProcessorHandler implements Runnable {
private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>();
/**
* socket
*/
private Socket socket;
public static void register(Class serviceInterface, Class impClass) {
//注册服务(接口名:实现类名)
serviceRegistry.put(serviceInterface.getName(), impClass);
}
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
public void run() {
//用于定义输入流和输出流
ObjectInputStream objectInputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
objectInputStream = new ObjectInputStream(socket.getInputStream());
//从socket中读取请求流对象
Request rpcRequest = (Request) objectInputStream.readObject();
//调用正真的处理方法
Object result = invoke(rpcRequest);
Response response = new Response();
response.setRequestId(rpcRequest.getRequestId());
response.setData(result);
response.setMsg(ResponseCodeEnum.SUCCESS.getMsg());
response.setCode(ResponseCodeEnum.SUCCESS.getCode());
//将结果通过socket输出
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeOpenSource(objectInputStream, objectOutputStream);
}
}
private void closeOpenSource(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 利用反射技术执行正真的方法(这里只是简单的实现,没有容错处理)
*
* @param rpcRequest
* @return
*/
private Object invoke(Request rpcRequest) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
//获取目标对象并执行目标方法(也就是获取注册后的接口实现类对象)
Class<?> targetClass = serviceRegistry.get(rpcRequest.getClassName());
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Method method = targetClass.getMethod(rpcRequest.getMethodName(), parameterTypes);
Object[] args = rpcRequest.getArgs();
return method.invoke(targetClass.newInstance(), args);
}
}
用到的枚举
public enum ResponseCodeEnum {
SUCCESS(0, "success"),
FAIL(1, "fail");
....省略
rpc接口实现类:
public class HelloServiceImpl implements HelloService {
public String sayHello(String name) {
return "hello " + name;
}
}
启动服务main方法:
public class Demo1Main {
public static void main(String[] args) {
//创建代理服务
RpcProxyServer rpcProxyServer = new RpcProxyServer(8888);
//注册服务
rpcProxyServer.register(HelloService.class, HelloServiceImpl.class);
//启动服务
rpcProxyServer.start();
}
}
4、consumer模块
消费端模块主要是通过jdk动态代理的方式实现rpc接口代理请求远程,基本流程
client->创建代理对象->通过代理对象请求远程服务->接受返回的信息
public class ClientProxy<T> {
/**
* 服务端代理接口
*/
private Class<T> serverInstance;
/**
* 服务端地址
*/
private InetSocketAddress address;
public ClientProxy(Class<T> serverInstance, String ip, Integer port) {
this.address = new InetSocketAddress(ip, port);
this.serverInstance = serverInstance;
}
/**
* 获取客户端代理对象
*
* @return
*/
public T getClientInstance() {
return (T) Proxy.newProxyInstance(serverInstance.getClassLoader(), new Class<?>[]{serverInstance}, new RemoteInvocationHandler(address));
}
}
具体远程调用invoke方法(jdk动态代理InvocationHandler)
public class RemoteInvocationHandler implements InvocationHandler {
/**
* 服务端地址
*/
private InetSocketAddress address;
public RemoteInvocationHandler(InetSocketAddress address) {
this.address=address;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request rpcRequest = new Request();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
//通过网络发送正式请求
RpcNetTransport netTransport = new RpcNetTransport(address.getPort(), address.getHostName());
Object result = (Object) netTransport.send(rpcRequest);
return result;//返回收到的结果
}
}
具体的rpc网络请求(socket)
//网络传送
public class RpcNetTransport {
private int port;
private String host;
public RpcNetTransport(int port, String host) {
this.port = port;
this.host = host;
}
/**
* 发送请求
*
* @param request
*/
public Object send(Request request) throws IOException, ClassNotFoundException {
Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
// 1.创建Socket客户端,根据指定地址连接远程服务提供者
socket = new Socket(host, port);
//2、将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
outputStream=new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(request);
//3、同步阻塞等待服务器返回应答,获取应答后返回
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} finally {
if (socket != null) {
socket.close();
}
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
}
}
}
消费端消费服务main方法:
public class Demo1Main {
public static void main(String[] args) {
ClientProxy clientProxy = new ClientProxy(HelloService.class, "127.0.0.1", 8888);
HelloService helloService = (HelloService) clientProxy.getClientInstance();
String result = helloService.sayHello("嘿嘿嘿");
System.out.println(result);
}
}
三、总结与思考
总结:本节实现了一个非常简单的rpc原型项目,包含了服务注册、采用BIO的网络通信模型传送数据、采用jdk原生代理模式进行服务代理、采用jdk原生的序列化方式进行序列化和反序列化等。后续将会针对该原型项目不断的改进,不断的引入新的“武器”,来丰富整个rpc项目。
后期预热:引入注册中心(解决服务治理问题)、引入多种高效的序列化机制、引入NIO的网络通信模型、引入软负载均衡机制、引入spi扩展机制、接入spring等等,敬请期待。
网友评论