美文网首页Java 杂谈手写RPC框架
手写RPC框架(3)-引入Hessian序列化工具

手写RPC框架(3)-引入Hessian序列化工具

作者: jwfy | 来源:发表于2019-07-14 21:49 被阅读12次

    本人微信公众号(jwfy)欢迎关注

    手写RPC框架
    1、手写一个RPC框架,看看100个线程同时调用效果如何
    2、手写RPC框架(2)-引入zookeeper做服务治理

    本次进行第三个版本的迭代,支持了自定义序列化工具,在代码中默认实现了Java内置的序列化方式以及Hessian序列化方式,并对比其两者的效果,以及泛型参数传递的小优化点、空指针的处理等。最后聊一下InputStream.read 读取数据的问题以及整个手写框架的进度情况

    image

    这张图在手写RPC框架(2)-引入zookeeper做服务治理已经贴出了过,而我们这次的重点也就是在这里的1-4个点上。

    • 1、客户端代理对象产生的RpcRequest序列化成byte,发送给服务端
    • 2、服务端收到byte数据,反序列化成RpcRequest对象
    • 3、服务端生成数据后,拼接成RpcResponse,序列化成byte,发送给客户端
    • 4、客户端收到byte数据

    在使用类似于Hessian序列化工具时,需要先引入该jar包

    <!--hessian-->
    <dependency>
        <groupId>com.caucho</groupId>
        <artifactId>hessian</artifactId>
        <version>4.0.38</version>
    </dependency>
    

    RPC 实践 V3版本

    image

    圈住的代码就是本次的重点序列化和反序列化

    MessageProtocol 消息协议

    /**
     * 请求、应答 解析和反解析,包含了序列化以及反序列化操作
     *
     * @author jwfy
     */
    public interface MessageProtocol {
    
        /**
         * 服务端解析从网络传输的数据,转变成request
         * @param inputStream
         * @return
         */
        RpcRequest serviceToRequest(InputStream inputStream);
    
        /**
         * 服务端把计算结果包装好,通过outputStream 返回给客户端
         * @param response
         * @param outputStream
         * @param <T>
         */
         <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);
    
        /**
         * 客户端把请求拼接好,通过outputStream发送到服务端
         * @param request
         * @param outputStream
         */
         void clientToRequest(RpcRequest request, OutputStream outputStream);
    
        /**
         * 客户端接收到服务端响应的结果,转变成response
         * @param inputStream
         */
        <T> RpcResponse<T>  clientGetResponse(InputStream inputStream);
    }
    

    主要是修改了serviceToRequest以及clientGetResponse接口,从参数挪到返回值,使其更加容易理解

    MessageProtocol 消息协议实现类

    public class DefaultMessageProtocol implements MessageProtocol {
    
        private SerializeProtocol serializeProtocol;
    
        public DefaultMessageProtocol() {
            // 默认是采用了Hessian协议
            this.serializeProtocol = new HessianSerialize();
        }
    
        public void setSerializeProtocol(SerializeProtocol serializeProtocol) {
            // 可通过set方法替换序列化协议
            this.serializeProtocol = serializeProtocol;
        }
    
        @Override
        public RpcRequest serviceToRequest(InputStream inputStream) {
            try {
                // 2、bytes -> request 反序列化
                byte[] bytes = readBytes(inputStream);
                // System.out.println("[2]服务端反序列化出obj:[" + new String(bytes) + "], length:" + bytes.length);
                System.out.println("[2]服务端反序列化出obj length:" + bytes.length);
                RpcRequest request = serializeProtocol.deserialize(RpcRequest.class, bytes);
                return request;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
        @Override
        public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
            try {
                // 3、把response 序列化成bytes 传给客户端
                byte[] bytes = serializeProtocol.serialize(RpcResponse.class, response);
                // System.out.println("[3]服务端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
                System.out.println("[3]服务端序列化出bytes length:" + bytes.length);
                outputStream.write(bytes);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void clientToRequest(RpcRequest request, OutputStream outputStream) {
            try {
                // 1、先把这个request -> bytes 序列化掉
                byte[] bytes = serializeProtocol.serialize(RpcRequest.class, request);
                // System.out.println("[1]客户端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
                System.out.println("[1]客户端序列化出bytes length:" + bytes.length);
                outputStream.write(bytes);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public <T> RpcResponse<T>  clientGetResponse(InputStream inputStream) {
            try {
                // 4、bytes 反序列化成response
                byte[] bytes = readBytes(inputStream);
                // System.out.println("[4]客户端反序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
                System.out.println("[4]客户端反序列化出bytes length:" + bytes.length);
                RpcResponse response = serializeProtocol.deserialize(RpcResponse.class, bytes);
                return response;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
        private byte[] readBytes(InputStream inputStream) throws IOException {
            if (inputStream == null) {
                throw new RuntimeException("input为空");
            }
            // return fun1(inputStream);
            return fun2(inputStream);
            // return fun3(inputStream);
        }
    
        private byte[] fun1(InputStream inputStream) throws IOException {
            // 有个前提是数据最大是1024,并没有迭代读取数据
            byte[] bytes = new byte[1024];
            int count = inputStream.read(bytes, 0, 1024);
            return Arrays.copyOf(bytes, count);
        }
    
        private byte[] fun2(InputStream inputStream) throws IOException {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            int bufesize = 1024;
            while (true) {
                byte[] data = new byte[bufesize];
                int count = inputStream.read(data,0,bufesize);
                byteArrayOutputStream.write(data, 0, count);
                if (count < bufesize) {
                    break;
                }
            }
            return byteArrayOutputStream.toByteArray();
        }
    
        /**
         * 有问题的fun3,调用之后会阻塞在read,可通过jstack查看相关信息
         * @param inputStream
         * @return
         * @throws IOException
         */
        private byte[] fun3(InputStream inputStream) throws IOException {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            int bufesize = 1024;
    
            byte[] buff = new byte[bufesize];
            int rc = 0;
            while ((rc = inputStream.read(buff, 0, bufesize)) > 0) {
                byteArrayOutputStream.write(buff, 0, rc);
                buff = new byte[bufesize];
            }
            byte[] bytes = byteArrayOutputStream.toByteArray();
            return bytes;
        }
    
    }
    

    四个方法也是依次针对上述四个步骤的流程操作,其中包含了对byte数据(内容和长度)的输出观察,在此就不细说了。重点关注下fun1、fun2、fun3 三个方法的细节操作,其中fun1和fun2都测试是没有问题的唯独第三个使用的inputStream.read会时常出现阻塞的情况,如下图观察线程运行情况。

    image

    目前还没有深入的了解InputStream.read方法的原理细节,本人也需要仔细学习和了解,后面将会作为一个单独的学习笔记进行补充说明

    序列化&反序列化接口

    public interface SerializeProtocol {
        /**
         * 序列化
         */
        <T> byte[] serialize(Class<T> clazz, T t);
    
        /**
         * 反序列化
         */
         <T> T deserialize(Class<T> clazz, byte[] bytes);
    }
    

    这个就没什么可说的,一个非常简单的序列化和反序列化接口,序列化返回byte数据,反序列化根据泛型返回对应实体对象

    Hessian序列化&反序列化实现

    public class HessianSerialize implements SerializeProtocol {
    
        @Override
        public <T> byte[] serialize(Class<T> clazz, T t) {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            Hessian2Output hessian2Output = new Hessian2Output(outputStream);
            try {
                hessian2Output.writeObject(t);
                // NOTICE 验证过,一定需要在flush刷新之前关闭hessian2Output,否则无法有效获取字节数据
            } catch (IOException e) {
                throw new RuntimeException(e.getMessage());
            } finally {
                try {
                    hessian2Output.close();
                } catch (IOException e){
                    e.printStackTrace();
                }
            }
            try {
                outputStream.flush();
                byte[] bytes = outputStream.toByteArray();
                return bytes;
            } catch (IOException e) {
                throw new RuntimeException(e.getMessage());
            } finally {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
            Hessian2Input hessian2Input = new Hessian2Input(inputStream);
            try {
                T t = (T) hessian2Input.readObject();
                return t;
            } catch (IOException e) {
                throw new RuntimeException(e.getMessage());
            } finally {
                try {
                    hessian2Input.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    关于Hessian的使用细节,自行查询官网http://hessian.caucho.com/
    这里并未对抛出的异常做很好的处理,只是简单的抛出、日志输出而已
    关于Java内置的序列化方式就不贴出来了,代码较为简单,只需要按照类似的套路自行编写即可。

    实践

    样例按照手写RPC框架(2)-引入zookeeper做服务治理所说的保持一致。

    Hessian序列化工具

    image
    image

    Java内置序列化工具

    image
    image

    对比这两者结果很明显

    • Hessian:请求数据 204 字节,响应数据 75 字节
    • Java内置:请求数据 430 字节,响应数据 280 字节

    单就这一个简单试验而言,Hessian的效率就比Java内置的高出100%还要多,可见在RPC框架中一个优秀的序列化框架多么重要,毕竟数据的大小即影响网络传输的速率,也影响序列号和反序列化的执行性能

    总结思考

    本次更新并没有更新太多内容,只是在v2版本上替换了之前的Java内置的序列化工具,而且本文也只是介绍了Hessian序列化工具,其实Google的ProtoBuffer也是一个不错的选择,甚至FastJson也可作为序列化工具使用。

    文中还遗留了一个问题:InputStream.read 何时会被阻塞?,暂时未对其细节原理有更多的认识,接下来会出一篇附加的学习笔记好好学习总结一下其原因。

    整个RPC学习笔记不出意外的话,应该还剩下3篇,一篇引入Netty,替换当前的BIO模型;一篇引入日志,并完善整个的代码的一些异常点;最后一篇结合Spring,使其成为一个项目中真正可用的Simple-RPC框架。此外关于SPI、快速失败、监控、等由于本人能力&精力问题看时间更新。

    本学习笔记主要目的是学习和了解RPC框架,并及时进行总结和反思

    如有想需要demo代码的可关注本人微信公众号,给我发私信。

    本人微信公众号(搜索jwfy)欢迎关注

    微信公众号

    相关文章

      网友评论

        本文标题:手写RPC框架(3)-引入Hessian序列化工具

        本文链接:https://www.haomeiwen.com/subject/auaikctx.html