参考:
https://www.cnblogs.com/damowang/p/6226167.html
https://www.cnblogs.com/java13/p/10920924.html
1. pom
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency>
2. 服务端
2.1 入口
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ChatServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 多线程事件循环器-接收进来的连接
EventLoopGroup workGroup = new NioEventLoopGroup(); // 多线程事件循环器-处理接收的连接
try {
ServerBootstrap bootstrap = new ServerBootstrap(); // 服务端启动类
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class) // 管道类型
.option(ChannelOption.SO_BACKLOG, 1024) // 设置参数
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() { // 初始化管道
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder()); // 流水线增加处理器
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChatServerHandler());
}
});
// 绑定端口启动
Channel channel = bootstrap.bind(8888).syncUninterruptibly().channel();
// 等待结束
channel.closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
注:ChannelPipeline 中必须加入 StringDecoder、StringEncoder 处理器,否则自定义处理器收到的是 ByteBuf 而不是 String,写出的 String 也无法被编码发送,消息无法正常解析。
2.2 自定义处理器
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Server-side chat handler.
 *
 * <p>Keeps every connected client channel in a shared group, announces joins and
 * leaves to the group, and relays each received message to all other clients.
 */
public class ChatServerHandler extends ChannelInboundHandlerAdapter {
    /** All currently connected client channels; shared by every handler instance. */
    private static final ChannelGroup CHANNELS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Called when a connection is established: registers the channel and announces
     * the newcomer to everyone in the group, including the newcomer itself.
     *
     * @param ctx context of the channel that became active
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        String welcome = channel.remoteAddress() + " 进入聊天室";
        CHANNELS.add(channel);
        CHANNELS.writeAndFlush(welcome);
    }

    /**
     * Called when a connection is closed: unregisters the channel and announces the
     * departure to the remaining clients.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        String quit = channel.remoteAddress() + " 退出聊天室";
        CHANNELS.remove(channel);
        CHANNELS.writeAndFlush(quit);
    }

    /**
     * Called when a message arrives from a client: relays it, prefixed with the
     * sender's remote address, to every other client in the group.
     *
     * @param ctx context of the channel the message was read from
     * @param msg the received message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel sender = ctx.channel();
        // Build the outgoing text once; it is the same for every recipient.
        String message = "[" + sender.remoteAddress() + "]: " + msg;
        for (Channel item : CHANNELS) {
            // Skip the sender by channel identity instead of comparing addresses.
            if (item != sender) {
                item.writeAndFlush(message);
            }
        }
    }

    /**
     * Called when an exception is raised in the pipeline: prints the stack trace and
     * closes the connection.
     *
     * @param ctx   context of the affected channel
     * @param cause the exception that was caught
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
3. 客户端
3.1 入口
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.learn.constant.Constant;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Random;
public class ChatClient {
private static final EventLoopGroup clientEventLoopGroup = new NioEventLoopGroup();
public static void main(String[] args) throws Exception {
try {
Bootstrap bootstrap = new Bootstrap(); // 客户端启动类
bootstrap
.group(clientEventLoopGroup)
.channel(NioSocketChannel.class) // 管道类型
.handler(new ChannelInitializer<SocketChannel>() { // 初始化管道
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder()); // 流水线加入处理器
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChatClientHandler());
}
});
// 连接服务器启动
Channel channel = bootstrap.connect("localhost", 8888).syncUninterruptibly().channel();
// 等待输入消息
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
channel.writeAndFlush(reader.readLine());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.2 自定义处理器
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side chat handler: prints whatever the server sends and closes the
 * connection when an error occurs.
 */
public class ChatClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Invoked when the pipeline raises an exception; dumps the stack trace and then
     * closes the channel.
     *
     * @param ctx   context of the affected channel
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Invoked for every message received from the server; the message is cast to
     * {@code String} and written to standard output.
     *
     * @param ctx context of the channel the message was read from
     * @param msg the received message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println((String) msg);
    }
}
4. 测试
IDEA 中点击 Edit Configurations,选择 ChatClient,右上角勾选 Allow parallel run 可同时运行多个应用。
网友评论