美文网首页
SOCKET编程关于stream-based的一些思考

SOCKET编程关于stream-based的一些思考

作者: bingoc | 来源:发表于2016-03-08 21:07 被阅读148次

附上netty 5 用户指南地址http://ifeve.com/netty5-user-guide/

流数据的传输处理

一个小的Socket Buffer问题

在基于流的传输里比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的TCP/TP协议栈已经接收了3个数据包: netty5_1.png

由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段。

netty5_2.png

因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:

netty5_3.png

问题总结一下就是客户端连续发了三个包,服务端在收到的时候可能是不同片段大小收到的,如果在每一次收到一片时便处理数据的话,很有可能数据是不完整的。(我的猜测是,TCP底层的窗口机制引起的,窗口的挪动会让已经完成的数据向上传送,而没有收到还会等待。这也是保证顺序不错乱的机制,所以我在一开始考虑会不会有顺序错乱问题,这个应该是在底层已经解决掉了)

Netty提供的解决方案总结

第一种:缓存收到的字节,当缓存的字节数目到达要求时,读取buff完成业务逻辑

(1)在handler生命周期手动缓存
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)在生命周期中,声明一个4字节的缓存大小
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)在生命周期结束的时候释放掉
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)将每次收到的字节流写进缓存里
        m.release();

        if (buf.readableBytes() >= 4) { // (3)当缓存大小达到4字节时处理实际业务
            long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

(2)利用netty的解码函数来控制字节

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)extends ByteToMessageDecoder类
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)回调函数将字节流写入in中来积累缓存
        if (in.readableBytes() < 4) { // (3)判断in的大小
            return;
        }

        out.add(in.readBytes(4)); // (4)将4个字节输出到out中
    }
}

第二种:利用POJO(Plain Ordinary Java Object)来代替ByteBuf

构造新类型(感觉和javabean一样)
package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final int value;

    public UnixTime() {
        this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
    }

    public UnixTime(int value) {
        this.value = value;
    }

    public int value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}
在decode回调函数中,将bytebuf组装成unixTime对象
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readInt()));
}

总结:关于bytebuf大小的约定,我觉得在发送的时候在首部加上包头,即用1字节来表示整个包的大小,这样就可以控制可变的缓存大小了。虽然增加了数据量但是更适应于可变的环境。同样在使用POJO时,加上类型标识符,也能拥有更加广泛的可行性。

相关文章

网友评论

      本文标题:SOCKET编程关于stream-based的一些思考

      本文链接:https://www.haomeiwen.com/subject/akpzkttx.html