美文网首页
Dubbo源码分析(十一) 序列化机制

Dubbo源码分析(十一) 序列化机制

作者: skyguard | 来源:发表于2018-11-13 09:55 被阅读0次

    下面我们来说一下Dubbo的序列化机制。数据在网络上进行传输,先要把数据序列化成字节流,接收端收到数据后,再反序列化成对象。现在我们就来说一下Dubbo的序列化机制是怎么实现的。
    先来看一个类TransportCodec,这个类有两个方法:encode和decode,分别是序列化和反序列化的实现。来看一下是怎么实现的

    public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
        // 获得反序列化的 ObjectOutput 对象
        OutputStream output = new ChannelBufferOutputStream(buffer);
        ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
        // 写入 ObjectOutput
        encodeData(channel, objectOutput, message);
        objectOutput.flushBuffer();
        // 释放
        if (objectOutput instanceof Cleanable) {
            ((Cleanable) objectOutput).cleanup();
        }
    }
    
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        // 反序列化
        InputStream input = new ChannelBufferInputStream(buffer);
        ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input);
        Object object = decodeData(channel, objectInput);
        // 释放
        if (objectInput instanceof Cleanable) {
            ((Cleanable) objectInput).cleanup();
        }
        return object;
    }
    

    DubboCodec是TransportCodec的子类,这个类也实现了序列化和反序列化。看一下是怎么实现的

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2];
        // 获得 Serialization 对象
        byte proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 获得请求||响应编号
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        // 解析响应
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            // 若是心跳事件,进行设置
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // 设置状态
            // get status.
            byte status = header[3];
            res.setStatus(status);
            // 正常响应状态
            if (status == Response.OK) {
                try {
                    Object data;
                    // 解码心跳事件
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    // 解码其它事件
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    // 解码普通响应
                    } else {
                        DecodeableRpcResult result;
                        // 在通信框架(例如,Netty)的 IO 线程,解码
                        if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
                            result.decode();
                        // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
                        } else {
                            result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    // 设置结果
                    res.setResult(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            // 异常响应状态
            } else {
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        // 解析请求
        } else {
            // decode request.
            Request req = new Request(id);
            req.setVersion("2.0.0");
            // 是否需要响应
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            // 若是心跳事件,进行设置
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                // 解码心跳事件
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                // 解码其它事件
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                // 解码普通请求
                } else {
                    // 在通信框架(例如,Netty)的 IO 线程,解码
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
    

    再到deserialize方法

    private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) throws IOException {
        return serialization.deserialize(url, is);
    }
    

    下面我们来看一个接口Serialization,这个接口是序列化器的接口

     /**
     * create serializer
     *
     * 创建 ObjectOutput 对象,序列化输出到 OutputStream
     *
     * @param url URL
     * @param output 输出流
     * @return serializer
     * @throws IOException 当发生 IO 异常时
     */
    @Adaptive
    ObjectOutput serialize(URL url, OutputStream output) throws IOException;
    
    /**
     * create deserializer
     *
     * 创建 ObjectInput 对象,从 InputStream 反序列化
     *
     * @param url URL
     * @param input 输入流
     * @return deserializer
     * @throws IOException 当发生 IO 异常时
     */
    @Adaptive
    ObjectInput deserialize(URL url, InputStream input) throws IOException;
    

    JavaSerialization实现了Serialization,是用jdk序列化的方式实现的

    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new JavaObjectOutput(out);
    }
    
    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new JavaObjectInput(is);
    }
    

    看下JavaObjectInput的实现

    public Object readObject() throws IOException, ClassNotFoundException {
        byte b = getObjectInputStream().readByte();
        if (b == 0)
            return null;
    
        return getObjectInputStream().readObject();
    }
    

    再看下JavaObjectOutput的实现

    public void writeObject(Object obj) throws IOException {
        if (obj == null) { // 空
            getObjectOutputStream().writeByte(0); // 空
        } else {
            getObjectOutputStream().writeByte(1); // 非空
            getObjectOutputStream().writeObject(obj); // 对象
        }
    }
    

    下面我们再介绍一种序列化的实现方式,就是kryo,这是一个高性能的序列化框架,我们来看一下Dubbo是怎么实现的。先来看一个类KryoSerialization

    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new KryoObjectOutput(out);
    }
    
    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new KryoObjectInput(is);
    }
    

    再看一下KryoObjectInput这个类

    public Object readObject() throws IOException, ClassNotFoundException {
        // TODO optimization
        try {
            return kryo.readClassAndObject(input);
        } catch (KryoException e) {
            throw new IOException(e);
        }
    }
    

    再看一下KryoObjectOutput这个类

    public void writeObject(Object v) throws IOException {
        // TODO carries class info every time.
        kryo.writeClassAndObject(output, v);
    }
    

    kryo对象是通过KryoUtils获取到的,看一下KryoUtils这个类

    public static Kryo get() {
        return kryoFactory.getKryo();
    }
    
    public static void release(Kryo kryo) {
        kryoFactory.returnKryo(kryo);
    }
    
    public static void register(Class<?> clazz) {
        kryoFactory.registerClass(clazz);
    }
    
    public static void setRegistrationRequired(boolean registrationRequired) {
        kryoFactory.setRegistrationRequired(registrationRequired);
    }
    

    这样就实现了kryo的序列化机制。
    Dubbo的序列化机制就介绍到这里了。

    相关文章

      网友评论

          本文标题:Dubbo源码分析(十一) 序列化机制

          本文链接:https://www.haomeiwen.com/subject/eozcfqtx.html