下面我们来说一下Dubbo的序列化机制。数据在网络上进行传输,先要把数据序列化成字节流,接收端收到数据后,再反序列化成对象。现在我们就来说一下Dubbo的序列化机制是怎么实现的。
先来看一个类TransportCodec,这个类有两个方法:encode和decode,分别是序列化和反序列化的实现。来看一下是怎么实现的
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
// 获得反序列化的 ObjectOutput 对象
OutputStream output = new ChannelBufferOutputStream(buffer);
ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
// 写入 ObjectOutput
encodeData(channel, objectOutput, message);
objectOutput.flushBuffer();
// 释放
if (objectOutput instanceof Cleanable) {
((Cleanable) objectOutput).cleanup();
}
}
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 反序列化
InputStream input = new ChannelBufferInputStream(buffer);
ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input);
Object object = decodeData(channel, objectInput);
// 释放
if (objectInput instanceof Cleanable) {
((Cleanable) objectInput).cleanup();
}
return object;
}
DubboCodec是TransportCodec的子类,这个类也实现了序列化和反序列化。看一下是怎么实现的
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2];
// 获得 Serialization 对象
byte proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 获得请求||响应编号
// get request id.
long id = Bytes.bytes2long(header, 4);
// 解析响应
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 设置状态
// get status.
byte status = header[3];
res.setStatus(status);
// 正常响应状态
if (status == Response.OK) {
try {
Object data;
// 解码心跳事件
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解码其它事件
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解码普通响应
} else {
DecodeableRpcResult result;
// 在通信框架(例如,Netty)的 IO 线程,解码
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
result.decode();
// 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
} else {
result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
}
data = result;
}
// 设置结果
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 异常响应状态
} else {
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
// 解析请求
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
// 是否需要响应
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
// 解码心跳事件
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解码其它事件
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解码普通请求
} else {
// 在通信框架(例如,Netty)的 IO 线程,解码
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
// 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
} else {
inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
再到deserialize方法
private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) throws IOException {
return serialization.deserialize(url, is);
}
下面我们来看一个接口Serialization,这个接口是序列化器的接口
/**
* create serializer
*
* 创建 ObjectOutput 对象,序列化输出到 OutputStream
*
* @param url URL
* @param output 输出流
* @return serializer
* @throws IOException 当发生 IO 异常时
*/
@Adaptive
ObjectOutput serialize(URL url, OutputStream output) throws IOException;
/**
* create deserializer
*
* 创建 ObjectInput 对象,从 InputStream 反序列化
*
* @param url URL
* @param input 输入流
* @return deserializer
* @throws IOException 当发生 IO 异常时
*/
@Adaptive
ObjectInput deserialize(URL url, InputStream input) throws IOException;
JavaSerialization实现了Serialization,是用jdk序列化的方式实现的
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
return new JavaObjectOutput(out);
}
public ObjectInput deserialize(URL url, InputStream is) throws IOException {
return new JavaObjectInput(is);
}
看下JavaObjectInput的实现
public Object readObject() throws IOException, ClassNotFoundException {
byte b = getObjectInputStream().readByte();
if (b == 0)
return null;
return getObjectInputStream().readObject();
}
再看下JavaObjectOutput的实现
public void writeObject(Object obj) throws IOException {
if (obj == null) { // 空
getObjectOutputStream().writeByte(0); // 空
} else {
getObjectOutputStream().writeByte(1); // 非空
getObjectOutputStream().writeObject(obj); // 对象
}
}
下面我们再介绍一种序列化的实现方式,就是kryo,这是一个高性能的序列化框架,我们来看一下Dubbo是怎么实现的。先来看一个类KryoSerialization
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
return new KryoObjectOutput(out);
}
public ObjectInput deserialize(URL url, InputStream is) throws IOException {
return new KryoObjectInput(is);
}
再看一下KryoObjectInput这个类
public Object readObject() throws IOException, ClassNotFoundException {
// TODO optimization
try {
return kryo.readClassAndObject(input);
} catch (KryoException e) {
throw new IOException(e);
}
}
再看一下KryoObjectOutput这个类
public void writeObject(Object v) throws IOException {
// TODO carries class info every time.
kryo.writeClassAndObject(output, v);
}
kryo对象是通过KryoUtils获取到的,看一下KryoUtils这个类
public static Kryo get() {
return kryoFactory.getKryo();
}
public static void release(Kryo kryo) {
kryoFactory.returnKryo(kryo);
}
public static void register(Class<?> clazz) {
kryoFactory.registerClass(clazz);
}
public static void setRegistrationRequired(boolean registrationRequired) {
kryoFactory.setRegistrationRequired(registrationRequired);
}
这样就实现了kryo的序列化机制。
Dubbo的序列化机制就介绍到这里了。
网友评论