-
导入netty的library包
image.png
下载所有的netty包
image.pngio.netty:netty-all
选择4.1.20的版本:io.netty:netty-all:4.1.20.Final
image.png
-
Netty demo1
package cn.netty.project1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
/*
bossGroup和workerGroup含有的子线程(NioEventLoop)个数默认等于cp核数 * 2
*/
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try{
// 创建服务器的启动对象,配置参数
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 注意:可以通过一个集合管理SocketChannel,在推送消息时,可以将业务加入到各个channel对应的NIOEventLoop的taskQueue或者scheduleTaskQueue
System.out.println("客户端SocketChannel hashCode=" + socketChannel.hashCode());
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("...服务器is ready...");
//绑定一个端口并且同步生成了一个ChannelFuture对象
// 启动服务器并启动监听
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package cn.netty.project1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.nio.ByteBuffer;
/**
说明:
自定义一个Handler需要继承netty规定好的某个HandlerAdapter适配器
这时候我们自定义的Handler才能称之为handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件:可以读取客户端发送的消息
* @param ctx : 上下文对象,含有管道pipeline,通道channel,地址
* @param msg: 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx=" + ctx);
/*
将客户端发送的数据msg转成byteBuffer
*/
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送的消息=" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址=" + ctx.channel().remoteAddress()); // 拿到客户端地址
}
/**
* 读取数据完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将数据写入缓存并刷新
// 对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hell 客户端", CharsetUtil.UTF_8));
}
/**
* 处理异常,关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
package cn.netty.project1;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try{
// 创建客户端启动对象
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(nioEventLoopGroup) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端is ok...");
//启动客户端连接服务端
// 关于channel要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
// 给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
nioEventLoopGroup.shutdownGracefully();
}
}
}
package cn.netty.project1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
说明:
1.Inbound代表入栈
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter{ //SimpleChannelInboundHandler<HttpContent>
/**
* 通道准备就绪就会调用该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));
}
/*
当通道有读取事件时就会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服务器地址:" + ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务器端输出:
...服务器is ready...
server ctx=ChannelHandlerContext(NettyServerHandler#0, [id: 0x9bb8b3b5, L:/127.0.0.1:6668 - R:/127.0.0.1:65274])
客户端发送的消息=hello server
客户端地址=/127.0.0.1:65274
客户端输出:
客户端is ok...
服务器回复的消息:hell 客户端
服务器地址:/127.0.0.1:6668
-
TaskQueue自定义任务
-
TaskQueue自定义任务
任务队列中的task有3种经典使用场景
1.用户程序自定义的普通任务
image.png
2.用户自定义定时任务
3.非当前Reactor线程调用Channel的各种方法
例如在推送系统
的业务线程里面,根据用户的标识
找到对应的Channel引用
,然后调用Write类方法向该用户推送消息,就会进入到这种场景,最终的write会提交到任务队列中后被异步消费
demo:自定义普通任务
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 模拟耗时操作(异步执行->提交到该channel对应的NIOEventLoop的taskQueue中)
Thread.sleep(10000); // 睡眠10秒
ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!", CharsetUtil.UTF_8));
System.out.println("go on...");
}
服务器端输出:
服务器is ready...
go on...
客户端输出:
客户端is ok...
服务器回复的消息:first hello java!second hello 客户端
服务器地址:/127.0.0.1:6668
把耗时操作放入taskQueue
会先输出“second hello 客户端”, 再输出“first hello java!”
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000); // 睡眠10秒
ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!", CharsetUtil.UTF_8));
} catch (Exception e){
System.out.println("捕获异常=" + e.getMessage());
}
}
});
}
客户端输出:
客户端is ok...
服务器回复的消息:second hello 客户端
服务器地址:/127.0.0.1:6668
服务器回复的消息:first hello java!
服务器地址:/127.0.0.1:6668
2.自定义scheduleTask
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 用户自定义定时任务(提交到scheduleTaskQueue)
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000); // 睡眠5秒
ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
} catch (Exception e){
System.out.println("捕获异常=" + e.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000); // 睡眠5秒
ctx.writeAndFlush(Unpooled.copiedBuffer("third hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
} catch (Exception e){
System.out.println("捕获异常=" + e.getMessage());
}
}
}, 10, TimeUnit.SECONDS);
System.out.println("go on..."); // 在这一行打断点
}
ctx->pipeline->channel->eventLoop->scheduledTaskQueue
image.png
客户端输出:
客户端is ok...
服务器回复的消息:second hello 客户端当前时间=23:30:26
服务器地址:/127.0.0.1:6668
服务器回复的消息:first hello java!当前时间=23:30:36
服务器地址:/127.0.0.1:6668
服务器回复的消息:third hello java!当前时间=23:30:46
服务器地址:/127.0.0.1:6668
方案再说明:
- 1.Netty抽象出两组线程池,BossGroup专门负责接受客户端连接,WorkerGroup专门负责网络读写操作
- 2.NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道
- 3.NioEventLoop内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责
- 4.NioEventLoopGroup下包含多个NioEventLoop
每个NioEventLoop中包含一个Selector,一个taskQueue
每个NioEventLoop的Selector上可以注册监听多个NioChannel
- 5.每个NioChannel只会绑定在唯一的NioEventLoop上
- 6.每个NioChannel都绑定有一个自己的ChannelPipeLine
群聊系统
编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通信(非阻塞)
实现多人群聊
服务器端:监测用户上线、离线,并实现消息转发功能
客户端:通过channel可以无阻塞发送消息给其他所有用户,同时可以接受其他用户发送的消息(由服务器转发得到)
image.png
//GroupChatServer
package cn.netty.groupchat2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port;
public GroupChatServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7011).run();
}
public void run() throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//监听关闭
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// GroupChatServerHandler
package cn.netty.groupchat2;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//使用一个hashmap 管理
//public static Map<String, Channel> channels = new HashMap<String,Channel>();
//定义一个channle 组,管理所有的channel
//GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
//获取到当前channel
Channel channel = channelHandlerContext.channel();
//这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
channelGroup.forEach((ch)->{
if (channel != ch){ //不是当前的channel,转发消息
ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
} else { //回显自己发送的消息给自己
ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
}
});
}
/**
* handlerAdded 表示连接建立,一旦连接,第一个被执行,将当前channel 加入到 channelGroup
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
/**
* 表示channel 处于不活动状态, 提示 xx离线了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
System.out.println("channelGroup size" + channelGroup.size());
}
/**
* 表示channel 处于活动状态, 提示 xx上线
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了~");
}
/**
* 断开连接, 将xx客户离开信息推送给当前在线的客户
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channel.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
System.out.println("channelGroup size" + channelGroup.size());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
//GroupChatClient
package cn.netty.groupchat2;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
private int port ;
private String host ;
public GroupChatClient(int port, String host) {
this.port = port;
this.host = host;
}
public void run() throws Exception{
NioEventLoopGroup group = new NioEventLoopGroup();
try (Scanner scanner = new Scanner(System.in)){
Bootstrap bootstrap = new Bootstrap().group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress()+ "--------");
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
new GroupChatClient( 7011, "127.0.0.1").run();
}
}
// GroupChatClientHandler
package cn.netty.groupchat2;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg.trim());
}
}
网友评论