美文网首页
Netty学习

Netty学习

作者: kafeimao | 来源:发表于2021-01-10 19:54 被阅读0次

    参考视频 https://www.bilibili.com/video/BV1DJ411m7NR

    目录

    1、BIO实现tcp通讯
    2、NIO实现tcp通讯
    3、线程模型
    4、Netty入门,实现tcp协议通讯
    5、Netty核心组件
    6、Netty实现群聊
    7、Netty实现http服务器
    8、Netty实现dubbo
    9、Netty实现websocket长连接
    10、Netty源码解析

    1、BIO实现tcp通讯

    服务端

    /**
     * Blocking-I/O TCP server: accepts connections on port 6666 and hands each one
     * to a cached thread pool, which prints every chunk of bytes the client sends.
     */
    public class BioServer {
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            // try-with-resources releases the listening socket if accept() ever fails.
            try (ServerSocket serverSocket = new ServerSocket(6666)) {
                System.out.println("服务启动");
                while (true) {
                    Socket accept = serverSocket.accept();
                    System.out.println("连接到一个客户端");
                    executorService.execute(() -> {
                        try {
                            handle(accept);
                        } catch (IOException e) {
                            // A failure on one connection must not take down the worker silently.
                            System.err.println("connection handling failed: " + e);
                        }
                    });
                }
            } finally {
                // Stop accepting new tasks once the accept loop has terminated abnormally.
                executorService.shutdown();
            }
        }

        /**
         * Reads from the client until end of stream, printing each chunk, then closes the socket.
         *
         * @param accept the accepted client connection; always closed when this method returns
         * @throws IOException if reading from or closing the socket fails
         */
        private static void handle(Socket accept) throws IOException {
            // The socket is closed even when read() throws, which the original code did not guarantee.
            try (Socket socket = accept; InputStream inputStream = socket.getInputStream()) {
                byte[] bytes = new byte[1024];
                int read;
                while ((read = inputStream.read(bytes)) != -1) {
                    // Decodes with the platform default charset, as before.
                    System.out.println(new String(bytes, 0, read));
                }
            }
        }
    }
    

    2、NIO实现tcp通讯

    服务端

    /**
     * Single-threaded NIO TCP server on port 6666: one Selector multiplexes the
     * listening channel and every accepted client channel.
     */
    public class NioServer {
        public static void main(String[] args) throws Exception {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            Selector selector = Selector.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(6666));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                if (selector.select(1000) == 0) {
                    System.out.println("等待了一秒,无连接");
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Drop the key from the selected set up front so it is never processed twice.
                    iterator.remove();
                    if (key.isAcceptable()) {
                        SocketChannel accept = serverSocketChannel.accept();
                        // In non-blocking mode accept() returns null when no connection is pending.
                        if (accept != null) {
                            accept.configureBlocking(false);
                            // Each client gets its own 1024-byte buffer as the key attachment.
                            accept.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                        }
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer attachment = (ByteBuffer) key.attachment();
                        int read = channel.read(attachment);
                        if (read == -1) {
                            // Peer closed the connection: without this the key stays readable
                            // forever and the loop spins.
                            key.cancel();
                            channel.close();
                            continue;
                        }
                        if (read > 0) {
                            attachment.flip();
                            // Print only the bytes just received, not the whole backing array.
                            System.out.println("from 客户端" + new String(attachment.array(), 0, attachment.limit()));
                        }
                        // Reset the buffer so the next read has room again.
                        attachment.clear();
                    }
                }
            }
        }
    }
    

    客户端

    /**
     * Non-blocking NIO client: connects to 127.0.0.1:6666, sends one greeting and
     * then waits for a key press on stdin before closing the channel.
     */
    public class NioClient {
        public static void main(String[] args) throws Exception {
            // try-with-resources closes the channel on every exit path.
            try (SocketChannel socketChannel = SocketChannel.open()) {
                socketChannel.configureBlocking(false);
                InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
                if (!socketChannel.connect(inetSocketAddress)) {
                    // Connection establishment is asynchronous; poll until it completes.
                    while (!socketChannel.finishConnect()) {
                        System.out.println("因为链接需要时间,没有成功时,可以做别的事");
                    }
                }
                String str = "hello,nio";
                ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
                // A non-blocking write may transfer only part of the buffer, so loop until drained.
                while (byteBuffer.hasRemaining()) {
                    socketChannel.write(byteBuffer);
                }
                // Keep the process (and the connection) alive until the user presses a key.
                System.in.read();
            }
        }
    }
    
    

    3、线程模型

    注释:黄色表示对象,蓝色表示线程,白色表示方法
    1.传统线程模型


    image.png

    每个客户端发出连接请求,都有一个对应的线程进行处理
    Reactor线程模型:I/O多路复用结合线程池,就是reactor的基本思想
    2.单reactor单线程

    image.png

    所有请求都给reactor,reactor根据不同请求进行处理,在同一个线程中
    3.单reactor多线程


    image.png

    4.主从reactor


    image.png

    4、Netty入门实现tcp通讯

    依赖

    <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.20.Final</version>
            </dependency>
    

    服务端

    /**
     * Minimal Netty TCP server: binds port 9998 and installs a NettyServerHandler
     * on every accepted channel.
     */
    public class NettyServer {
        public static void main(String[] args) throws InterruptedException {
            // Two event-loop groups: the boss group only handles connection requests,
            // the work group handles the traffic of the accepted channels.
            NioEventLoopGroup bossGroup = new NioEventLoopGroup();
            NioEventLoopGroup workGroup = new NioEventLoopGroup();
            try {
                // Pipeline setup applied to each channel accepted by the server.
                ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                };

                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workGroup);
                // Channel implementation used for the listening socket.
                bootstrap.channel(NioServerSocketChannel.class);
                // Backlog of connections waiting to be accepted.
                bootstrap.option(ChannelOption.SO_BACKLOG, 128);
                // Keep accepted connections alive.
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.childHandler(childInitializer);
                System.out.println("服务器准备好了");

                // Bind the port and block until the bind has completed.
                ChannelFuture bindFuture = bootstrap.bind(9998).sync();
                // Block until the server channel is closed.
                bindFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    

    服务端处理器

    /**
     * Custom inbound handler for the TCP server; extends Netty's inbound handler adapter.
     * Prints every received message and answers with a fixed greeting once a read
     * batch is complete.
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        /**
         * Invoked when data from the client has been read.
         *
         * @param ctx context object giving access to the pipeline, channel and address
         * @param msg data sent by the client; expected to be a ByteBuf
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server ctx = " + ctx);
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                System.out.println("客户端发送消息:" + byteBuf.toString(CharsetUtil.UTF_8));
            } finally {
                // The message is consumed here and not passed on, so this handler must
                // release it; otherwise the reference-counted buffer leaks.
                byteBuf.release();
            }
        }

        /**
         * Invoked when the current read batch is finished.
         *
         * @param ctx context used to write the reply
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // Write the reply into the outbound buffer and flush it to the client.
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端", CharsetUtil.UTF_8));
        }

        /**
         * Invoked when an exception travels through the pipeline; reports it and closes the channel.
         *
         * @param ctx   context of the failing channel
         * @param cause the exception that was raised
         * @throws Exception never thrown by this implementation
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Report the cause instead of dropping it silently before closing.
            System.err.println("server handler error: " + cause);
            ctx.close();
        }
    }
    

    客户端

    /**
     * Minimal Netty TCP client: connects to 127.0.0.1:9998 and installs a
     * NettyClientHandler on the channel.
     */
    public class NettyClient {
        public static void main(String[] args) throws InterruptedException {
            // The client needs a single event-loop group.
            NioEventLoopGroup clientGroup = new NioEventLoopGroup();
            try {
                // Pipeline setup for the client channel.
                ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                };

                Bootstrap clientBootstrap = new Bootstrap();
                clientBootstrap.group(clientGroup);
                // Channel implementation used by the client.
                clientBootstrap.channel(NioSocketChannel.class);
                clientBootstrap.handler(initializer);
                System.out.println("客户端ok");

                // Connect to the server and block until the connect has completed.
                ChannelFuture connectFuture = clientBootstrap.connect("127.0.0.1", 9998).sync();
                // Block until the channel is closed.
                connectFuture.channel().closeFuture().sync();
            } finally {
                clientGroup.shutdownGracefully();
            }
        }
    }
    

    客户端处理器

    /**
     * Inbound handler for the TCP client: greets the server as soon as the channel
     * is active and prints every reply.
     */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {

        /** Invoked when the channel is ready; sends the initial greeting. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("ctx = " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", CharsetUtil.UTF_8));
        }

        /**
         * Invoked when data from the server has been read.
         *
         * @param ctx channel context
         * @param msg data sent by the server; expected to be a ByteBuf
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                System.out.println("服务器回复的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
            } finally {
                // The message is consumed here and not passed on, so release it to
                // avoid leaking the reference-counted buffer.
                byteBuf.release();
            }
        }

        /** Reports the failure and closes the channel. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Report the cause instead of dropping it silently before closing.
            System.err.println("client handler error: " + cause);
            ctx.close();
        }
    }
    

    5、Netty核心组件

    ServerBootstrap:引导服务端启动的对象
    Bootstrap:引导客户端启动的对象
    Future:Netty中的所有操作都是异步的,但是可以注册一个监听,Future和ChannelFuture就是具体的实现
    Channel:能够用于执行网络io,根据不同的协议,都有对应的channel
    Selector:选择器,不断查询注册在其上面的channel是否有触发事件
    ChannelHandler:


    image.png

    ChannelPipeline:包含了channelHandler的一个list


    image.png

    6、Netty实现群聊系统

    服务端

    /**
     * Group-chat server: listens on port 9998 with one boss thread and eight
     * worker threads, exchanging plain strings with the clients.
     */
    public class GroupChatServer {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
            try {
                // Per-connection pipeline: logging, String decoder/encoder, chat handler.
                ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new LoggingHandler(LogLevel.INFO));
                        p.addLast("decoder", new StringDecoder());
                        p.addLast("encoder", new StringEncoder());
                        p.addLast(new GroupChatServerHandler());
                    }
                };

                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.option(ChannelOption.SO_BACKLOG, 128);
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.childHandler(childInitializer);
                System.out.println("服务器启动");

                // Bind, then block until the server channel is closed.
                ChannelFuture bindFuture = bootstrap.bind(9998).sync();
                bindFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    

    服务端处理器

    /**
     * Server-side chat handler: tracks every connected channel in a shared group and
     * relays each incoming line to all members.
     */
    public class GroupChatServerHandler  extends SimpleChannelInboundHandler<String> {
        /** All currently connected client channels, shared by every handler instance. */
        private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        /** Timestamp format used in the join notification. */
        final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * First callback once a connection is established: announces the newcomer and
         * registers its channel in the group.
         *
         * @param ctx context of the newly connected channel
         * @throws Exception never thrown by this implementation
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // Broadcast BEFORE adding the new channel so that only the clients already
            // online receive the join notice, not the joiner itself.
            channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天室"+ simpleDateFormat.format(new Date()));
            channelGroup.add(channel);
        }

        /**
         * The channel has become active.
         *
         * @param ctx channel context
         * @throws Exception never thrown by this implementation
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + "上线了");
        }

        /**
         * The channel is no longer active.
         *
         * @param ctx channel context
         * @throws Exception never thrown by this implementation
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress()+ "离线了");
        }

        /**
         * The connection has been dropped; tells the group that the client left.
         *
         * @param ctx context of the departing channel
         * @throws Exception never thrown by this implementation
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // NOTE(review): no explicit remove here; this relies on the channel group
            // dropping closed channels by itself - confirm against the ChannelGroup docs.
            channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"退出了群聊");
        }

        /**
         * Relays a received line to every group member; the sender gets a "[自己]"
         * echo, everybody else sees the sender's remote address.
         *
         * @param channelHandlerContext context of the sending channel
         * @param s                     the decoded text message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            Channel channel = channelHandlerContext.channel();
            channelGroup.forEach(ch -> {
                if(channel!=ch){
                    ch.writeAndFlush("[客户]"+channel.remoteAddress()+"发送了消息"+s+"\n");
                }else {
                    ch.writeAndFlush("[自己]发送了消息"+s+"\n");
                }
            });
        }

        /** Closes the channel when an exception reaches this handler. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    客户端

    /**
     * Group-chat client: connects to 127.0.0.1:9998 and sends every line typed on
     * stdin to the server.
     */
    public class GroupChatClient {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                // Pipeline: String decoder/encoder followed by the chat handler.
                ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast("decoder", new StringDecoder());
                        p.addLast("encoder", new StringEncoder());
                        p.addLast(new GroupChatClientHandler());
                    }
                };

                Bootstrap clientBootstrap = new Bootstrap();
                clientBootstrap.group(eventLoopGroup);
                clientBootstrap.channel(NioSocketChannel.class);
                clientBootstrap.handler(initializer);
                System.out.println("客户端启动");

                Channel channel = clientBootstrap.connect("127.0.0.1", 9998).sync().channel();
                // Forward stdin line by line until end of input.
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    channel.writeAndFlush(scanner.nextLine() + "\n");
                }
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    

    客户端处理器

    /** Client-side chat handler: prints every message received from the server. */
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
        /**
         * Prints the received text with surrounding whitespace removed.
         *
         * @param ctx     channel context (unused)
         * @param message the decoded text message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
            String trimmed = message.trim();
            System.out.println(trimmed);
        }
    }
    

    7、Netty实现http服务器

    服务端

    /**
     * HTTP server bootstrap: listens on port 9998 and handles requests with
     * HttpServerCodec followed by TestHttpServerHandler.
     */
    public class TestServer {
        public static void main(String[] args) throws Exception{
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
            try {
                // Per-connection pipeline: HTTP codec first, then the request handler.
                ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast("myHttpServerCodec", new HttpServerCodec());
                        p.addLast("myHandle", new TestHttpServerHandler());
                    }
                };

                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workGroup);
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.childHandler(childInitializer);

                // Bind, then block until the server channel is closed.
                ChannelFuture bindFuture = serverBootstrap.bind(9998).sync();
                bindFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    

    处理器

    /**
     * HTTP request handler: answers every request except "/favicon.ico" with a
     * fixed UTF-8 plain-text body.
     */
    public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
        /**
         * Handles one decoded HTTP object; only HttpRequest instances are acted upon.
         *
         * @param channelHandlerContext context of the channel the object arrived on
         * @param httpObject            the decoded HTTP object
         * @throws Exception if the request URI cannot be parsed
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
            if(httpObject instanceof HttpRequest){
                System.out.println("msg 的类型"+httpObject.getClass().getName());
                System.out.println("客户端地址"+ channelHandlerContext.channel().remoteAddress());
                HttpRequest httpRequest = (HttpRequest)httpObject;
                URI uri = new URI(httpRequest.uri());
                // Icon requests are deliberately left unanswered.
                if("/favicon.ico".equals(uri.getPath())){
                    System.out.println("请求网站图标,不做响应");
                    return;
                }
                ByteBuf byteContent = Unpooled.copiedBuffer("我是服务器", CharsetUtil.UTF_8);
                DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteContent);
                httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=utf-8");
                // Content-Length is the encoded byte count, not the character count.
                httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteContent.readableBytes());
                channelHandlerContext.writeAndFlush(httpResponse);
            }
        }

        /** Prints the failure and closes the channel, like the other handlers in this document. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            // Do not keep a connection open after a failure has been reported on it.
            ctx.close();
        }
    }
    

    8、Netty实现dubbo

    待提供的服务

    /** Service contract exposed by the provider and invoked remotely by the consumer. */
    public interface HelloService {
       /**
        * Sends a greeting message to the service.
        *
        * @param msg the message from the caller
        * @return the service's reply text
        */
       String hello(String msg);
    }
    

    服务实现

    /** Provider-side implementation of HelloService. */
    public class HelloServiceImpl implements HelloService {
        /**
         * Logs the incoming message and builds the reply.
         *
         * @param msg message from the consumer; may be null
         * @return a reply echoing {@code msg}, or a bare greeting when {@code msg} is null
         */
        @Override
        public String hello(String msg) {
            System.out.println("收到消费者消息:" + msg);
            return msg == null ? "你好" : "你好,我收到" + msg;
        }
    }
    

    dubbo服务提供者

    /**
     * RPC provider bootstrap: listens on port 9998 and exchanges plain strings
     * with consumers through NettyServerHandler.
     */
    public class NettyServer {
        public static void main(String[] args) {
            NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workGroup = new NioEventLoopGroup(8);
            try {
                // Per-connection pipeline: String decoder/encoder, then the request handler.
                ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast("decoder", new StringDecoder());
                        p.addLast("encoder", new StringEncoder());
                        p.addLast(new NettyServerHandler());
                    }
                };

                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(boosGroup, workGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(childInitializer);
                System.out.println("服务端开始提供服务");

                // Bind, then block until the server channel is closed.
                ChannelFuture bindFuture = bootstrap.bind(9998).sync();
                bindFuture.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boosGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    

    服务端处理器

    public class NettyServerHandler extends ChannelInboundHandlerAdapter  {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("msg="+msg);
            if(msg.toString().startsWith(ClientBootstrap.provideName)){
                String helloResult = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
                ctx.writeAndFlush(helloResult);
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
        }
    }
    

    dubbo服务消费者

    /**
     * RPC consumer: creates dynamic proxies whose method calls are sent to the
     * provider at 127.0.0.1:9998 through a shared NettyClientHandler.
     */
    public class NettyClient {
        private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        private static NettyClientHandler client;

        /**
         * Builds a proxy for the given service interface.
         *
         * @param serverClass  the service interface to proxy
         * @param providerName protocol prefix placed in front of the first call argument
         * @return a proxy implementing {@code serverClass}; each call blocks until the provider replies
         */
        public Object getBean(final Class<?> serverClass, final String providerName) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serverClass}, (proxy, method, args) -> {
                // The connection is created lazily on the first proxied call.
                if (client == null) {
                    initClient();
                }
                // Request format: prefix immediately followed by the first argument.
                client.setPara(providerName + args[0]);
                return executorService.submit(client).get();
            });
        }

        /** Creates the shared handler and connects it to the provider. */
        private static void initClient() {
            client = new NettyClientHandler();

            NioEventLoopGroup clientGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(client);
                        }
                    });
            try {
                // Block until the connect attempt has completed.
                bootstrap.connect("127.0.0.1", 9998).sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
    

    客户端处理器

    /**
     * Consumer-side handler that doubles as a Callable: {@link #call()} sends the
     * pending request and blocks until {@link #channelRead} has stored the reply.
     */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
        /** Context captured when the channel becomes active; used to send requests. */
        private ChannelHandlerContext context;
        /** Reply of the request in flight; null while no reply has arrived. */
        private String result;
        /** Request text to send on the next {@link #call()}. */
        private String para;

        /** Remembers the context so that {@link #call()} can write to the channel. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            context = ctx;
        }

        /** Stores the provider's reply and wakes up the thread waiting in {@link #call()}. */
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            result = msg.toString();
            notify();
        }

        /** Closes the channel when an exception reaches this handler. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }

        /**
         * Sends the current request and waits for the reply.
         *
         * @return the reply text received from the provider
         * @throws Exception if the waiting thread is interrupted
         */
        @Override
        public synchronized Object call() throws Exception {
            // Clear any reply left over from a previous call so the loop below
            // only ends on the answer to THIS request.
            result = null;
            context.writeAndFlush(para);
            // wait() may return spuriously, so re-check the condition in a loop.
            while (result == null) {
                wait();
            }
            return result;
        }

        /** Sets the request text that the next {@link #call()} will send. */
        void setPara(String para){
            this.para = para;
        }
    }
    

    消费服务

    /** Consumer entry point: obtains a HelloService proxy and performs one remote call. */
    public class ClientBootstrap {
        /** Protocol prefix that identifies the service and method in each request. */
        public static final String provideName = "helloService#hello#";

        public static void main(String[] args) {
            Object proxy = new NettyClient().getBean(HelloService.class, provideName);
            HelloService service = (HelloService) proxy;
            String reply = service.hello("你好 dubbo");
            System.out.println("res:" + reply);
        }
    }
    

    9、Netty实现websocket长连接

    服务端

    /**
     * WebSocket server bootstrap: listens on port 9998 and serves the WebSocket
     * endpoint "/hello".
     */
    public class WebSocketServer {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup();
            NioEventLoopGroup workGroup = new NioEventLoopGroup();
            try {
                // Per-connection pipeline, in order: HTTP codec, chunked writer,
                // aggregator (8192), WebSocket protocol handler, application handler.
                ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new HttpServerCodec());
                        p.addLast(new ChunkedWriteHandler());
                        p.addLast(new HttpObjectAggregator(8192));
                        p.addLast(new WebSocketServerProtocolHandler("/hello"));
                        p.addLast(new WebSocketHandler());
                    }
                };

                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(childInitializer);

                // Bind, then block until the server channel is closed.
                ChannelFuture bindFuture = bootstrap.bind(9998).sync();
                bindFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    

    服务端处理器

    /**
     * WebSocket text-frame handler: logs each received frame and answers with a
     * fixed greeting; also logs when it is added to or removed from a pipeline.
     */
    public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        /**
         * Prints the received text and replies with a fixed text frame.
         *
         * @param ctx   channel context used to send the reply
         * @param frame the received text frame
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
            String received = frame.text();
            System.out.println("服务器收到消息" + received);
            TextWebSocketFrame reply = new TextWebSocketFrame("hello 客户端");
            ctx.writeAndFlush(reply);
        }

        /** Logs the long and the short text form of the channel id. */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            String longId = ctx.channel().id().asLongText();
            String shortId = ctx.channel().id().asShortText();
            System.out.println("handlerAdded被调用" + longId);
            System.out.println("handlerAdded被调用" + shortId);
        }

        /** Logs the short text form of the channel id. */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            String shortId = ctx.channel().id().asShortText();
            System.out.println("handlerRemoved被调用" + shortId);
        }

        /** Closes the channel when an exception reaches this handler. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    10、Netty源码解析

    相关文章

      网友评论

          本文标题:Netty学习

          本文链接:https://www.haomeiwen.com/subject/qmbonktx.html