美文网首页
【Netty】netty简单客户端和服务端实现

【Netty】netty简单客户端和服务端实现

作者: 有章 | 来源:发表于2018-10-19 18:49 被阅读0次

    netty的例子

    maven依赖:
    <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.27.final</version>
            </dependency>
            <dependency>
                <groupId>org.jboss.marshalling</groupId>
                <artifactId>jboss-marshalling-serial</artifactId>
                <version>1.3.14.GA</version>
            </dependency>
    

    服务端:

    package com.springboot.vip.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class HelloServer {
        private int port;
        
        public HelloServer(int port) {
            this.port = port;
        }
        
        public void run() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();        // 用来接收进来的连接
            EventLoopGroup workerGroup = new NioEventLoopGroup();    // 用来处理已经被接收的连接
            System.out.println("准备运行端口:" + port);
            
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)            // 这里告诉Channel如何接收新的连接
                .childHandler( new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 自定义处理类
                        ch.pipeline().addLast(new HelloServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
                
                // 绑定端口,开始接收进来的连接
                ChannelFuture f = b.bind(port).sync();
                
                // 等待服务器socket关闭
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
       
        public static void main(String[] args) throws Exception {
            int port = 10110;
            new HelloServer(port).run();
        }
    }
    

    服务端处理器:

    package com.springboot.vip.netty;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
    
    /**
     * 输出接收到的消息
     * @author Coder
     *
     */
    public class HelloServerHandler extends ChannelInboundHandlerAdapter {
        /**
         * 收到数据时调用
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf in = (ByteBuf)msg;
                System.out.print(in.toString(CharsetUtil.UTF_8));
            } finally {
                // 抛弃收到的数据
                ReferenceCountUtil.release(msg);
            }
            
    //        ctx.write(msg);
    //        ctx.flush();
        }
        
        /**
         * 当Netty由于IO错误或者处理器在处理事件时抛出异常时调用
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    验证:

    item2下telnet
    telnet 127.0.0.1 10110
    然后在item2下输入,则会在console下看到对应的输出
    
    

    栗子2:

    netty版本
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>5.0.0.Alpha1</version>
            </dependency>
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class TimeServer {
        public void bind(int port) {
            NioEventLoopGroup pGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup cGroup = new NioEventLoopGroup();
            ServerBootstrap server = new ServerBootstrap();
            server.group(pGroup, cGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
    
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            try {
                ChannelFuture cf = server.bind(port).sync();
                System.out.println("server 连接成功:" + port);
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                pGroup.shutdownGracefully();
                cGroup.shutdownGracefully();
            }
        }
    
    
        public static void main(String[] args) {
        int port=8080;
        TimeServer server=new TimeServer();
        server.bind(port);
    
        }
    }
    

    server handler:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.Date;
    
    public class TimeServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            String body = new String(bytes, "UTF-8");
            System.out.println("The time server(Thread:" + Thread.currentThread() + ") receive order : " + body);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.writeAndFlush(resp);
        }
    }
    

    client:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    
    
    public class TimeClient {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup loopGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            try {
    
                bootstrap.group(loopGroup)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new TimeClientHandler());
                            }
                        });
                ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync();
                cf.channel().closeFuture().sync();
            } finally {
                loopGroup.shutdownGracefully();
            }
        }
    }
    

    client handler:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class TimeClientHandler extends ChannelHandlerAdapter {
    
    
        @Override
        public void channelActive(ChannelHandlerContext context) throws Exception {
            for (int i = 0; i < 5; i++) {
                String order = "QUERY TIME ORDER";
                ByteBuf buf = Unpooled.buffer(order.length());
                buf.writeBytes(order.getBytes());
                context.writeAndFlush(buf);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf=(ByteBuf) msg;
            byte[] bytes=new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            String txt=new String(bytes,"UTF-8");
            System.out.println("Now is:"+txt);
        }
    }
    

    Netty提供的经过扩展的Buffer相对NIO中的有个许多优势,作为数据存取非常重要的一块,我们来看看Netty中的Buffer有什么特点。
    ByteBuf

    1.ByteBuf读写指针
    在ByteBuffer中,读写指针都是position,而在ByteBuf中,读写指针分别为readerIndex和writerIndex,直观看上去ByteBuffer仅用了一个指针就实现了两个指针的功能,节省了变量,但是当对于ByteBuffer的读写状态切换的时候必须要调用flip方法,而当下一次写之前,必须要将Buffe中的内容读完,再调用clear方法。每次读之前调用flip,写之前调用clear,这样无疑给开发带来了繁琐的步骤,而且内容没有读完是不能写的,这样非常不灵活。相比之下我们看看ByteBuf,读的时候仅仅依赖readerIndex指针,写的时候仅仅依赖writerIndex指针,不需每次读写之前调用对应的方法,而且没有必须一次读完的限制。
    2.零拷贝
    Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
    Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。
    Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。
    3.引用计数与池化技术
    在Netty中,每个被申请的Buffer对于Netty来说都可能是很宝贵的资源,因此为了获得对于内存的申请与回收更多的控制权,Netty自己根据引用计数法去实现了内存的管理。Netty对于Buffer的使用都是基于直接内存(DirectBuffer)实现的,大大提高I/O操作的效率,然而DirectBuffer和HeapBuffer相比之下除了I/O操作效率高之外还有一个天生的缺点,即对于DirectBuffer的申请相比HeapBuffer效率更低,因此Netty结合引用计数实现了PolledBuffer,即池化的用法,当引用计数等于0的时候,Netty将Buffer回收致池中,在下一次申请Buffer的没某个时刻会被复用
    

    基于udp通信
    bootstrap端启动参数配置:9999 /Users/penny/code/vip/from.txt
    ServerBootStrap参数:9999

    代码实例
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioDatagramChannel;
    
    import java.io.File;
    import java.io.RandomAccessFile;
    import java.net.InetSocketAddress;
    
    public class LogEventBroadcaster {
        private final EventLoopGroup group;
        private final Bootstrap bootstrap;
        private final File file;
    
        public LogEventBroadcaster(InetSocketAddress address, File file) {
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new LogEventEncoder(address));
            this.file = file;
        }
    
        public void run() throws Exception {
            Channel ch = bootstrap.bind(0).sync().channel();
            long pointer = 0;
            for (; ; ) {
                long len = file.length();
                if (len < pointer) {
                    pointer = len;
                } else if (len > pointer) {
                    RandomAccessFile raf = new RandomAccessFile(file, "r");
                    raf.seek(pointer);
                    String line;
                    while ((line = raf.readLine()) != null) {
                        ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line));
                    }
                    pointer = raf.getFilePointer();
                    raf.close();
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    break;
                }
            }
    
    
        }
    
        public void stop() {
            group.shutdownGracefully();
        }
    
        public static void main(String[] args) {
            if (args.length != 2) {
                throw new IllegalStateException();
            }
            LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255", Integer.parseInt(args[0])), new File(args[1]));
            try {
                broadcaster.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                broadcaster.stop();
            }
        }
    }
    

    Encoder

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import io.netty.util.CharsetUtil;
    
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
        private final InetSocketAddress romoteAddress;
    
        public LogEventEncoder(InetSocketAddress romoteAddress) {
            this.romoteAddress = romoteAddress;
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
            byte[] file =logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
            byte[] msg=logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
            ByteBuf buf=ctx.alloc().buffer(file.length+msg.length+1);
            buf.writeBytes(file);
            buf.writeByte(LogEvent.SEPARATOR);
            buf.writeBytes(msg);
            out.add(new DatagramPacket(buf,romoteAddress));
    
        }
    }
    

    Decoder

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.util.CharsetUtil;
    
    import java.util.List;
    
    public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
            ByteBuf data = datagramPacket.content();
            int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
            String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
            String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
            LogEvent event = new LogEvent(datagramPacket.sender(), System.currentTimeMillis(), filename, logMsg);
            out.add(event);
        }
    }
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioDatagramChannel;
    
    import java.net.InetSocketAddress;
    
    
    public class LogEventMonitor {
        private final EventLoopGroup group;
        private final Bootstrap bootstrap;
        public LogEventMonitor(InetSocketAddress address){
            group=new NioEventLoopGroup();
            bootstrap=new Bootstrap();
            bootstrap.group(group).channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST,true)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ChannelPipeline pipeline=ch.pipeline();
                            pipeline.addLast(new LogEventDecoder());
                            pipeline.addLast(new LogEventHandler());
                        }
                    }).localAddress(address);
        }
        public Channel bind(){
            return bootstrap.bind().syncUninterruptibly().channel();
        }
        public void stop(){
            group.shutdownGracefully();
        }
    
        public static void main(String[] args) {
            if (args.length!=1){
                throw new IllegalArgumentException("usage : log event monitor<port>");
            }
            LogEventMonitor monitor=new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
            try {
                Channel channel=monitor.bind();
                System.out.println(" log event monitor running");
                channel.closeFuture().syncUninterruptibly();
            }finally {
                monitor.stop();
            }
        }
    }
    

    测试

    item2(mac)中执行文件操作
    echo "11111111111111111111111111111111111"> from.txt
    在上次操作的基础上继续增加内容
    echo "111111111111111111111111111111111112222222222222"> from.txt
    则LogEventMonitor控制台显示:(上次内容的增量)
    1537008098680 [/192.168.51.62:63169] [] : 1111111111111111111111111111111111
    1537008144809 [/192.168.51.62:63169] [] : 222222222222
    

    总结:

    【参考博客】
    1.https://www.jianshu.com/p/0d0eece6d467
    2.https://www.cnblogs.com/wjlstation/p/8972895.html
    3.https://yq.aliyun.com/articles/290834

    相关文章

      网友评论

          本文标题:【Netty】netty简单客户端和服务端实现

          本文链接:https://www.haomeiwen.com/subject/nmmdgftx.html