Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
Netty是一个NIO客户端、服务端框架。允许快速简单的开发网络应用程序。例如:服务端和客户端之间的协议。它最牛逼的地方在于简化了网络编程规范。例如:TCP和UDP的Socket服务。
“快速和容易”并不意味着结果应用程序将遇到可维护性或性能问题。 Netty已经仔细设计了从许多协议,如FTP,SMTP,HTTP和各种二进制和基于文本的遗留协议的实现获得的经验。 因此,Netty成功地找到了一种方法来实现易于开发,性能,稳定性和灵活性的应用程序。
NIO介绍:
Buffer:
Channel:
Selector:
Buffer对象使用demo:
package com.wjb.demo.nio;
import java.nio.IntBuffer;
/**
* Created by wjb on 2018/1/12.
* Buffer对象使用,有很多个Buffer抽象类,实际上是一个数组,对应JAVA的基本数据类型,除了boolean。
* Buffer对象有几个属性 position,limit,capacity
*/
public class BufferDemo {
public static void main(String[] args) {
//分配10个大小的容量,此时buffer是 [pos=0 lim=10 cap=10]
IntBuffer buffer = IntBuffer.allocate(10);
buffer.put(2);
buffer.put(3);
buffer.put(4);
//put了三个元素,此时buffer是 [pos=3 lim=10 cap=10]
System.out.println(buffer);
/**
* get()无参数方法获取position位置元素,此时position为3,无元素,所以返回0,并且position加1。
* 有参数就获取指定下标的元素。
* 此时buffer是 [pos=4 lim=10 cap=10]
*/
System.out.println(buffer.get());
buffer.flip();
/**
*
* flip()是复位的意思,此时buffer是 [pos=0 lim=4 cap=10]
*一般需要读取buffer中的元素时,必须调用flip()方法,否则异常。
* filp()方法后,position变成0,limit变为4(limit可以理解成buffer中实际存了多少元素的容器)
*/
System.out.println(buffer);
for (int i = 0;i<buffer.limit();i++){
System.out.println(buffer.get());
}
// System.out.println(buffer.capacity());
// System.out.println(buffer.limit());
System.out.println(buffer);
System.out.println(buffer.get());
}
/**
* 其它的一些方法使用
*/
public void wrapDemo(){
int[] array = new int[]{1,3,5};
//wrap()方法可以包裹一个数组成来Buffer对象
IntBuffer wrap = IntBuffer.wrap(array);
//只要1和3
IntBuffer.wrap(array,0,2);
IntBuffer buffer = IntBuffer.allocate(10);
//复制一份buffer
IntBuffer buffer2 = buffer.duplicate();
}
}
模拟客户端服务器通信
Client
package com.wjb.demo.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SocketChannel;
/**
* Created by wjb on 2018/1/12.
*/
public class Client {
public static void main(String[] args) {
SocketChannel channel =null;
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
channel = SocketChannel.open();
channel.connect(address);
while (true){
byte[] bytes = new byte[1024];
System.in.read(bytes);
buffer.put(bytes);
buffer.flip();
channel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if (channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Server
package com.wjb.demo.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* Created by wjb on 2018/1/12.
* Selector和Channel这两个概念和关系,Selector里有很多Channel,
* Selector会根据key来轮询通道
*
*/
public class Server implements Runnable {
private Selector selector;
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//打开多路复用器
this.selector = Selector.open();
//打开服务器通道
ServerSocketChannel channel = ServerSocketChannel.open();
//设置为非阻塞模式
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(port));
//把通道注册到Selector上,并监听阻塞事件
channel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("server start,port is :"+port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true){
try {
//Selector开始监听
this.selector.select();
//返回Selector选择的结果集
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()){
//key其实就是通道的key,意思就是选择了一个通道
SelectionKey key = keys.next();
//把选择的这个key从所有的keys中移除,避免重复执行
keys.remove();
//如果key是有效的
if (key.isValid()){
//如果为阻塞状态
if (key.isAcceptable()){
this.accept(key);
}
//如果是可读状态
if(key.isReadable()){
this.read(key);
}
//如果是可写状态
if (key.isWritable()){
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void read(SelectionKey key){
//清空旧数据
this.readBuf.clear();
//获取注册的通道channel
SocketChannel channel = (SocketChannel) key.channel();
try {
int count = channel.read(this.readBuf);
if (count == -1){
key.channel().close();
key.cancel();
return;
}
//如果有数据就读取,别忘了复位
this.readBuf.flip();
//remainint()方法可获取缓冲取可读数据
byte[] bytes = new byte[this.readBuf.remaining()];
//接收缓冲区数据
this.readBuf.get(bytes);
System.out.println(new String(bytes));
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key){
//根据key获取当前通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
try {
SocketChannel channel = ssc.accept();
channel.configureBlocking(false);
channel.register(this.selector,SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8888)).start();
}
}
AIO介绍:
示例demo:
Client
package com.wjb.demo.nio.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
/**
* Created by wjb on 2018/1/12.
*/
public class Client implements Runnable {
private AsynchronousSocketChannel channel;
public Client() throws IOException {
channel = AsynchronousSocketChannel.open();
}
public void connect(){
channel.connect(new InetSocketAddress("127.0.0.1",8765));
}
public void write(String response){
try {
channel.write(ByteBuffer.wrap(response.getBytes())).get();
read();
} catch (Exception e) {
e.printStackTrace();
}
}
public void read(){
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
channel.read(buffer).get();
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println(new String(bytes,"utf-8"));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//真实开发环境一般是放在一个web容器中的,不会像这样一直死循环的。
@Override
public void run() {
while (true){
}
}
public static void main(String[] args)throws Exception {
Client c1 = new Client();
c1.connect();
Client c2 = new Client();
c2.connect();
Client c3 = new Client();
c3.connect();
new Thread(c1).start();
new Thread(c2).start();
new Thread(c3).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
c1.write("aaaaa");
c2.write("bbbbb");
c3.write("ccccc");
}
}
Server
package com.wjb.demo.nio.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by wjb on 2018/1/12.
*/
public class Server {
private ExecutorService executorService;
private AsynchronousChannelGroup group;
public AsynchronousServerSocketChannel channel;
public Server(int port) {
try {
//创建线程池
executorService = Executors.newCachedThreadPool();
//通道组的概念,配合线程池使用
group = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);
//创建服务器通道
channel = AsynchronousServerSocketChannel.open(group);
channel.bind(new InetSocketAddress(port));
System.out.println("server start,port is:"+port);
channel.accept(this,new ServerCompletionHandler());
//一直阻塞,不让服务器停止。实际不应如此,这里只是为了让线程不终止。
Thread.sleep(Integer.MAX_VALUE);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Server(8765);
}
}
ServerCompletionHandler
package com.wjb.demo.nio.aio;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
/**
* Created by wjb on 2018/1/12.
*/
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
/**
* 类似于递归,或者叫拉力,反复调用Server里的accept()方法,保证多个客户端都能连上
* 因为AIO不像NIO是一直轮询的。
*
* @param channel
* @param server
*/
@Override
public void completed(AsynchronousSocketChannel channel, Server server) {
server.channel.accept(server, this);
read(channel);
}
@Override
public void failed(Throwable exc, Server server) {
exc.printStackTrace();
}
public void read(final AsynchronousSocketChannel channel) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String s = new String(attachment.array());
System.out.println("server response data is :" + s);
write(channel, s);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
public void write(AsynchronousSocketChannel channel, String data) {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(data.getBytes());
buffer.flip();
channel.write(buffer).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
netty使用示例:
Client:
package com.wjb.demo.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Created by wjb on 2018/1/12.
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//具体处理交给ClientHandler
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture connect = b.connect("127.0.0.1", 8888);
System.out.println("client connected");
connect.channel().writeAndFlush(Unpooled.copiedBuffer("hello server 1".getBytes()));
Thread.sleep(1000);
connect.channel().writeAndFlush(Unpooled.copiedBuffer("hello server 2".getBytes()));
Thread.sleep(1000);
connect.channel().writeAndFlush(Unpooled.copiedBuffer("hello server 3".getBytes()));
connect.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
ClientHandler:
package com.wjb.demo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
* Created by wjb on 2018/1/12.
*/
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//ByteBuf是个引用计数对象
try {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
System.out.println(new String(bytes,"utf-8"));
} finally {
//释放缓冲,以防ByteBuf溢出
ReferenceCountUtil.release(msg);
}
}
}
Server:
package com.wjb.demo.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Created by wjb on 2018/1/12.
*/
public class Server {
public static void main(String[] args) throws Exception{
//处理客户端连接
NioEventLoopGroup pGroup = new NioEventLoopGroup();
//处理网络读写
NioEventLoopGroup cGroup = new NioEventLoopGroup();
//辅助工具类,用于配置
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(pGroup,cGroup)
//指定NIO模式
.channel(NioServerSocketChannel.class)
//tcp缓冲区大小
.option(ChannelOption.SO_BACKLOG,1024)
//发送缓冲大小
.option(ChannelOption.SO_SNDBUF,32*1024)
//接收缓冲大小
.option(ChannelOption.SO_RCVBUF,32*1024)
//保持连接
.option(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//具体处理交给ServerHandler
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture future = bootstrap.bind(8888).sync();
System.out.println("server connected");
//阻塞等待,类似Thread.sleep(Integer.MAX_VALUE),实际开发不这样
future.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
ServerHandler:
package com.wjb.demo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* Created by wjb on 2018/1/12.
*/
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/**
* 为什么客户端有 ReferenceCountUtil.release(msg)释放,而服务器没有呢。
* 因为服务器端只要有写操作就会自动释放,如果只有读没有写的话也要手动释放。
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//ByteBuf缓冲区引用计数对象,无须像NIO一样flip,因为它内部有两个指针,分别处理读和写。
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
//从缓冲区把数据读到新建的数组里
buf.readBytes(bytes);
System.out.println(new String(bytes,"utf-8"));
String response = "hello client";
//通过工具类把数据写到缓冲区
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
}
TCP拆包粘包:
server:
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//TCP拆包粘包,可以设定特殊字符或都固定字节两种方式
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
//采用固定字节的方式,如何字节不够,必须用空格补全。
// sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
//直接转码成符串形式,如果这里解码成字符串了,那ServerHandler里就不用强制转换ByteBuf再处理了。
//sc.pipeline().addLast(new StringDecoder());
//具体处理交给ServerHandler
sc.pipeline().addLast(new ServerHandler());
}
client:
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//TCP拆包粘包,可以设定特殊字符或都固定字节两种方式
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
//采用固定字节的方式,如何字节不够,必须用空格补全。
// sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
//直接转码成符串形式,如果这里解码成字符串了,那ServerHandler里就不用强制转换ByteBuf再处理了。
//sc.pipeline().addLast(new StringDecoder());
//具体处理交给ClientHandler
sc.pipeline().addLast(new ClientHandler());
}
网友评论