netty的例子
maven依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.27.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.3.14.GA</version>
</dependency>
服务端:
package com.springboot.vip.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Minimal Netty TCP server: binds to a port and hands every accepted
 * connection to a {@link HelloServerHandler}.
 */
public class HelloServer {
    private final int port;

    /**
     * @param port TCP port the server will bind to
     */
    public HelloServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // accepts incoming connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // serves connections that were accepted
        System.out.println("准备运行端口:" + port);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // channel type used to accept new connections
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // install the application handler on every accepted connection
                            ch.pipeline().addLast(new HelloServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // bind the port and start accepting connections
            ChannelFuture f = b.bind(port).sync();
            // block until the server socket is closed
            f.channel().closeFuture().sync();
        } finally {
            // Always release the event-loop threads, on normal exit as well as on
            // failure; failures propagate to the caller instead of being swallowed.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 10110;
        new HelloServer(port).run();
    }
}
服务端处理器:
package com.springboot.vip.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
/**
* 输出接收到的消息
* @author Coder
*
*/
public class HelloServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Invoked for every inbound message: prints its bytes decoded as UTF-8
     * to standard output and then releases the message.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String text = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
            System.out.print(text);
        } finally {
            // the data is discarded here, so drop our reference to it
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Invoked when an exception reaches this handler: prints the stack trace
     * and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
验证:
在iTerm2下执行telnet:
telnet 127.0.0.1 10110
然后在iTerm2下输入内容,就会在服务端的console下看到对应的输出
栗子2:
netty版本
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Time server bootstrap: accepts TCP connections and delegates each one to a
 * {@link TimeServerHandler}.
 */
public class TimeServer {

    /**
     * Binds the server to the given port and blocks until the server channel
     * is closed or the calling thread is interrupted.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        NioEventLoopGroup pGroup = new NioEventLoopGroup(1); // single thread accepting connections
        NioEventLoopGroup cGroup = new NioEventLoopGroup();  // threads serving accepted connections
        ServerBootstrap server = new ServerBootstrap();
        server.group(pGroup, cGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new TimeServerHandler());
                    }
                });
        try {
            ChannelFuture cf = server.bind(port).sync();
            System.out.println("server 连接成功:" + port);
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        TimeServer server = new TimeServer();
        server.bind(port);
    }
}
server handler:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Date;
/**
 * Answers the command {@code QUERY TIME ORDER} (case-insensitive) with the
 * current date string and any other input with {@code BAD ORDER}.
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

    /**
     * Decodes the inbound buffer as UTF-8, logs the command and writes the reply.
     *
     * @param ctx handler context used to send the response
     * @param msg inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        String body;
        try {
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            body = new String(bytes, "UTF-8");
        } finally {
            // The message is consumed here and not passed further down the
            // pipeline, so this handler has to release it itself.
            buf.release();
        }
        System.out.println("The time server(Thread:" + Thread.currentThread() + ") receive order : " + body);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString() : "BAD ORDER";
        // Encode with the same charset that is used for decoding instead of the platform default.
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("UTF-8"));
        ctx.writeAndFlush(resp);
    }
}
client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Time client bootstrap: connects to 127.0.0.1:8080, installs a
 * {@link TimeClientHandler} and waits until the connection is closed.
 */
public class TimeClient {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup workers = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(workers);
            client.option(ChannelOption.TCP_NODELAY, true);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new TimeClientHandler());
                }
            });

            // wait for the connection to be established ...
            ChannelFuture connected = client.connect("127.0.0.1", 8080).sync();
            // ... and then for it to be closed
            connected.channel().closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
        }
    }
}
client handler:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Sends five {@code QUERY TIME ORDER} requests once the connection is active
 * and prints whatever the server sends back.
 */
public class TimeClientHandler extends ChannelHandlerAdapter {

    /**
     * Writes the query five times, each in its own buffer, as soon as the
     * channel becomes active.
     */
    @Override
    public void channelActive(ChannelHandlerContext context) throws Exception {
        // Encode explicitly as UTF-8 (what the reader side decodes with)
        // rather than relying on the platform default charset.
        byte[] order = "QUERY TIME ORDER".getBytes("UTF-8");
        for (int i = 0; i < 5; i++) {
            // a fresh buffer per write: each written buffer is handed over to the channel
            context.writeAndFlush(Unpooled.copiedBuffer(order));
        }
    }

    /**
     * Decodes the inbound buffer as UTF-8 and prints it.
     *
     * @param ctx handler context (unused)
     * @param msg inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            String txt = new String(bytes, "UTF-8");
            System.out.println("Now is:" + txt);
        } finally {
            // The message is consumed here and not forwarded, so release it.
            buf.release();
        }
    }
}
Netty提供的经过扩展的Buffer相对NIO中的Buffer有许多优势,作为数据存取非常重要的一块,我们来看看Netty中的Buffer有什么特点。
ByteBuf
1.ByteBuf读写指针
在ByteBuffer中,读写指针都是position,而在ByteBuf中,读写指针分别为readerIndex和writerIndex,直观看上去ByteBuffer仅用了一个指针就实现了两个指针的功能,节省了变量,但是当对于ByteBuffer的读写状态切换的时候必须要调用flip方法,而当下一次写之前,必须要将Buffer中的内容读完,再调用clear方法。每次读之前调用flip,写之前调用clear,这样无疑给开发带来了繁琐的步骤,而且内容没有读完是不能写的,这样非常不灵活。相比之下我们看看ByteBuf,读的时候仅仅依赖readerIndex指针,写的时候仅仅依赖writerIndex指针,不需每次读写之前调用对应的方法,而且没有必须一次读完的限制。
2.零拷贝
Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。
Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。
3.引用计数与池化技术
在Netty中,每个被申请的Buffer对于Netty来说都可能是很宝贵的资源,因此为了获得对于内存的申请与回收更多的控制权,Netty自己根据引用计数法去实现了内存的管理。Netty对于Buffer的使用都是基于直接内存(DirectBuffer)实现的,大大提高I/O操作的效率,然而DirectBuffer和HeapBuffer相比之下除了I/O操作效率高之外还有一个天生的缺点,即对于DirectBuffer的申请相比HeapBuffer效率更低,因此Netty结合引用计数实现了PooledBuffer,即池化的用法,当引用计数等于0的时候,Netty将Buffer回收至池中,在下一次申请Buffer的某个时刻会被复用。
基于udp通信
bootstrap端启动参数配置:9999 /Users/penny/code/vip/from.txt
ServerBootStrap参数:9999
代码实例
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
/**
 * Polls a file once per second and sends every newly appended line as a
 * {@code LogEvent} over a UDP channel to the configured address.
 */
public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;

    /**
     * @param address destination the encoded events are sent to
     * @param file    file whose new lines are sent
     */
    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }

    /**
     * Binds a local UDP channel and loops until the thread is interrupted,
     * sending every line that appears after the last read position.
     *
     * @throws Exception if binding, reading the file or sending fails
     */
    public void run() throws Exception {
        Channel ch = bootstrap.bind(0).sync().channel(); // port 0: any free local port
        long pointer = 0;
        for (; ; ) {
            long len = file.length();
            if (len < pointer) {
                // the file shrank (e.g. was truncated): continue from its current end
                pointer = len;
            } else if (len > pointer) {
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                try {
                    raf.seek(pointer);
                    String line;
                    while ((line = raf.readLine()) != null) {
                        ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line));
                    }
                    pointer = raf.getFilePointer();
                } finally {
                    // close the file even when reading or sending throws
                    raf.close();
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag (sleep cleared it) and leave the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /** Shuts down the event loop group used by the channel. */
    public void stop() {
        group.shutdownGracefully();
    }

    /**
     * Entry point; expects exactly two arguments: the destination port and the
     * path of the file to watch.
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            throw new IllegalStateException();
        }
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255", Integer.parseInt(args[0])), new File(args[1]));
        try {
            broadcaster.run();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            broadcaster.stop();
        }
    }
}
Encoder
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * Encodes a {@code LogEvent} into a {@link DatagramPacket} addressed to a
 * fixed destination. The payload is the log file name, one
 * {@code LogEvent.SEPARATOR} byte, then the message, both text parts in UTF-8.
 */
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    /**
     * @param romoteAddress destination of every packet this encoder produces
     */
    public LogEventEncoder(InetSocketAddress romoteAddress) {
        this.remoteAddress = romoteAddress;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] nameBytes = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] textBytes = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);

        // name + one separator byte + message
        int capacity = nameBytes.length + textBytes.length + 1;
        ByteBuf payload = ctx.alloc().buffer(capacity);
        payload.writeBytes(nameBytes)
                .writeByte(LogEvent.SEPARATOR)
                .writeBytes(textBytes);

        DatagramPacket packet = new DatagramPacket(payload, remoteAddress);
        out.add(packet);
    }
}
Decoder
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
 * Decodes a {@link DatagramPacket} whose payload is
 * {@code <file name> SEPARATOR <message>} (UTF-8 text) into a {@code LogEvent}.
 */
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {

    /**
     * Splits the payload at the first {@code LogEvent.SEPARATOR} byte and emits
     * one {@code LogEvent} stamped with the sender and the current time.
     *
     * @throws IllegalArgumentException if the payload contains no separator
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        ByteBuf data = datagramPacket.content();
        // Work on the readable region only, without assuming readerIndex is 0.
        int start = data.readerIndex();
        int end = data.writerIndex();
        int idx = data.indexOf(start, end, LogEvent.SEPARATOR);
        if (idx < 0) {
            throw new IllegalArgumentException("datagram payload contains no LogEvent separator");
        }
        // slice(index, length): the second argument is a length, not an end offset.
        String filename = data.slice(start, idx - start).toString(CharsetUtil.UTF_8);
        String logMsg = data.slice(idx + 1, end - idx - 1).toString(CharsetUtil.UTF_8);
        LogEvent event = new LogEvent(datagramPacket.sender(), System.currentTimeMillis(), filename, logMsg);
        out.add(event);
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;
/**
 * Listens on a UDP address and runs every received datagram through
 * {@link LogEventDecoder} and then {@link LogEventHandler}.
 */
public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;

    /**
     * @param address local address the datagram channel will be bound to
     */
    public LogEventMonitor(InetSocketAddress address) {
        this.group = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap();

        ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                // decoder first, then the handler that consumes decoded events
                ch.pipeline()
                        .addLast(new LogEventDecoder())
                        .addLast(new LogEventHandler());
            }
        };

        bootstrap.group(group);
        bootstrap.channel(NioDatagramChannel.class);
        bootstrap.option(ChannelOption.SO_BROADCAST, true);
        bootstrap.handler(initializer);
        bootstrap.localAddress(address);
    }

    /**
     * Binds the channel to the configured local address, waiting
     * uninterruptibly for the bind to complete.
     *
     * @return the bound channel
     */
    public Channel bind() {
        ChannelFuture bound = bootstrap.bind().syncUninterruptibly();
        return bound.channel();
    }

    /** Shuts down the event loop group. */
    public void stop() {
        group.shutdownGracefully();
    }

    /**
     * Entry point; expects exactly one argument: the UDP port to listen on.
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            throw new IllegalArgumentException("usage : log event monitor<port>");
        }
        int port = Integer.parseInt(args[0]);
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(port));
        try {
            Channel channel = monitor.bind();
            System.out.println(" log event monitor running");
            // block until the channel is closed
            channel.closeFuture().syncUninterruptibly();
        } finally {
            monitor.stop();
        }
    }
}
测试
在iTerm2(mac)中执行文件操作
echo "11111111111111111111111111111111111"> from.txt
在上次操作的基础上继续增加内容
echo "111111111111111111111111111111111112222222222222"> from.txt
则LogEventMonitor控制台显示:(上次内容的增量)
1537008098680 [/192.168.51.62:63169] [] : 1111111111111111111111111111111111
1537008144809 [/192.168.51.62:63169] [] : 222222222222
总结:
【参考博客】
1.https://www.jianshu.com/p/0d0eece6d467
2.https://www.cnblogs.com/wjlstation/p/8972895.html
3.https://yq.aliyun.com/articles/290834
网友评论