美文网首页
《netty in action》读书笔记 PART1

《netty in action》读书笔记 PART1

作者: 晨钟初磬 | 来源:发表于2018-11-15 00:34 被阅读0次

    6. ChannelHandler and ChannelPipeline

    6.1 The ChannelHandler family

    6.1.1 Channel的生命周期

    ChannelUnregistered

    已创建,但是还没有被注册到EventLoop上。

    ChannelRegistered

    已创建,并且已经注册到EventLoop。

    ChannelActive

    连接上远程主机。

    ChannelActive

    没有连接到远程主机。

    Channel状态的变化会触发相应的事件。

    6.1.2 ChannelHandler的生命周期

    handlerAdd

    添加handler

    handlerRemove

    删除handler

    exceptionCaught

    发生异常

    ChannelHandler有两个重要的子接口:ChannelInboundHandlerChannelOutboundHandler

    6.1.3 ChannelInboundHandler接口

    接受到数据或者Channel的状态发生改变会调用ChannelInboundHandler中的方法。注意,当ChannelInboundHandler中的channelRead()方法被overwrite,需要对ByteBuf实例持有的资源进行显示释放。

    public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ReferenceCountUtil.release(msg);
    }
    }
    

    可以使用SimpleChannelInboundHandler,它会自动释放资源,无需人工干预:

    @Sharable
    public class SimpleDiscardHandler
    extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
    Object msg) {
    // No need to do anything special
    }
    }
    

    6.1.4 ChannelOutboundHandler接口

    它一个比较强大的功能是延迟执行。

    CHANNELPROMISE VS. CHANNELFUTURE

    CHANNELPROMISE是CHANNELFUTURE的子接口,CHANNELFUTURE是不可写的,CHANNELPROMISE是可写的(例如setSuccess(),setFailure()方法)

    6.1.5 ChannelHandler adapters

    关系图

    6.1.6 资源管理

    要注意ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()要释放相应的资源,否则会产生内存泄漏。netty使用引用计数法来管理内存资源。可以使用netty提供的ResourceLeakDetector来发现潜在的内存泄漏问题。

    java -Dio.netty.leakDetectionLevel=ADVANCED
    

    leakDetectionLevel可以为DISABLED、SIMPLE(默认)、ADVANCED和PARANOID。

    6.2 ChannelPipeline接口

    ChannelPipeline可以看成由ChannelHandler组成的链表,I/O事件会在ChannelPipeline上传播。每个新Channel会绑定一个新ChannelPipeline,两者是一对一关系。


    pipeline中的事件传播

    事件传播的时候,会判断ChannelHandler的类型(implements Inbound还是OutBound的接口)和事件传播的方向是否一致,不一致跳过。

    6.2.1 ChannelPipeline修改

    ChannelPipeline中ChannelHandler可以动态地被添加、删除或者替换。


    ChannelPipeline中操作ChannelHandler

    6.2.2 Firing events

    会调用ChannelPipeline中下一个ChannelHandler里的方法。


    代码示例:

    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class HttpServer {
        
        public static void main(String[] args) throws InterruptedException {
            
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            
            
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup,workGroup)
                         .channel(NioServerSocketChannel.class)
                         .childHandler(new ChannelInitializer<Channel>() {
        
                            @Override
                            protected void initChannel(Channel ch) throws Exception {
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new MyHandler());
                                ch.pipeline().addLast(new MyHandler2());
                            }
                             
                        });
                ChannelFuture future = bootstrap.bind(8080).sync();
                future.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    
    }
    
    class MyHandler extends SimpleChannelInboundHandler<String>{
    
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("in MyHandler1 , messageReceived invoked");
            for(int i = 0;i < 10 ; i++) {
                ctx.fireChannelInactive();//调用fireChannelInactive 10次
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("in MyHandler1 , channelInactive invoked");
        }
    }
    
    class MyHandler2 extends SimpleChannelInboundHandler<String>{
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("in MyHandler2 ,messageReceived invoked");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("in MyHandler2 , channelInactive invoked");
        }
    }
    

    输出:

    控制台输出

    6.3 ChannelHandlerContext接口

    ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的联系,无论何时,添加一个ChannelHandler到ChannelPipeline就会创建一个ChannelHandlerContext。ChannelHandlerContext的主要功能是和所在ChannelPipeline的其他ChannelHandler交互。

    ChannelHandlerContext有很多方法,大部分方法在Channel和ChannelPipeline里都出现过,但是这里有一个非常大的区别,调用Channel和ChannelPipeline里的方法,会在整个pipeline里传播(从头到尾),而ChannelHandlerContext里同名的方法,是从当前ChannelHandler开始传播。

    6.3.1 Using ChannelHandlerContext

    概念关系图

    6.3.2 ChannelHandler和ChannelHandlerContext的高级用法。

    1. ChannelHandlerContext的pipeline()方法可以获取ChannelPipeline的引用,这样我们可以通过这个引用操作ChannelHandler,实现动态协议
    2. 可以把ChannelHandlerContext的引用缓存起来,在ChannelHandler方法外面用,甚至在一个不同的线程里使用。下面提供了一个示例。
    引用缓存实例
    1. 可以将一个ChannelHandler实例可能会被添加到不同的ChannelPipeline里,但是需要使用@Sharable注解,此外还需注意的是,这个Sharable的ChannelHandler需要是线程安全的。

    为什么需要@Sharable的ChannelHandler,一个需求就是通过这个@Sharable来统计多个Channel的数据。

    6.4 异常处理

    6.4.1 Inbound异常处理

    Inbound异常处理

    由于exception默认会从触发异常的ChannelHandler继续向后流动,所以图中的这种处理逻辑,我们一般放在最后ChannelPipeline的末尾。这样就可以确保,无论是哪个ChannelHandler触发异常,都能够被捕获并处理。如果不对异常做捕获处理操作,netty会打印异常未被捕获的日志。

    6.4.2 outbound异常处理

    进行outbound操作,要想知道结果(正常完成还是发生异常),需要这样做:

    1. 每个outbound操作都会返回一个ChannelFuture。添加到ChannelFuture上的监听器会收到成功或者错误通知。

    2. ChannelOutboundHandler中的方法绝大多数都会ChannelPromise类型的参数。ChannelPromise也可以添加监听来接受异步通知。ChannelPromise是可写的,可以通过它的setSucess()方法或者setFailure(Throwable cause)立即发布通知。

    如果ChannelOutboundHandler自己抛出异常,netty会通知添加到ChannelPromise上的监听器。

    7. EventLoop and threading model

    7.1 Threading model overview

    JDK早期版本多线程编程的方式是create新线程再start。JDK5推出了Executor API,它的线程池技术通过缓存和重用大大提高了性能。

    1. 有任务(Runnable实现)的时候,从线程池里挑选出一个空闲线程,把任务submit给它。
    2. 任务执行完毕了,线程变成空闲,回到线程池,等待下一次挑选使用。
    线程池技术

    线程池不能解决上下文切换开销的问题,上下文的开销在heavy load下会很大。

    7.2 EventLoop接口

    EventLoop是一个用来处理事件的任务,基本思想如下图所示:

    image.png

    EventLoop接口的API分为两类:concurrent和networking。

    1. concurrent
      基于java.util.concurrent包,提供thread executors
    2. networking
      io.netty.channel继承了EventLoop接口,提供了和Channel事件交互的能力。

    7.2.1 Netty 4中I/O事件的处理

    7.3.1 JDK 任务调度API

    JDK5之前,任务调度只能用java.util.Timer,Timer就是一个后台线程,有很多限制:

    1. 如果执行多个定时任务,一个任务发生异常没有捕获,整个Timer线程会挂掉(其他所有任务都会down掉)
    2. 假如某个任务的执行时间过长,超过一些任务的间隔时间,会导致这些任务执行推迟。

    JDK后续推出了java.util.concurrent,其中定义的ScheduleExecutorService克服了这些缺陷。

    ScheduledExecutorService executor =Executors.newScheduledThreadPool(10);
    ScheduledFuture<?> future = executor.schedule(
      new Runnable() {
      @Override
      public void run() {
      System.out.println("60 seconds later");
    }
    }, 60, TimeUnit.SECONDS);
    //to do
    executor.shutdown();
    

    尽管ScheduledExecutorSevice挺好用的,但是在负载大的时候有较大的性能耗费,netty进行了优化。

    7.3.2 使用EventLoop进行任务调度

    ScheduledExecutorService也有一些限制,例如会创建额外创建一些线程来管理线程池,这在任务调度非常激烈的情况下,会成为性能的瓶颈。netty没有直接使用ScheduledExecutorService,使用了继承于ScheduledExecutorService,自己实现的EventLoop

    Channel ch = ...
    ScheduledFuture<?> future = ch.eventLoop().schedule(
      new Runnable() {
      @Override
      public void run() {
        System.out.println("60 seconds later");
      }
    }, 60, TimeUnit.SECONDS);
    

    重复定时执行:

    Channel ch = ...
    ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
      new Runnable() {
      @Override
      public void run() {
        System.out.println("Run every 60 seconds");
      }
    }, 60, 60, TimeUnit.Seconds);
    

    7.4 实现细节

    7.4.1 线程管理

    netty线程模型的优越之处是在于它会确定当前执行线程的身份,再进行相应操作。如果当前执行线程被绑定到当前的ChannelEventLoop,会被直接执行,否则会被放到EventLoop的队列里,每个EventLoop有自己单独的队列。

    EventLoop 执行逻辑

    Never put a long-running task in the execution queue, because it will block any other task from executing on the same thread.” If you must make blocking calls or execute long-running tasks, we advise the use of a dedicated EventExecutor.

    7.4.2 EventLoop/Thread分配

    EventLoopGroup包含了EventLoopsChannelsEventLoops创建方式取决于使用哪种I/O.

    异步I/O

    异步I/O仅仅使用少量的EventLoops,这些EventLoops被很多的Channels共享,这样就可以用最少的线程接受很多的Channels,而不是一个线程一个Channel

    阻塞I/O

    共同点:每个Channel的I/O事件只会被一个线程处理。

    8. Bootstrapping

    bootstrapping an application is the process of configuring it to run

    8.1 Bootstrap classes

    Namely, a server devotes a parent channel to accepting connections from clients and
    creating child channels for conversing with them, whereas a client will most likely
    require only a single, non-parent channel for all network interactions. (As we’ll see, this
    applies also to connectionless transports such as UDP , because they don’t require a
    channel for each connection.)

    server需要一个parent channel来接受客户端连接,需要创建多个child channels来应答客户端。

    client只需要一个单独的channel,不需要parent channel。

    服务端处理使用ServerBootstrap,客户端使用Bootstrap

    Why are the bootstrap classes Cloneable?
    You’ll sometimes need to create multiple channels that have similar or identical settings. To support this pattern without requiring a new bootstrap instance to be created and configured for each channel, AbstractBootstrap has been marked Cloneable . Calling clone() on an already configured bootstrap will return another bootstrap instance that’s immediately usable. Note that this creates only a shallow copy of the bootstrap’s EventLoopGroup , so the latter will be shared among all of the cloned channels. This is acceptable, as the cloned channels are often short-lived, a typical case being a channel created to make an HTTP request.

    8.2 Bootstrapping clients and connectionless protocols

    Bootstrap主要用来给客户端和使用面向无连接的应用创建Channels

    Bootstraping a client:

    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .handler(new SimpleChannelInboundHandler<ByteBuf>() {
        @Override
        protected void channeRead0(
        ChannelHandlerContext channelHandlerContext,
        ByteBuf byteBuf) throws Exception {
            System.out.println("Received data");
        }
    } );
    ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture)throws Exception {
            if (channelFuture.isSuccess()) {
                System.out.println("Connection established");
            } else {
                System.err.println("Connection attempt failed");
                channelFuture.cause().printStackTrace();
            }
        }
    } );
    

    8.2.2 Channel和EventLoopGroup的兼容性

    you can’t mix components having different
    prefixes, such as NioEventLoopGroup and OioSocketChannel . The following listing
    shows an attempt to do just that.

    ChannelEventLoopGroup的前缀要一样。否则会抛出IllegalStateException

    8.3 Bootstraping servers

    ServerBootstrap类

    A ServerBootstrap creating a ServerChannel on bind() , and the ServerChannel managing a number of child Channels.

    相比 Bootstrap类,增加了childHandler(),childAttr(),childOption()方法。ServerChannel来创建许许多多的子Channel,代表接受的连接。ServerBootstrap提供了这些方法来简化对子Channel的配置。

    NioEventLoopGroup group = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group)
    .channel(NioServerSocketChannel.class)
    .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throw Exception {
        System.out.println("Received data");
    }
    } );
    
    ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                System.out.println("Server bound");
            } else {
                System.err.println("Bound attempt failed");
                channelFuture.cause().printStackTrace();
            }
        }
    } );
    

    8.4 Bootstrapping clients from a Channel

    Suppose your server is processing a client request that requires it to act as a client to
    a third system. This can happen when an application, such as a proxy server, has to
    integrate with an organization’s existing systems, such as web services or databases. In
    such cases you’ll need to bootstrap a client Channel from a ServerChannel

    作为服务端接受连接,同时又作为客户端,请求远程服务器(类似于proxy),最容易想到的办法是再创建一个客户端的Bootstrap,但是这样需要另外一个EventLoop来处理客户端角色的Channel,发生在服务端Channel和客户端Channel之间数据交换引起的上文切换也会带来额外的性能损耗。

    最好的办法是创建的客户端Channel和服务端Channel共享同一个EventLoop:

        ServerBootstrap bootstrap = new ServerBootstrap();
    //Sets the EventLoopGroups that provide EventLoops for processing Channel events
            bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
                    .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                        ChannelFuture connectFuture;
    
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                      //Creates a Bootstrap to connect to remote host
                            Bootstrap bootstrap = new Bootstrap();
                            bootstrap.channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
                                    System.out.println("Received data");
                                }
                            });
    //Uses the same EventLoop as the one assigned to the accepted channel
                            bootstrap.group(ctx.channel().eventLoop());
                            connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
                        }
    
                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
                                throws Exception {
                            if (connectFuture.isDone()) {
    // do something with the data
    //When the connection is complete performs some data operation (such as proxying)   
                            }
                        }
                    });
            ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("Server bound");
                    } else {
                        System.err.println("Bind attempt failed");
                        channelFuture.cause().printStackTrace();
                    }
                }
            });
    

    8.5 Adding multiple ChannelHandlers during a bootstrap

    bootstrap的时候,如何添加多个ChannelHandler?

    netty提供了ChannelInboundHandlerAdapter的特殊子类ChannelInitializer:

    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
    

    ChannelInitializer提供了initChannel()可以轻松添加ChannelHandlersChannelPipeline

    protected abstract void initChannel(C ch) throws Exception;
    

    一旦Channel注册到EventLoop,我们实现的initChannel()就会被调用。当initChannel()返回的时候,ChannelInitializer实例会把自己从ChannelPipeline中删除。

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializerImpl());
    
            ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
            future.sync();
    

    对应ChannelInitializerImpl的实现:

    final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpClientCodec());
            pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
        }
    }
    

    8.6 Using Netty ChannelOptions and attributes

    不需要我们手工配置每个Channel,netty提供了option()方法来把ChannelOptions应用到bootstrapChannelOptions中的配置会自动地应用到所有Channel

    Netty的Channelbootstrap类,提供了AttributeMap抽象集合和AttributeKey<T>泛型类,用来insert和retrieve属性值。使用这些工具,我们可以安全地把任意类型的数据和Channel关联起来。

    Attribute的一个使用场景是,服务端应用需要追踪用户和Channels的关系。可以把用户的ID作为一个属性存到Channel里。这样就可以实现根据ID来路由消息和Channel不活跃自动关闭等功能。

    final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
            .handler(new SimpleChannelInboundHandler<ByteBuf>() {
                @Override
                public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                    Integer idValue = ctx.channel().attr(id).get();
                    // do something with the idValue
                }
    
                @Override
                protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
                        throws Exception {
                    System.out.println("Received data");
                }
            });
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
    bootstrap.attr(id, 123456);
    ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
    future.syncUninterruptibly();
    

    8.7 Bootstrapping DatagramChannels

    之前的bootstrap示例代码都是基于TCP-based的SocketChannelbootstrap也可以配置为无连接协议。

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(new OioEventLoopGroup()).channel(OioDatagramChannel.class)
            .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
                @Override
                public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                    // Do something with the packet
                }
            });
    ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                System.out.println("Channel bound");
            } else {
                System.err.println("Bind attempt failed");
                channelFuture.cause().printStackTrace();
            }
        }
    });
    

    8.8 Shutdown

    Alternatively, you can call Channel.close() explicitly on all active channels before calling EventLoopGroup.shutdownGracefully() . But in all cases, remember to shut down the EventLoopGroup itself.

    EventLoopGroup.shutdownGracefully(),它的返回值是一个future,这也是一个异步操作。

    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
    .channel(NioSocketChannel.class);
    ...
    Future<?> future = group.shutdownGracefully();
    // block until the group has shutdown
    future.syncUninterruptibly();
    

    9 Unit testing

    Netty提供了embedded transport来测试ChannelHandlers,embedded transportEmbeddedChannel (一种特殊的Channel实现) 的特色功能,可以简单地实现在pipeline中传播事件。

    我们可以写入inbound或者outbound数据到EmbeddedChannel,然后检查是否有东西传输到ChannelPipeline的末尾。我们还可以确定消息是否被编解码,是否有ChannelHandler被触发。

    Inbound data会被ChannelInboundHandlers处理,代表着从远程主机读取的数据。

    outbound data会被ChannelOutboundHandlers处理,代表将要发送到远程主机的数据。

    相关API:

    图9.1展示了数据在EmbededChannel的流动情况。我们可以:

    1. 使用writeOutbound(),写入消息到Channel,让消息以outbound方向在pipeline中传递。后续,我们可以使用readOutbound()读取处理过后的数据,判断结果是否与预期一致。

    2. 使用writeInbound(),写入消息到Channel,让消息以inbound方向在pipeline中传递。后续,我们可以使用readInbound()读取处理过后的数据,判断结果是否与预期一致。

    9.2 Testing ChannelHandlers with EmbeddedChannel

    9.2.1 Testing inbound messages

    图9.2 展示了一个简单的ByteToMessageDecoder实现。如果有足够的数据,这个Decoder会产生固定大小的frame。如果没有足够的数据,没有达到这个固定的size值,它会等待接下来的数据,继续判断能否接着产生frame。

    具体代码实现如下:

    public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
        private final int frameLength;
    
        public FixedLengthFrameDecoder(int frameLength) {
            if (frameLength <= 0) {
                throw new IllegalArgumentException(
                        "frameLength must be a positive integer: " + frameLength);
            }
            this.frameLength = frameLength;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                List<Object> out) throws Exception {
            while (in.readableBytes() >= frameLength) {
                ByteBuf buf = in.readBytes(frameLength);
                out.add(buf);
            }
        }
    }
    

    那么如何进行单元测试呢,测试代码如下:

    public class FixedLengthFrameDecoderTest {
        @Test
        public void testFramesDecoded() {
            ByteBuf buf = Unpooled.buffer();
            for (int i = 0; i < 9; i++) {
                buf.writeByte(i);
            }
            ByteBuf input = buf.duplicate();
            EmbeddedChannel channel = new EmbeddedChannel(
                    new FixedLengthFrameDecoder(3));
            // write bytes
            assertTrue(channel.writeInbound(input.retain()));
            assertTrue(channel.finish());
            // read messages
            ByteBuf read = (ByteBuf) channel.readInbound();
            assertEquals(buf.readSlice(3), read);
            read.release();
            read = (ByteBuf) channel.readInbound();
            assertEquals(buf.readSlice(3), read);
            read.release();
            read = (ByteBuf) channel.readInbound();
            assertEquals(buf.readSlice(3), read);
            read.release();
            assertNull(channel.readInbound());
            buf.release();
        }
    
        @Test
        public void testFramesDecoded2() {
            ByteBuf buf = Unpooled.buffer();
            for (int i = 0; i < 9; i++) {
                buf.writeByte(i);
            }
            ByteBuf input = buf.duplicate();
            EmbeddedChannel channel = new EmbeddedChannel(
                    new FixedLengthFrameDecoder(3));
            assertFalse(channel.writeInbound(input.readBytes(2)));
            assertTrue(channel.writeInbound(input.readBytes(7)));
            assertTrue(channel.finish());
            ByteBuf read = (ByteBuf) channel.readInbound();
            assertEquals(buf.readSlice(3), read);
            read.release();
            read = (ByteBuf) channel.readInbound();
            assertEquals(buf.readSlice(3), read);
            read.release();
            read = (ByteBuf) channel.readInbound();
            assertEquals(buf.readSlice(3), read);
            read.release();
            assertNull(channel.readInbound());
            buf.release();
        }
    }
    

    9.2.2 Testing outbound messages

    我们需要测试一个编码器:AbsIntegerEncoder,它是Netty的MessageToMessageEncode的一个实现,功能是将整数取绝对值。

    我们的流程如下:

    1. EmbeddedChannel会将一个四字节负数按照outbound方向写入Channel

    2. 编码器会从到来的ByteBuf读取每个负数,调用Math.abs()获得绝对值。

    3. 编码器将绝对值写入到ChannelHandlerPipe

    编码器代码实现:

    public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext,
                ByteBuf in, List<Object> out) throws Exception {
            while (in.readableBytes() >= 4) {
                int value = Math.abs(in.readInt());
                out.add(value);
            }
        }
    }
    

    怎么测试?请看下文:

    public class AbsIntegerEncoderTest {
        @Test
        public void testEncoded() {
            ByteBuf buf = Unpooled.buffer();
            for (int i = 1; i < 10; i++) {
                buf.writeInt(i * -1);
            }
            EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
            assertTrue(channel.writeOutbound(buf));
            assertTrue(channel.finish());
            // read bytes
            for (int i = 1; i < 10; i++) {
                assertEquals(i, channel.readOutbound());
            }
            assertNull(channel.readOutbound());
        }
    }
    

    9.3 Testing exception handling

    为了测试异常处理,我们有如下的示例。
    为防止资源耗尽,当我们读取到的数据多于某个数值,我们会抛出一个TooLongFrameException


    在图9.4中,最大frame的大小为3字节,当一个frame的字节数大于3,它会被忽略,并且会抛出TooLongFrameException,其他的pipeline里的其他ChannelHandlers要么覆写exceptionCaught()进行捕获处理,要么会忽略这个异常。

    解码器代码:

    public class FrameChunkDecoder extends ByteToMessageDecoder {
        private final int maxFrameSize;
    
        public FrameChunkDecoder(int maxFrameSize) {
            this.maxFrameSize = maxFrameSize;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                List<Object> out) throws Exception {
            int readableBytes = in.readableBytes();
            if (readableBytes > maxFrameSize) {
                // discard the bytes
                in.clear();
                throw new TooLongFrameException();
            }
            ByteBuf buf = in.readBytes(readableBytes);
            out.add(buf);
        }
    }
    

    如何测试,请看:

    public class FrameChunkDecoderTest {
        @Test
        public void testFramesDecoded() {
            ByteBuf buf = Unpooled.buffer();
            for (int i = 0; i < 9; i++) {
                buf.writeByte(i);
            }
            ByteBuf input = buf.duplicate();
            EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
            assertTrue(channel.writeInbound(input.readBytes(2)));
            try {
                channel.writeInbound(input.readBytes(4));
                Assert.fail();
            } catch (TooLongFrameException e) {
                // expected exception
            }
            assertTrue(channel.writeInbound(input.readBytes(3)));
            assertTrue(channel.finish());
            // Read frames
            ByteBuf read = (ByteBuf) channel.readInbound();
            assertEquals(buf.readSlice(2), read);
            read.release();
            read = (ByteBuf) channel.readInbound();
            assertEquals(buf.skipBytes(4).readSlice(3), read);
            read.release();
            buf.release();
        }
    }
    

    10.The codec framework

    encoder,将outbound消息转换成易于传输的方式(大部分是字节流)。
    decoder,将inbound网络字节流转回成应用程序消息格式。

    10.2 Decoders

    两种场景需要使用到Decoders:

    1. 将字节流解码成消息--ByteToMessageDecoderReplayingDecoder
    2. 将一种消息类型解码成另一种类型--MessageToMessageDecoder

    10.2.1 ByteToMessageDecoder抽象类

    功能: 将字节流解码成消息或者另一种字节流。

    使用示例ToIntegerDecoder

    每次从ByteBuf读取四个字节,解码成int,添加到List里。当没有更多的数据添加到List,List里的内容会传递到下一个ChannelInboundHandler

    public class ToIntegerDecoder extends ByteToMessageDecoder {
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() >= 4) {
                out.add(in.readInt());
            }
        }
    }
    

    编解码框架里,消息处理完了,会自动调用ReferenceCountUtil.release(message),资源会自动释放。

    Reference counting in codecs
    As we mentioned in chapters 5 and 6, reference counting requires special attention. In the case of encoders and decoders, the procedure is quite simple: once a mes- sage has been encoded or decoded, it will automatically be released by a call to ReferenceCountUtil.release(message) . If you need to keep a reference for later use you can call ReferenceCountUtil.retain(message) . This increments the reference count, preventing the message from being released.

    10.2.2 ReplayingDecoder抽象类

    public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
    

    ReplayingDecoder继承于ByteToMessageDecoder,特点是我们不再需要调用readableBytes(),省了判断数据是否足够的逻辑。

    注意:

    1. 不是所有的ByteBuf的操作都被支持。如果不支持会抛出UnsupportedOperationException异常。

    2. ReplayingDecoder会比ByteToMessageDecoder稍慢。

    ToIntegerDecoder2:

    public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            out.add(in.readInt());
        }
    }
    

    更多的解码工具可以在io.netty.handler.codec下找到。

    1. io.netty.handler.codec.LineBasedFrameDecoder,通过换行符(\n或者\r\n)来解析消息。

    2. io.netty.handler.codec.http.HttpObjectDecoder,解析HTTP数据。

    10.2.3 MessageToMessageDecoder抽象类

    消息格式互相转换,如把一种类型的POJO转换成另外一种。

    public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
    

    API差不多

    示例:IntegerToStringDecoder

    public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
        @Override
        public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
            out.add(String.valueOf(msg));
        }
    }
    

    一个更贴切详细的例子是io.netty.handler.codec.http.HttpObjectAggregator

    TooLongFrameException防止资源耗尽:

    public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
        private static final int MAX_FRAME_SIZE = 1024;
    
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int readable = in.readableBytes();
            if (readable > MAX_FRAME_SIZE) {
                in.skipBytes(readable);
                throw new TooLongFrameException("Frame too big!");
            }
            // do something
        }
    }
    

    10.3 Encoders

    与解码器类似,Encoders分为两种:

    1. 将消息编码成字节流。
    2. 将一种消息编码成另一种格式的消息。

    10.3.1 MessageToByteEncoder抽象类

    示例ShortToByteEncoder

    public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
        @Override
        public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
            out.writeShort(msg);
        }
    }
    

    更具体的应用实践可以参见io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder

    10.4 编解码抽象类

    既能encode,又能decode,二合一。

    10.4.1 ByteToMessageCodec抽象类

    Any request/response protocol could be a good candidate for using the ByteToMessageCodec . For example, in an SMTP implementation, the codec would read incoming bytes and decode them to a custom message type, say SmtpRequest . On the receiving side, when a response is created, an SmtpResponse will be produced, which will be encoded back to bytes for transmission.

    10.4.2 MessageToMessageCodec抽象类

    public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
    
    public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
        @Override
        protected void encode(ChannelHandlerContext ctx,
            WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out)
            throws Exception {
            ByteBuf payload = msg.getData().duplicate().retain();
    
            switch (msg.getType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(payload));
    
                break;
    
            case TEXT:
                out.add(new TextWebSocketFrame(payload));
    
                break;
    
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, payload));
    
                break;
    
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(payload));
    
                break;
    
            case PONG:
                out.add(new PongWebSocketFrame(payload));
    
                break;
    
            case PING:
                out.add(new PingWebSocketFrame(payload));
    
                break;
    
            default:
                throw new IllegalStateException("Unsupported websocket msg " + msg);
            }
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg,
            List<Object> out) throws Exception {
            ByteBuf payload = msg.getData().duplicate().retain();
    
            if (msg instanceof BinaryWebSocketFrame) {
                out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY,
                        payload));
            } else if (msg instanceof CloseWebSocketFrame) {
                out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE,
                        payload));
            } else if (msg instanceof PingWebSocketFrame) {
                out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING,
                        payload));
            } else if (msg instanceof PongWebSocketFrame) {
                out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG,
                        payload));
            } else if (msg instanceof TextWebSocketFrame) {
                out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT,
                        payload));
            } else if (msg instanceof ContinuationWebSocketFrame) {
                out.add(new MyWebSocketFrame(
                        MyWebSocketFrame.FrameType.CONTINUATION, payload));
            } else {
                throw new IllegalStateException("Unsupported websocket msg " + msg);
            }
        }
    
        public static final class MyWebSocketFrame {
            private final FrameType type;
            private final ByteBuf data;
    
            public WebSocketFrame(FrameType type, ByteBuf data) {
                this.type = type;
                this.data = data;
            }
    
            public FrameType getType() {
                return type;
            }
    
            public ByteBuf getData() {
                return data;
            }
            public enum FrameType {BINARY,
                CLOSE,
                PING,
                PONG,
                TEXT,
                CONTINUATION;
            }
        }
    }
    

    10.4.3 CombinedChannelDuplexHandler类

    将编码器解码器放在一块影响代码的重用性。CombinedChannelDuplexHandler可以解决这个问题。我们可以使用它而不直接使用codec抽象类。

    方法签名:

    public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
    

    下面是一个使用范例:
    解码器例子ByteToCharDecoder
    功能是一次读取2个字节,解码成char写到List

    public class ByteToCharDecoder extends ByteToMessageDecoder {
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
            while (in.readableBytes() >= 2) {
                out.add(in.readChar());
            }
        }
    }
    

    编码器例子CharToByteEncoder

    public class CharToByteEncoder extends MessageToByteEncoder<Character> {
        @Override
        public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out)
            throws Exception {
            out.writeChar(msg);
        }
    }
    

    是时候combine了:

    public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
        public CombinedByteCharCodec() {
            super(new ByteToCharDecoder(), new CharToByteEncoder());
        }
    }
    

    相关文章

      网友评论

          本文标题:《netty in action》读书笔记 PART1

          本文链接:https://www.haomeiwen.com/subject/iwqefqtx.html