本人微信公众号(jwfy)欢迎关注
手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理
本次进行第三个版本的迭代,支持了自定义序列化工具,在代码中默认实现了Java内置的序列化方式以及Hessian序列化方式,并对比其两者的效果,以及泛型参数传递的小优化点、空指针的处理等。最后聊一下InputStream.read 读取数据的问题以及整个手写框架的进度情况。
image这张图在手写RPC框架(2)-引入zookeeper做服务治理已经贴出了过,而我们这次的重点也就是在这里的1-4个点上。
- 1、客户端代理对象产生的RpcRequest序列化成byte,发送给服务端
- 2、服务端收到byte数据,反序列化成RpcRequest对象
- 3、服务端生成数据后,拼接成RpcResponse,序列化成byte,发送给客户端
- 4、客户端收到byte数据
在使用类似于Hessian序列化工具时,需要先引入该jar包
<!--hessian-->
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.38</version>
</dependency>
RPC 实践 V3版本
image圈住的代码就是本次的重点序列化和反序列化
MessageProtocol 消息协议
/**
* 请求、应答 解析和反解析,包含了序列化以及反序列化操作
*
* @author jwfy
*/
public interface MessageProtocol {
/**
* 服务端解析从网络传输的数据,转变成request
* @param inputStream
* @return
*/
RpcRequest serviceToRequest(InputStream inputStream);
/**
* 服务端把计算结果包装好,通过outputStream 返回给客户端
* @param response
* @param outputStream
* @param <T>
*/
<T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);
/**
* 客户端把请求拼接好,通过outputStream发送到服务端
* @param request
* @param outputStream
*/
void clientToRequest(RpcRequest request, OutputStream outputStream);
/**
* 客户端接收到服务端响应的结果,转变成response
* @param inputStream
*/
<T> RpcResponse<T> clientGetResponse(InputStream inputStream);
}
主要是修改了serviceToRequest以及clientGetResponse接口,从参数挪到返回值,使其更加容易理解
MessageProtocol 消息协议实现类
public class DefaultMessageProtocol implements MessageProtocol {
private SerializeProtocol serializeProtocol;
public DefaultMessageProtocol() {
// 默认是采用了Hessian协议
this.serializeProtocol = new HessianSerialize();
}
public void setSerializeProtocol(SerializeProtocol serializeProtocol) {
// 可通过set方法替换序列化协议
this.serializeProtocol = serializeProtocol;
}
@Override
public RpcRequest serviceToRequest(InputStream inputStream) {
try {
// 2、bytes -> request 反序列化
byte[] bytes = readBytes(inputStream);
// System.out.println("[2]服务端反序列化出obj:[" + new String(bytes) + "], length:" + bytes.length);
System.out.println("[2]服务端反序列化出obj length:" + bytes.length);
RpcRequest request = serializeProtocol.deserialize(RpcRequest.class, bytes);
return request;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
try {
// 3、把response 序列化成bytes 传给客户端
byte[] bytes = serializeProtocol.serialize(RpcResponse.class, response);
// System.out.println("[3]服务端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
System.out.println("[3]服务端序列化出bytes length:" + bytes.length);
outputStream.write(bytes);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void clientToRequest(RpcRequest request, OutputStream outputStream) {
try {
// 1、先把这个request -> bytes 序列化掉
byte[] bytes = serializeProtocol.serialize(RpcRequest.class, request);
// System.out.println("[1]客户端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
System.out.println("[1]客户端序列化出bytes length:" + bytes.length);
outputStream.write(bytes);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public <T> RpcResponse<T> clientGetResponse(InputStream inputStream) {
try {
// 4、bytes 反序列化成response
byte[] bytes = readBytes(inputStream);
// System.out.println("[4]客户端反序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
System.out.println("[4]客户端反序列化出bytes length:" + bytes.length);
RpcResponse response = serializeProtocol.deserialize(RpcResponse.class, bytes);
return response;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private byte[] readBytes(InputStream inputStream) throws IOException {
if (inputStream == null) {
throw new RuntimeException("input为空");
}
// return fun1(inputStream);
return fun2(inputStream);
// return fun3(inputStream);
}
private byte[] fun1(InputStream inputStream) throws IOException {
// 有个前提是数据最大是1024,并没有迭代读取数据
byte[] bytes = new byte[1024];
int count = inputStream.read(bytes, 0, 1024);
return Arrays.copyOf(bytes, count);
}
private byte[] fun2(InputStream inputStream) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int bufesize = 1024;
while (true) {
byte[] data = new byte[bufesize];
int count = inputStream.read(data,0,bufesize);
byteArrayOutputStream.write(data, 0, count);
if (count < bufesize) {
break;
}
}
return byteArrayOutputStream.toByteArray();
}
/**
* 有问题的fun3,调用之后会阻塞在read,可通过jstack查看相关信息
* @param inputStream
* @return
* @throws IOException
*/
private byte[] fun3(InputStream inputStream) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int bufesize = 1024;
byte[] buff = new byte[bufesize];
int rc = 0;
while ((rc = inputStream.read(buff, 0, bufesize)) > 0) {
byteArrayOutputStream.write(buff, 0, rc);
buff = new byte[bufesize];
}
byte[] bytes = byteArrayOutputStream.toByteArray();
return bytes;
}
}
四个方法也是依次针对上述四个步骤的流程操作,其中包含了对byte数据(内容和长度)的输出观察,在此就不细说了。重点关注下fun1、fun2、fun3 三个方法的细节操作,其中fun1和fun2都测试是没有问题的,唯独第三个使用的inputStream.read
会时常出现阻塞的情况,如下图观察线程运行情况。
目前还没有深入的了解InputStream.read方法的原理细节,本人也需要仔细学习和了解,后面将会作为一个单独的学习笔记进行补充说明
序列化&反序列化接口
public interface SerializeProtocol {
/**
* 序列化
*/
<T> byte[] serialize(Class<T> clazz, T t);
/**
* 反序列化
*/
<T> T deserialize(Class<T> clazz, byte[] bytes);
}
这个就没什么可说的,一个非常简单的序列化和反序列化接口,序列化返回byte数据,反序列化根据泛型返回对应实体对象
Hessian序列化&反序列化实现
public class HessianSerialize implements SerializeProtocol {
@Override
public <T> byte[] serialize(Class<T> clazz, T t) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Hessian2Output hessian2Output = new Hessian2Output(outputStream);
try {
hessian2Output.writeObject(t);
// NOTICE 验证过,一定需要在flush刷新之前关闭hessian2Output,否则无法有效获取字节数据
} catch (IOException e) {
throw new RuntimeException(e.getMessage());
} finally {
try {
hessian2Output.close();
} catch (IOException e){
e.printStackTrace();
}
}
try {
outputStream.flush();
byte[] bytes = outputStream.toByteArray();
return bytes;
} catch (IOException e) {
throw new RuntimeException(e.getMessage());
} finally {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
Hessian2Input hessian2Input = new Hessian2Input(inputStream);
try {
T t = (T) hessian2Input.readObject();
return t;
} catch (IOException e) {
throw new RuntimeException(e.getMessage());
} finally {
try {
hessian2Input.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
关于Hessian的使用细节,自行查询官网http://hessian.caucho.com/
这里并未对抛出的异常做很好的处理,只是简单的抛出、日志输出而已
关于Java内置的序列化方式就不贴出来了,代码较为简单,只需要按照类似的套路自行编写即可。
实践
样例按照手写RPC框架(2)-引入zookeeper做服务治理所说的保持一致。
Hessian序列化工具
imageimage
Java内置序列化工具
imageimage
对比这两者结果很明显
- Hessian:请求数据 204 字节,响应数据 75 字节
- Java内置:请求数据 430 字节,响应数据 280 字节
单就这一个简单试验而言,Hessian的效率就比Java内置的高出100%还要多,可见在RPC框架中一个优秀的序列化框架多么重要,毕竟数据的大小即影响网络传输的速率,也影响序列号和反序列化的执行性能。
总结思考
本次更新并没有更新太多内容,只是在v2版本上替换了之前的Java内置的序列化工具,而且本文也只是介绍了Hessian序列化工具,其实Google的ProtoBuffer也是一个不错的选择,甚至FastJson也可作为序列化工具使用。
文中还遗留了一个问题:InputStream.read 何时会被阻塞?,暂时未对其细节原理有更多的认识,接下来会出一篇附加的学习笔记好好学习总结一下其原因。
整个RPC学习笔记不出意外的话,应该还剩下3篇,一篇引入Netty,替换当前的BIO模型;一篇引入日志,并完善整个的代码的一些异常点;最后一篇结合Spring,使其成为一个项目中真正可用的Simple-RPC框架。此外关于SPI、快速失败、监控、等由于本人能力&精力问题看时间更新。
本学习笔记主要目的是学习和了解RPC框架,并及时进行总结和反思
如有想需要demo代码的可关注本人微信公众号,给我发私信。
本人微信公众号(搜索jwfy)欢迎关注
微信公众号
网友评论