美文网首页
netty创建tcp (使用maven构建)

netty创建tcp (使用maven构建)

作者: 奋斗live | 来源:发表于2019-06-13 17:16 被阅读0次
一、环境

java 版本 1.8.0_181

二、项目结构

项目结构如下图所示


image.png
三、导入必要的依赖

以下是pom.xml的配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates. -->
    <groupId>netty_tcp_test</groupId>
    <artifactId>netty_tcp_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Single version shared by every io.netty dependency below. -->
        <netty.version>4.1.25.Final</netty.version>
        <!-- Hard-coded classifier used to select the native epoll artifact below.
             NOTE(review): this pins the build to Linux x86_64; confirm that is intended
             for developers on other platforms. -->
        <os.detected.classifier>linux-x86_64</os.detected.classifier>
    </properties>

    <dependencies>
        <!-- Netty is pulled in module by module (not through the netty-all aggregate artifact). -->
        <!-- Codec module. -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-codec</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <!-- Transport module. -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <!-- Native epoll transport, resolved with the classifier from the property above. -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport-native-epoll</artifactId>
            <version>${netty.version}</version>
            <classifier>${os.detected.classifier}</classifier>
        </dependency>
        <!-- JUnit 4, test scope only. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): presumably none of the remaining io.netty modules (codec-http, handler,
             transport-udt, transport-sctp) are required by a plain TCP client/server; confirm
             against the project sources before keeping them. -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-codec-http</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-handler</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport-udt</artifactId>
            <version>${netty.version}</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport-sctp</artifactId>
            <version>${netty.version}</version>
        </dependency>

    </dependencies>
</project>
三、创建客户端代码

TcpClient 客户端代码(该类没有 main 方法,连接在类加载时通过静态字段建立)

package tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.omg.PortableServer.POA;

/**
 * Minimal Netty TCP client that holds one shared, statically initialised connection.
 *
 * <p>Loading this class builds the {@link Bootstrap} and immediately tries to connect to
 * {@link #HOST}:{@link #PORT}. If that attempt fails, {@link #channel} stays {@code null}
 * and {@link #sendMsg(Object)} only prints a failure message.
 */
public class TcpClient {

    /** Server address used for the connection made at class-initialisation time. */
    public static String HOST = "127.0.0.1";

    /** Server port used for the connection made at class-initialisation time. */
    public static int PORT = 12340;

    // Must stay declared before `channel`: static initialisers run in textual order
    // and getChannel() dereferences this field.
    public static Bootstrap bootstrap=getBootstrap();

    /** The shared connection, or {@code null} when the initial connect attempt failed. */
    public static Channel channel = getChannel(HOST,PORT);

    /**
     * Builds the client bootstrap: NIO transport, the framing/serialization pipeline and
     * {@code SO_KEEPALIVE}.
     *
     * @return a configured bootstrap backed by a newly created {@link NioEventLoopGroup}
     */
    public static final Bootstrap getBootstrap(){
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception{

                ChannelPipeline pipeline = ch.pipeline();
                // 4-byte length field at offset 0, stripped before the frame is passed on.
                // NOTE(review): Integer.MAX_VALUE places no practical cap on the frame size a
                // peer can announce; consider a realistic limit.
                pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
                pipeline.addLast("frameEncoder",new LengthFieldPrepender(4));
                // NOTE(review): ObjectDecoder relies on Java serialization; only use it with
                // trusted peers. The pipeline must mirror the server's pipeline exactly.
                pipeline.addLast(new ObjectEncoder());
                pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                pipeline.addLast("handler",new TcpClientHandler());
            }
        });
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        return bootstrap;
    }

    /**
     * Connects to the given server and blocks until the attempt completes.
     *
     * @param HOST server host name or IP address
     * @param PORT server port
     * @return the connected channel, or {@code null} if the connection could not be established
     */
    public static final Channel getChannel(String HOST, int PORT){
        try{
            Channel connected = bootstrap.connect(HOST,PORT).sync().channel();
            System.out.println("连接成功");
            return connected;
        }catch (Exception e){
            if(e instanceof InterruptedException){
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            // The previous message printed the literal "{}" placeholders and dropped the cause.
            System.out.println(String.format("连接server(IP %s,PORT %d)失败: %s", HOST, PORT, e));
            return null;
        }
    }

    /**
     * Writes {@code msg} to the shared channel and waits for the write to complete.
     * Prints a failure message instead when no connection was established.
     *
     * @param msg object to send; it passes through the pipeline's {@link ObjectEncoder}
     * @throws Exception if the write fails or the waiting thread is interrupted
     */
    public static void sendMsg(Object msg) throws Exception{
        if(channel!=null){
            channel.writeAndFlush(msg).sync();
        }else{
            System.out.println("消息发送失败,连接尚未建立");
        }
    }

}

处理器TcpClientHandler代码

package tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side inbound handler. It currently ignores every message it receives.
 */
public class TcpClientHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * Called for each inbound message; the body is empty, so the message is not processed.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the inbound message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // No client-side processing is implemented.
    }
}

四、创建服务端代码

TcpServer服务端的main代码

package tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.omg.CORBA.PUBLIC_MEMBER;

/**
 * Minimal Netty TCP server: binds {@link #PORT}, installs the framing/serialization pipeline
 * on every accepted connection and blocks until the server channel is closed.
 *
 * <p>The event loop groups are static and are shut down when {@link #run()} returns, so
 * {@code run()} is effectively one-shot per JVM.
 */
public class TcpServer {

    // Not referenced by run(), which binds the port on the wildcard address.
    private static final String IP = "192.168.1.154";
    private static final int PORT = 12340;

    /* Thread count handed to the boss (accepting) event loop group. */
    protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2;

    /* Thread count handed to the worker (I/O) event loop group. */
    protected static final int BIZTHREADSIZE=4;
    /*
     * A NioEventLoopGroup is essentially a thread pool: it starts n NioEventLoops in the
     * background to process channel events, each NioEventLoop serves m channels, and the
     * group hands channels to its NioEventLoops one after another.
     */
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);

    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

    /** Starts a graceful shutdown of both event loop groups. */
    protected static void shutdown(){
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

    /**
     * Binds the server and blocks the calling thread until the server channel closes.
     * Both event loop groups are always shut down on the way out, including when the bind
     * fails, so their threads do not outlive the server.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public static void run() throws Exception{
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                public void initChannel(SocketChannel ch) throws Exception{
                    ChannelPipeline pipeline = ch.pipeline();
                    // 4-byte length field at offset 0, stripped before the frame is passed on.
                    pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
                    pipeline.addLast("frameEncoder",new LengthFieldPrepender(4));
                    // NOTE(review): ObjectDecoder relies on Java serialization; do not expose
                    // this port to untrusted clients.
                    pipeline.addLast(new ObjectEncoder());
                    pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    pipeline.addLast(new TcpServerHandler());
                }
            });
            // Maximum length of the queue that holds fully established connections
            // waiting to be accepted.
            b.option(ChannelOption.SO_BACKLOG,1024);
            // Enable TCP keep-alive on accepted connections.
            b.childOption(ChannelOption.SO_KEEPALIVE,true);
            // Bind the port and start listening; sync() rethrows a bind failure, so reaching
            // the next line means the bind succeeded.
            ChannelFuture channelFuture = b.bind(PORT).sync();
            System.out.println(("Tcp服务启动成功---"));
            // Block the current thread until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        }finally{
            shutdown();
        }
    }

    /**
     * Entry point: starts the server and blocks until it stops.
     *
     * @param args ignored
     * @throws Exception if the server cannot be started or is interrupted
     */
    public static void main(String[] args) throws Exception{
        TcpServer.run();
    }
}

创建TcpServerHandler处理器代码

package tcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * Server-side handler that logs every received message and echoes it back to the sender.
 */
public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * Prints the message and echoes it to the peer.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the inbound message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,Object msg) throws Exception{
        System.out.println("server received:"+msg.toString());
        // write() alone only queues the message; without a flush the echo never reaches the
        // socket, so write and flush in one step.
        // NOTE(review): assumes msg is a plain (non reference-counted) object; a
        // reference-counted message would need retain() before being written here,
        // because SimpleChannelInboundHandler releases it after this method returns.
        ctx.writeAndFlush(msg);
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the error raised while processing the channel
     */
    @Override
    public  void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception{
        // Report the cause before closing, otherwise the connection drops without a trace.
        System.err.println("server connection error, closing channel: "+cause);
        ctx.close();
    }

}

五、单元测试代码

客户端单元测试TcpClientTest

package tcp;

import org.junit.Test;

import static org.junit.Assert.*;

/**
 * Integration test for {@link TcpClient}; it needs a server listening on the address
 * configured in {@code TcpClient}.
 */
public class TcpClientTest {

    /**
     * Sends one message through the shared client channel.
     *
     * <p>Exceptions propagate to JUnit and a missing connection fails the test. Previously
     * both cases were swallowed, so the test passed even when nothing was sent.
     *
     * @throws Exception if the write fails or the thread is interrupted
     */
    @Test
    public void sendMsg() throws Exception {
        assertNotNull("TcpClient is not connected; start TcpServer before running this test",
                TcpClient.channel);
        TcpClient.sendMsg("henhao");
    }
}

服务端单元测试 TcpServerTest

package tcp;

import org.junit.Test;

import static org.junit.Assert.*;

/**
 * Launcher for {@link TcpServer}, usable as a plain {@code main} or as a JUnit test.
 */
public class TcpServerTest {

   /**
    * Starts the server from the command line.
    *
    * @param args ignored
    */
   public static void main(String[] args){
       start();
   }

   /** Starts the server under JUnit; returns only once {@code TcpServer.main} returns. */
   @Test
   public void testServer(){
       start();
   }

   /**
    * Runs {@code TcpServer.main} and surfaces any failure instead of silently discarding it.
    *
    * @throws IllegalStateException if the server fails to start or is interrupted
    */
   public static void start(){
       try{
           // Pass an empty array rather than null so main() always receives a valid argument.
           TcpServer.main(new String[0]);
       }catch (Exception e){
           if(e instanceof InterruptedException){
               // Keep the interrupt visible to the caller.
               Thread.currentThread().interrupt();
           }
           throw new IllegalStateException("TcpServer failed to start or was interrupted", e);
       }

   }
}

相关文章

网友评论

      本文标题:netty创建tcp (使用maven构建)

      本文链接:https://www.haomeiwen.com/subject/imasfctx.html