写一个丢弃服务器
世界上最简单的协议并非是 hello world ,而是 丢弃 ,这个协议丢弃所有收到的数据没有任何返回。
我们可以直接使用handler实现
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Discard the received data silently.
((ByteBuf) msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
- DiscardServerHandler 继承了ChannelInboundHandlerAdapter , ChannelInboundHandlerAdapter 是 ChannelInboundHandler的实现类。
ChannelInboundHandlerAdapter 提供了多个 可以重写的事件处理方法,当前已经足够使用而非自己实现接口。 - 我们重写了channelRead() 方法,该方法会在接收到消息的时候被调用。在该例子中接收到的消息 是ByteBuf。
- 为了实现 DISCARD 协议,这个handler 忽略收到的消息,ByteBuf 是一个引用计数对象,应该通过 release() 方法 显式的被释放。handler 有责任释放 所有传递过来的引用计数对象。通常,这个方法应该像下面那样重写
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
exceptionCaught 会在 Netty 处理I O 错误 或者 handler 处理事件时发生异常 被调用。在大多数情况下,应该记录异常,对应的channel 应该关闭。然而 ,我们可以有不同的实现依赖于我们想怎么处理此异常情况。例如我们发送一个待error code 的响应,在关闭链接之前。
目前为止,我们已经实现了一半的DISCARDSERVER , 新建一个main () 启动 handler。
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception{
int port = 8888 ;
if (args.length >0 ) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
private void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 1
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // 2
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 3
.childHandler(new ChannelInitializer<SocketChannel>() { // 4
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 5
.childOption(ChannelOption.SO_KEEPALIVE, true); // 6
// bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // 7
// wait until the server socket is closed
// you can do that to gracefully
// shutdown you server
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
- NioEventLoopGroup 是一个处理IO操作的 多线程 event loop。Netty 提供了多种传输的 EventLoopGroup 实现。 在这个例子中我们实现了服务端应用程序,使用了两个NioEventLoopGroup 。第一个 boss 用来接受 传入连接。第二个是worker 处理已经接受的连接,注册已经接受的连接到worker。多少线程被创建,他们怎么映射到创建的channels 取决于EventLoopGroup 的实现,此外他们甚至是能通过构造器设置。
2.ServerBootstrap 是一个帮助类 创立 一个server 。你能直接通过使用channels ,那是一个繁杂的过程,大多数情况下不需要那样做。
3.这里我们指定 NioServerSocketChannel 这个class 来实例化channel 来接受新链接 - ChannelInitializer 是一个特殊的handler 目的是帮助用户设置新的channel 。最可能的情况是 你想为新的 channel 设置 channelPipeline ,通过增加一些handler 例如 DiscardServerHandler 提升网络应用程序。随着应用程序的复杂,你可能在pipeline中 添加更多的handler,最终抽出匿名类放入 顶级类。
5.在channel 实现中可以设置参数。我们写的是 TCP/IP Server 。我们允许设置 例如 tcpNoDelay 、keepAlive 的socket options。请参照 ChannelOption 的 api 文档 和 channel Config 具体实现,了解 ChannelOptions 的概况。
6.注意 option 和 childOption, option() 是 NioServerSocketChannel 接受进入的连接,childOption() 是被父 ServerChannel 接受的 channels 。这里是NioSocketChannel
7.绑定端口启动server, 这里我们绑定 所有网卡 8080 端口,我们可以多次调用bind(), 绑定不同的地址。
观察接受到的数据
现在我们写了第一个服务器,我们需要测试它是否工作。最简单的方式是 使用Telnet 命令,我们可以在 命令行中输入 telnet localhost 8080。
然而怎么才能知道我们的服务是正常工作的,我们不知道因为这是一个 discard server. 你将不会收到任何回应。我们需要修改server 打印收到数据。
我们需要修改 DiscardServerHandler 的 channelRead() 方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Discard the received data silently.
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // 1
System.out.println((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // 2
}
}
- 这个低效的loop 可以被简化, System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
- 此外 这里可用 in.release()
写一个响应Server
目前我们消费数据没有任何回应,通常一个服务应该回应这个请求,让我们学习响应消息给客服端,实现ECHO 协议,回写收到的消息。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg); // 1
ctx.flush(); // 2
}
- ChannelHandlerContext 提供了多个操作 可以让你去触发不同的IO 事件和操作,这里调用 write(object) 逐字返回写入的数据。我不不用像 DISCARD 那样 release 。因为netty 释放当我写流到网络
- ctx.write(Object) 不会写消息到网络。他会被缓存 当调用ctx.flush() 会被 flush。此外简单的调用 ctx.writeAndFlush(msg)
写一个 Time Server
在这个例子中,我们学习怎么构建和发送消息,完成时关闭连接。因为我们忽略收到的消息而去发送消息一旦连接建立。我们不能用channelRead(),应该 重写 channelActive()。实现如下
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // 1
final ByteBuf time = ctx.alloc().buffer(4); // 2
time.writeInt( (int) (System.currentTimeMillis() / 1000L + 2208988800L));
ChannelFuture f = ctx.writeAndFlush(time);// 3
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assert f == future;
ctx.close();
}
});// 4
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- channelActive 被调用在连接建立准备产生流量时,写一个32位整数代表这个方法的当前时间。
2.为了发送消息,我们需要分配一个buffer 承载这个消息,我们准备写一个32位整数,我们至少要分配一个4 bytes ,得到当前的 ByteBufAllocator 通过 ChannelHandlerContext.alloc(),分配一个新 buffer.
3.通常我们写 构造好的消息
稍等,flip在哪,我们 在NIO 不需要调用 java.nio.ByteBuffer.flip() ? ByteBuf 没有 类似的方法,因为他有两个指针,一个读操作,一个写操作。当写的时候写指针增加而读指针不变,写和读指针各自 start 和 end。另一点需要注意的是 ChannelHandlerContext.write() (and writeAndFlush()) method 返回ChannelFuture
另一个需要注意的点是 ChannelHandlerContext.write() 和 writeAndFlush() 返回一个 ChannelFuture ,ChannelFuture 代表一个还未发生的I/O 操作,因为这个是异步的,例如 消息尚未被发送可能连接已经关闭了。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此我们应该在 ChannelFuture 被完成以后调用close() .操作完成完成后会通知 listener ,请注意 close() 方法不会立即关闭连接,他返回一个 ChannelFuture。
- 我们怎么得到通知在一个写请求结束时,我们可以在 返回的ChannelFuture上加一个 ChannelFutureListener 操作完成是关闭 channel。或者直接使用 预定义的 f.addListener(ChannelFutureListener.CLOSE);
写一个时间客户端
不像DISCARD 和ECHO 服务端, 我们需要一个时间客户端翻译int 成为日期.
用netty 实现客户端和服务端的最大不同是使用的 Bootstrap 和 Channel 实现
public class TimeClient {
public static void main(String[] args) throws Exception{
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // 1
b.group(worker); // 2
b.channel(NioSocketChannel.class); // 3
b.option(ChannelOption.SO_KEEPALIVE, true); // 4
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// start the client
ChannelFuture f = b.connect(host, port).sync();// 5
// wait until the connection is closed
f.channel().closeFuture().sync();
} finally {
worker.shutdownGracefully();
}
}
}
- Bootstrap 是一个类似 ServerBootstrap , 为 无连接 或者 客户端 Channel 使用。
- 我们只定义一个EventLoopGroup ,他将被用作 boss 和 worker group,尽管boss worker 在客户端未被使用。
- 使用NioSocketChannel 替换 NioServerSocketChannel 生成客户端 channel .
- 注意 我们没有使用childOption() ,因为客户端 SocketChannel 没有 父级。
- 我们用 connect() 方法替换 bind() 方法
正如你所看到的,和服务端的代码差别不大,ChannelHandler 是怎么实现的,接受32位整数翻译成可读的日期,打印翻译时间 关闭连接
class TimeClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg; // 1
try {
long time = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(time));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 在 TCP/IP 协议中,Netty将读到数据转为 ByteBuf
这个看起来 非常简单,和服务端例子看来没什么不同,然而 这个handler 有时会拒绝工作 抛出 IndexOutOfBoundsException 异常
处理基于流的的协议
一个小的 Socket Buffer 备注
在基于流传输的协议 例如TCP/IP 会把接受到数据放在缓冲区,不幸的是基于流的传输是基于字节的队列而非包的队列,甚至你发送的消息是两个对立的包 但是操作系统把他们当做一堆字节,不能保证你收到的一定是远端写的,假设操作系统发送了三个包
image.png
有很大的概率应用收到 如下的 片端
image.png
因此,作为接受端,无论是客户还是服务端,应该整理收到的数据转换为1个或多个有意义的片段 ,在上面的例子中,接受到的数据应该分割成如下
image.png
第一个方案
一个简单的方法是创建内部增长的buffer,直到所有的四个字节被内部缓冲收到,下面的TimeClientHandler 解决了这个问题。
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
buf = ctx.alloc().buffer(4); // 1
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
buf.release(); // 1
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // 2
m.release();
if (buf.readableBytes() > 4) { // 3
long time = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(time));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 一个ChannelHandler 可以有2个生命周期函数,1个是handlerAdded() 另一个是handlerRemoved(),可以执行一个任意任务只要不阻塞太长时间。
- 所有的数据都被积累转入buf
- Handler 必须检测buf 有充足的数据,如例子中的四字节,进行实际的业务逻辑,否则Netty 将会在收到更多数据的时候再次调用channelRead() ,最后所有的四个字节都被收集
第二个方案
尽管第一个解决方案解决了这个问题,改动不够简洁,想象一个复杂的协议有个不同的长度的复杂字段,我们的handler 不会被很好的维护。
我们可能想ChannelPipeline 中添加超过1个的ChannelHandler ,我们分割一个巨大的ChannelHandler 成多个模块化的handler 降低复杂度,TimeClientHandler可以拆分为两个。
- TimeDecoder 解决分包问题
- TimeClientHandler最简单版本
幸运的是,Netty 提供了可扩展的类,第一步我们可以开箱即用
public class TimeDecoder extends ByteToMessageDecoder { //1
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //2
if (in.readableBytes() < 4) { // 3
return;
}
out.add(in.readBytes(4)); // 4
}
}
- ByteToMessageDecoder是ChannelInboundHandler 的一个实现,易于解决分包问题。
- ByteToMessageDecoder 当收到信息时调用decode() 方法 内部维护一个可增长的buffer。
- 当buffer没有收到足够数据时直接返回不作处理,当收到更多消息时会再次调用 decode() 方法
- 如果decode() 添加一个对象到 out 上,意味着decoder 成功解码了一个消息,ByteToMessageDecoder 会抛弃缓冲区中已读部分,请记住你不需要解析多个消息,ByteToMessageDecoder 会decode 并加入out,直到没有数据。
我们需要将另一个Handler 插入 ChannelPipeline, 我们需要修改ChannelInitializer 实现
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一个富有冒险精神的,你可以尝试使用ReplayingDecoder 进一步简化这个 decoder ,你需要参考API 文档。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty 提供了大量的开箱即用的解码器
- io.netty.example.factorial 二进制协议
- io.netty.example.telnet 基于文本的协议
使用POJO替换 ByteBuf
在应用中使用POJO的优势是很明显的,Handler是可维护和可复用的 将数据 ByteBuf 抽取出来,我们用ByteBuf 仅仅是读取一个32位整数, 分割是非常有必要的实现真实的协议。
第一步创建一个POJO
public class UnixTime {
private Long value;
public UnixTime(Long value) {
this.value = value;
}
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public Long value() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
TimeDecoder 使用 UnixTime 替换 ByteBuf
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //2
if (in.readableBytes() < 4) { // 3
return;
}
out.add(new UnixTime(in.readUnsignedInt())); // 4
}
随着 decoder 升级 TimeClientHandler 更新
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
UnixTime m = (UnixTime) msg; // 1
System.out.println(m);
ctx.close();
}
是不是优雅简单很多,服务端使用相同的技术 更新TimeServerHandler
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // 1
ChannelFuture f = ctx.writeAndFlush(new UnixTime());// 3
f.addListener(ChannelFutureListener.CLOSE);
}
唯一剩下的一块是 encoder 是 ChannelOutboundHandler 的实现,翻译UnixTime 成 ByteBuf ,它比decode 简单,因为它不需要拆包组装
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int) m.value());
ctx.write(encoded, promise); //1
}
}
- 我们使用 ChannelPromise ,在我们编码 数据 写入网络时,netty 让它变为成功或失败,
我们不调用 ctx.flush() ,这个handler 有一个 flush() 方法,可以重写 flush操作
我们可以进一步简化 ,利用 MessageToByteEncoder
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
最后一个任务 TimeEncoder 插入 ChannelPipeline 在 TimeServerHandler的左侧
关闭应用程序
Netty 关闭应用程序 EventLoopGroup调用 shutdownGracefully,所有的 EventLoopGroup 和 所有属于group 的 Channel也被关闭
网友评论