一、环境
java 版本 1.8.0_181
二、项目结构
项目结构如下图所示
image.png
三、导入必要的依赖
以下是pom.xml的配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>netty_tcp_test</groupId>
<artifactId>netty_tcp_test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<netty.version>4.1.25.Final</netty.version>
<os.detected.classifier>linux-x86_64</os.detected.classifier>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<classifier>${os.detected.classifier}</classifier>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-udt</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-sctp</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</project>
三、创建客户端代码
TcpClient的 main代码
package tcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.omg.PortableServer.POA;
public class TcpClient {
public static String HOST = "127.0.0.1";
public static int PORT = 12340;
public static Bootstrap bootstrap=getBootstrap();
public static Channel channel = getChannel(HOST,PORT);
public static final Bootstrap getBootstrap(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast("frameEncoder",new LengthFieldPrepender(4));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",new TcpClientHandler());
}
});
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
return bootstrap;
}
public static final Channel getChannel(String HOST, int PORT){
Channel channel = null;
try{
channel = bootstrap.connect(HOST,PORT).sync().channel();
System.out.println("连接成功");
}catch (Exception e){
System.out.println("连接server(IP{},PROT{})失败");
return null;
}
return channel;
}
public static void sendMsg(Object msg) throws Exception{
if(channel!=null){
channel.writeAndFlush(msg).sync();
}else{
System.out.println("消息发送失败,连接尚未建立");
}
}
}
处理器TcpClientHandler代码
package tcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TcpClientHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,Object msg) throws Exception{
}
}
四、创建服务端代码
TcpServer服务端的main代码
package tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.omg.CORBA.PUBLIC_MEMBER;
public class TcpServer {
private static final String IP = "192.168.1.154";
private static final int PORT = 12340;
/*处理业务线程的线程组个数*/
protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2;//默认
/*业务出现线程大小*/
protected static final int BIZTHREADSIZE=4;
/*
* NioEventLoopGroup实际上就是个线程池,
* NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,
* 每一个NioEventLoop负责处理m个Channel,
* NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel
*/
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
protected static void shutdown(){
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
public static void run() throws Exception{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast("frameEncoder",new LengthFieldPrepender(4));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new TcpServerHandler());
}
});
//标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
b.option(ChannelOption.SO_BACKLOG,1024);
//是否启用心跳保或机制
b.childOption(ChannelOption.SO_KEEPALIVE,true);
//绑定端口 开启监听
ChannelFuture channelFuture = b.bind(PORT).sync();
if(channelFuture.isSuccess()){
System.out.println(("Tcp服务启动成功---"));
}
//获取channel的closeFuture,并且阻塞当前线程直到它完成
channelFuture.channel().closeFuture().sync();
}
public static void main(String[] args) throws Exception{
TcpServer.run();
}
}
创建TcpServerHandler处理器代码
package tcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,Object msg) throws Exception{
System.out.println("server received:"+msg.toString());
ctx.write(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception{
ctx.close();
}
}
五、单元测试代码
客户端单元测试TcpClientTest
package tcp;
import org.junit.Test;
import static org.junit.Assert.*;
public class TcpClientTest {
@Test
public void sendMsg() {
try{
TcpClient.sendMsg("henhao");
}catch (Exception e){
}
}
}
服务端单元测试 TcpServerTest
package tcp;
import org.junit.Test;
import static org.junit.Assert.*;
public class TcpServerTest {
public static void main(String[] args){
start();
}
@Test
public void testServer(){
start();
}
public static void start(){
try{
TcpServer.main(null);
}catch (Exception e){
}
}
}
网友评论