ChatServer
package com.ctgu.netty.chat;
import com.ctgu.netty.basic.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ChatServer {
private int port;
public ChatServer(int port) {
this.port = port;
}
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new ChatServerHandler());
}
});
System.out.println("-------Server is ready--------");
ChannelFuture cf = b.bind(port).sync();
System.out.println("-------Server is starting--------");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatServer(9999).run();
}
}
ChatServerHandler
package com.ctgu.netty.chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.ArrayList;
import java.util.List;
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
public static List<Channel> channels = new ArrayList<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel childChannel = ctx.channel();
channels.add(childChannel);
System.out.println("[Server]:"
+ childChannel.remoteAddress().toString().substring(1) + ",上线了");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel childChannel = ctx.channel();
channels.remove(childChannel);
System.out.println("[Server]:"
+ childChannel.remoteAddress().toString().substring(1) + ",下线了");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
Channel childChannel = ctx.channel();
String name = childChannel.remoteAddress().toString().substring(1);
channels.stream()
.filter(e -> e != childChannel)
.forEach(e -> e.writeAndFlush("[" + name + "]说:" + s));
System.out.println("[" + name + "]说:" + s);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel childChannel = ctx.channel();
String name = childChannel.remoteAddress().toString().substring(1);
System.out.println("[Server]:" + name + "" + ",异常断线了");
}
}
ChatClient
package com.ctgu.netty.chat;
import com.ctgu.netty.basic.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ChatClient {
private String host;
private int port;
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new ChatClientHandler());
}
});
System.out.println("---------- Client is ready --------");
ChannelFuture cf = b.connect(host, port);
System.out.println("---------- Client is Starting --------");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatClient("127.0.0.1", 9999).run();
}
}
ChatClientHandler
package com.ctgu.netty.chat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Scanner;
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client:" + ctx);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
String msg;
while ((msg = scanner.nextLine()) != null) {
ctx.writeAndFlush(msg);
}
}).start();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("服务器异常关闭");
System.exit(0);
}
}
网友评论