美文网首页
Netty学习--传输

Netty学习--传输

作者: 何何与呵呵呵 | 来源:发表于2019-01-09 08:55 被阅读0次
    传输迁移
    • 未使用Netty 的阻塞网络编程
    public class PlainOioServer {
        public void serve(int port) throws IOException {
            // 将服务器绑定到指定端口
            final ServerSocket socket = new ServerSocket(port);
            try {
                for (;;) {
                    final Socket clientSocket = socket.accept(); // 接受连接
                    System.out.println("Accepted connection from " + clientSocket);
                    new Thread(new Runnable() { // 创建一个新的线程来处理该连接
                        @Override
                        public void run() {
                            OutputStream out;
                            try {
                                out = clientSocket.getOutputStream();
                                out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); // 将消息写给已连接的客户端
                                out.flush();
                                clientSocket.close(); // 关闭连接
                            }
                            catch (IOException e) {
                                e.printStackTrace();
                            }
                            finally {
                                try {
                                    clientSocket.close();
                                }
                                catch (IOException ex) {
                                    // ignore on close
                                }
                            }
                        }
                    }).start();
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 未使用Netty 的异步网络编程
    public void serve(int port) throws IOException {
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket ssocket = serverChannel.socket();
            InetSocketAddress address = new InetSocketAddress(port);
            ssocket.bind(address); // 将服务器绑定到选定的端口
            Selector selector = Selector.open(); // 打开Selector来处理 Channel
            serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 将ServerSocket注册到Selector以接受连接
            final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
            for (;;) {
                try {
                    selector.select(); // 等待需要处理的新事件;阻塞将一直持续到下一个传入事件
                } catch (IOException ex) {
                    ex.printStackTrace();
                    // handle exception
                    break;
                }
                Set<SelectionKey> readyKeys = selector.selectedKeys(); // 获取所有接收事件的Selection-Key实例
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        if (key.isAcceptable()) { // 检查事件是否是一个新的已经就绪可以被接受的连接
                            ServerSocketChannel server =
                                    (ServerSocketChannel)key.channel();
                            SocketChannel client = server.accept();
                            client.configureBlocking(false);
                            // 接受客户端,并将它注册到选择器
                            client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                            System.out.println("Accepted connection from " + client);
                        }
                        if (key.isWritable()) { // 检查套接字是否已经准备好写数据
                            SocketChannel client =
                                    (SocketChannel)key.channel();
                            ByteBuffer buffer =
                                    (ByteBuffer)key.attachment();
                            while (buffer.hasRemaining()) {
                                if (client.write(buffer) == 0) { // 将数据写到已连接的客户端
                                    break;
                                }
                            }
                            client.close();
                        }
                    } catch (IOException ex) {
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException cex) {
                          // ignore on close
                        }
                    }
                }
            }
        }
    
    • 使用Netty 的阻塞网络处理
     public void server(int port) throws Exception {
            final ByteBuf buf = Unpooled.unreleasableBuffer(
                    Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
            EventLoopGroup group = new OioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // 创建Server-Bootstrap
                b.group(group)
                        .channel(OioServerSocketChannel.class) // 使用OioEventLoopGroup以允许阻塞模式(旧的I/O)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new ChannelInitializer<SocketChannel>() { // 指定Channel-Initializer,对于每个已接受的连接都调用它
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(
                                        new ChannelInboundHandlerAdapter() {
                                            @Override
                                            public void channelActive(
                                                    ChannelHandlerContext ctx)
                                                    throws Exception {
                                                ctx.writeAndFlush(buf.duplicate()) // 将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
                                                        .addListener(
                                                                ChannelFutureListener.CLOSE);
                                            }
                                        });
                            }
                        });
                ChannelFuture f = b.bind().sync(); // 绑定服务器以接受连接
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();// 释放所有的资源
            }
        }
    
    • 使用Netty 的异步网络处理
     public void server(int port) throws Exception {
            final ByteBuf buf = Unpooled.unreleasableBuffer(
                    Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
           EventLoopGroup group = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // 创建Server-Bootstrap
                b.group(group).channel(NioServerSocketChannel.class) // 使用OioEventLoopGroup以允许阻塞模式(旧的I/O)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new ChannelInitializer<SocketChannel>() { // 指定Channel-Initializer,对于每个已接受的连接都调用它
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
                                ch.pipeline().addLast(
                                        new ChannelInboundHandlerAdapter() {
                                            @Override
                                            public void channelActive(
                                                    ChannelHandlerContext ctx)
                                                    throws Exception {
                                                ctx.writeAndFlush(buf.duplicate()) // 将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
                                                        .addListener(
                                                                ChannelFutureListener.CLOSE);
                                            }
                                        });
                            }
                        });
                ChannelFuture f = b.bind().sync(); // 绑定服务器以接受连接
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();// 释放所有的资源
            }
        }
    

    通过代码比较,netty基本不受影响(通用性强).

    传输API
    Channel 接口的层次结构

    如图所示,每个Channel 都将会被分配一个ChannelPipeline 和ChannelConfig。ChannelConfig 包含了该Channel 的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个ChannelConfig 的子类型。Comparable保证每一个channel都是独一无二的.
    ChannelPipeline 持有所有将应用于入站和出站数据以及事件的ChannelHandler 实例.

    • 将数据从一种格式转换为另一种格式;
    • 提供异常的通知;
    • 提供Channel 变为活动的或者非活动的通知;
    • 提供当Channel 注册到EventLoop 或者从EventLoop 注销时的通知;
    • 提供有关用户自定义事件的通知。

    channel是线程安全的,多个线程可以共用一个channel.

    内置的传输
    • NIO | io.netty.channel.socket.nio | 使用java.nio.channels 包作为基础——基于选择器的方式
    • Epoll | io.netty.channel.epoll | 由JNI 驱动的epoll()和非阻塞IO。这个传输支持只有在Linux 上可用的多种特性,如SO_REUSEPORT,比NIO 传输更快,而且是完全非阻塞的
    • OIO | io.netty.channel.socket.oio | 使用java.net 包作为基础——使用阻塞流
    • Local | io.netty.channel.local | 可以在VM 内部通过管道进行通信的本地传输
    • Embedded | io.netty.channel.embedded | Embedded 传输,允许使用ChannelHandler 而又不需要一个真正的基于网络的传输。这在测试你的ChannelHandler 实现时非常有用
    NIO——非阻塞I/O
    选择并处理状态的变化
    Epoll—用于Linux 的本地非阻塞传输

    流程和NIO一样,失去了NIO的通用性,但运行在linux上更快,只需要将NioEventLoopGroup替换成EpollEventLoopGroup,并且将NioServerSocketChannel.class 替换为EpollServerSocketChannel.class 即可.

    OIO—旧的阻塞I/O

    建立在java.net 包的阻塞实现之上,不是异步的.Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式.

    OIO 的处理逻辑
    用于JVM 内部通信的Local 传输

    用于在同一个JVM 中运行的客户端和服务器程序之间的异步通信.

    Embedded 传输

    可以将一组ChannelHandler 作为帮助器类嵌入到其他的ChannelHandler 内部。通过这种方式,你将可以扩展一个ChannelHandler 的功能,而又不需要修改其内部代码。

    相关文章

      网友评论

          本文标题:Netty学习--传输

          本文链接:https://www.haomeiwen.com/subject/kgsyrqtx.html