美文网首页
netty踩坑初体验

netty踩坑初体验

作者: idelo | 来源:发表于2018-08-13 09:44 被阅读0次

    基于4.1.10.Final
    目前为止踩坑最多,踩netty之前,要先踩Java NIO,因为netty还是基于Java NIO的api开发的,事件模型什么的需要有基础,这只是一个初步的研究,毕竟只是出于兴趣,比较好的坑在这里。Java NIO 系列教程

    netty in action.png

    netty的基本组件与各组件功能,netty核心就不介绍了,网上各种大牛的源码分析,认真看完基本就通了。本文是记录踩坑,不是教学。思路按着这个思维导图来走,实现一下简单的功能。

    1.字符串传输。

    netty是端对端的传输,最简单的可以使用嵌套字传输,基本功能就是hello word。假定一个场景,有一个服务端一直开着接收字符串,客户端想发送字符串到服务端,怎么做?
    首先明确一点,Netty中的消息传递,都必须以字节的形式,以ByeBuff为载体传递。简单的说,就是你想直接写个字符串过去,不行,收到都是乱码,虽然Netty定义的writer的接口参数是Object的,这就是比较坑的地方了。
    有了这个分析,就有思路了。
    客户端就这么发:

            ByteBuf buffer = Unpooled.copiedBuffer(msg, Charset.forName("UTF-8"));
            ChannelFuture future = ctx.writeAndFlush(buffer);
            future.addListener(f ->  ctx.close());
    

    服务器这么收:

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf m = (ByteBuf) msg;
            String message = m.toString(CharsetUtil.UTF_8);
            System.out.println(message);
            ctx.close();
        }
    

    是的可以直接强转,原理不明,不知道哪里进行了装箱。---------- 遗留问题1

    但是每次都要这么写是不是有点麻烦?不是有定义好的编码与解码器吗,那么就可以先加一下处理,两边都这么加

                       new ChannelInitializer<SocketChannel>(){
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast("decoder", new StringDecoder());
                                ch.pipeline().addLast("encoder", new StringEncoder());
                                ch.pipeline().addLast(new ServerHandler());
                            }
                        }
    

    对的直接放上去,编码与解码不用关心顺序,处理类放在最后就好了。

    客户端:

            ChannelFuture future = ctx.writeAndFlush("hello world");
            future.addListener(f ->  ctx.close());
    

    服务器端

       
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(msg);
            ctx.close();
        }
    

    对,直接就是收到一个String。至此字符串就可以互相传递了,但是还有问题,netty中在传送字符串的长度有限制,超过1024个字节就截断了,导致接收的信息不完整,ok要这么处理一下。

    new ChannelInitializer<SocketChannel>(){
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
         ch.pipeline().addLast("encoder", new LengthFieldPrepender(4, false));
         ch.pipeline().addLast(new ServerHandler());
     }      
    

    自定义长度, 4个字节32位,足够存了。

    2.传递简单对象

    有几种方法可以实现对象的传递,这里用的是protocol buffers编解码器进行对象传递。
    首先要了解什么是protocol buffers,这东西就相当于xsd对于xml,是一个规则文件,通过.proto文件通过官方提供的工具就可以生成java类,他有两个常用方法

    public byte[] toByteArray(){};
    T parseFrom(byte[]){};
    

    他提供了序列化方法,直接把类转化为字节数组,再把数据转为java类,十分方便。netty是天生支持这种序列化方式的
    服务器端:

             @Override
             protected void initChannel(SocketChannel ch) throws Exception {
            /**
            * 采用Base 128 Varints进行编码,在消息头上加上32个整数,来标注数据的长度。
            */
            ch.pipeline().addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder());
            ch.pipeline().addLast("protobufDecoder", new ProtobufDecoder(AddressBookProtos.AddressBook.getDefaultInstance()));
    
            /**
             * 对采用Base 128 Varints进行编码的数据解码
             */
            ch.pipeline().addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
            ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());
            ch.pipeline().addLast(new ServerHandler());
            }
    

    增加已经提供了的解码、编码器,在业务处理的handle中可以这么拿数据。

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            AddressBookProtos.AddressBook addressBookProtos = (AddressBookProtos.AddressBook)msg;
            List<AddressBookProtos.Person> list = addressBookProtos.getPeopleList();
        }
    

    任然是可以直接强转成目标对象。然后获取里面的成员变量。

    客户端:
    在管道里加上那4个编码、解码器。然后在业务代码中这样定义数据并且直接塞到ctx中就可以了。其余的根本不用操心,都封装好了,我们只需要关心自己的业务实现。

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            AddressBookProtos.AddressBook pb = AddressBookProtos.AddressBook.newBuilder()
                    .addPeople(
                            AddressBookProtos.Person.newBuilder().setEmail("345@qq.com").setId(34).setName("zhangsn")
                    )
                    .addPeople(
                            AddressBookProtos.Person.newBuilder().setEmail("123@163.com").setId(12).setName("lisi")
                    )
                    .build();
    
            ChannelFuture future = ctx.writeAndFlush(pb);
            future.addListener(f ->  ctx.close());
        }
    

    3.使用http协议

    Netty对http协议有自己的抽象,把一个FullHttpRequest抽象成了HttpRequest、HttpContent、LastHttpContent。生成一个http request也有点不同。例子演示了,客户端发送http请求,服务端接收并发送http响应到客户,客户端接收响应之后断开连接。
    服务端:

                new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                   /*
                   * https部分
                    File certificate = new File("/Users/public.crt"); // 证书
                    File privateKey = new File("/Users/private.pem"); // 私钥
                    final SslContext context = SslContextBuilder.forServer(certificate, privateKey).build();
                    SSLEngine engine = context.newEngine(ch.alloc());
                    ch.pipeline().addLast(new SslHandler(engine));
                    */
    
    //                            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
    //                            ch.pipeline().addLast("encoder", new HttpResponseEncoder());
                    ch.pipeline().addLast(new HttpServerCodec()); //等于上面那两个
                    ch.pipeline().addLast(new HttpObjectAggregator(512 * 1024)); //聚合把头部和content聚合在了一起
                    ch.pipeline().addLast(new HttpServiceHandle());
                }
            }
    

    如果不使用聚合,那么在接收的时候会多次触发read方法,第一次接收HttpRequest,之后接收HttpContent内容。使用聚合HttpServerCodec之后,接收的参数即有HttpRequest也有HttpContent。

    客户端发送与接收:

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            URI uri = new URI("http://127.0.0.1:8889");
            String msg = "Message from client";
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
                    uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
    
            // 构建http请求
            request.headers().set(HttpHeaders.Names.HOST, "127.0.0.1");
            request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
            // 发送http请求
            ctx.writeAndFlush(request);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof HttpResponse) {
                HttpResponse response = (HttpResponse) msg;
                System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
            }
            if(msg instanceof HttpContent) {
                HttpContent content = (HttpContent)msg;
                ByteBuf buf = content.content();
                System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
                buf.release();
            }
        }
    

    服务端接收:

        private HttpRequest request;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //http 如果请求没有聚合,则分段传输过来
            if (msg instanceof HttpRequest) {
                request = (HttpRequest) msg;
                String uri = request.uri();
                System.out.println("Uri:" + uri);
            }
    
            if (msg instanceof HttpContent) {
                HttpContent content = (HttpContent) msg;
                ByteBuf buf = content.content();
                System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
                buf.release();
    
                String res = "response from server";
                FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
                        OK, Unpooled.wrappedBuffer(res.getBytes("UTF-8")));
                response.headers().set(CONTENT_TYPE, "text/plain");
                response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
                if (HttpHeaders.isKeepAlive(request)) {
                    response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
                }
                ctx.write(response);
                ctx.flush();
            }
        }
    

    4.心跳

    这个东西应该是与HTTP长连接或者是websocket一起的,这里独立出来了。
    服务端:

                        {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
                                ch.pipeline().addLast(new WebSocketServiceHandle());
                            }
                        }
    

    handle,如果10秒内没有触发读,那么就会触发userEventTriggered方法。

        int dealTime = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
            String ip = socketAddress.getHostName() + ":" + socketAddress.getPort();
    
            ByteBuf byteBuf = (ByteBuf)msg;
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            System.out.println(ip + ":" + message);
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (dealTime == 2){
                System.out.println("关喽");
                ctx.channel().close();
            }
            dealTime++;
            String recall = "are you alive?" ;
            ByteBuf buffer = Unpooled.copiedBuffer(recall, Charset.forName("UTF-8"));
            ctx.writeAndFlush(buffer);
            super.userEventTriggered(ctx, evt);
        }
    

    客户端:就是一个简单的应该,连接上之后什么也不干,干等10秒,等待服务端发来询问。

         @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf)msg;
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            System.out.println("message from service: " + message);
    
            String recall = "hello service i am alive";
            ByteBuf buffer = Unpooled.copiedBuffer(recall, Charset.forName("UTF-8"));
            ctx.writeAndFlush(buffer);
        }
    

    文件读写、websocket、最终demo。。咕咕咕

    相关连接:
    [netty]--最通用TCP黏包解决方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender
    Protocol Buffer的基本使用(六)
    Protocol Buffer 语法(syntax)

    相关文章

      网友评论

          本文标题:netty踩坑初体验

          本文链接:https://www.haomeiwen.com/subject/yqurpftx.html