美文网首页
网络编程之Netty为王

网络编程之Netty为王

作者: 舞鹤Roc | 来源:发表于2021-04-30 09:56 被阅读0次

守护最好的官方文档

一、定义

官方定义

Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。

个人理解

高性能Java网络编程框架(High performance Java networking)、异步IO、基于事件驱动、优秀的开源通信框架

二、特性

  • 多路复用的IO模型(bossGroup,workGroup)
  • 未遵循Servlet标准
  • 没有路由映射功能
  • 支持长链接(可以像websocket一样服务器可以向浏览器推送)
Netty的特性.jpeg

三、版本

总结下来就是【3废弃、4主流、5鸡肋】,目前都建议使用4.x版本,因为

  • 使用Executor代替ThreadFactory
  • 更简单更精确的缓冲区泄漏追踪
  • 全局唯一的Channel ID
  • 更灵活的线程模型
了解具体的可以参考:

四、案列

使用Netty做一个聊天室的效果,学会Netty的基本写法套路(客户端、服务队、核心逻辑ServerHandler)。
代码如下,趣味性和技术性共赏。

客户端

package io.netty.example.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * 客户端 连接不上服务端会自动关闭  Connection refused / System.in 手动阻塞不会
 *
 * @author wuhe on 4/2/21
 */
public class ChatClient {
    public static void main(String[] args) {
        // 客户端只需要一个事件循环组
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // 服务端的是ServerBootstrap、NioServerSocketChannel
            Bootstrap bootstrap = new Bootstrap();
            // 客户端不使用childHandler
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 管道
                    ChannelPipeline pipeline = ch.pipeline();

                    //添加的都是handler
                    // 解码器decoder
                    pipeline.addLast("name", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    // 编码器encoder
                    pipeline.addLast(new LengthFieldPrepender(4));

                    // 字符串的编解码器
                    pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                    pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

                    // 自定义处理器
                    pipeline.addLast("testClientHandler", new SimpleChannelInboundHandler<String>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                            System.out.println(msg);
                        }

                        // 发生了异常之后,一般是把连接关闭掉
                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                            cause.printStackTrace();
                            ctx.close();
                        }

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        }
                    });
                }
            });

            // 客户端是connect 服务端是bind
            Channel clintChannel = bootstrap.connect("localhost", 8888).sync().channel();

            for (; ; ) {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                clintChannel.writeAndFlush(bufferedReader.readLine() + "\r\n");
            }
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

服务端

package io.netty.example.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;


/**
 * Server =>  Initializer => ServerHandler => Client
 *
 * @see io.netty.example.http.MyHttpServer
 */
public class ChatServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 管道
                            ChannelPipeline pipeline = ch.pipeline();

                            //添加的都是handler
                            // 解码器decoder
                            pipeline.addLast("name", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            // 编码器encoder
                            pipeline.addLast(new LengthFieldPrepender(4));

                            // 字符串的编解码器
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

                            // 自定义处理器
                            pipeline.addLast("testServerHandler", new ServerHandler());
                        }
                    });
            ChannelFuture sync = bootstrap.bind(8888).sync();
            sync.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

核心的ServerHandler

package io.netty.example.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class ServerHandler extends SimpleChannelInboundHandler<String> {
    // channel组
    // new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
    // ||
    // \/
    // this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("【" + channel.remoteAddress() + "】:" + msg);

        channelGroup.forEach(ch -> {
            if (ch != channel) {
                ch.writeAndFlush("【" + channel.remoteAddress() + "】:" + msg);
            } else {
                channel.writeAndFlush("【我】:" + msg);
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("<<<<<<======【系统消息】" + channel.remoteAddress() + "准备出发======>>>>>>>\n");
        System.out.println("<<<<<<======【系统消息】" + channel.remoteAddress() + "准备出发======>>>>>>>\n");
        // 不需要手动remove TODO 寻找源码
        // channelGroup.remove(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("<<<<<<======【系统消息】" + channel.remoteAddress() + "已经离开======>>>>>>>\n");
        System.out.println("<<<<<<======【系统消息】" + channel.remoteAddress() + "已经离开======>>>>>>>\n");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("<<<<<<======【系统消息】" + channel.remoteAddress() + "进入了快乐星球======>>>>>>>\n");
        channelGroup.add(channel);
        System.out.println("<<<<<<======【系统消息】" + channel.remoteAddress() + "进入了快乐星球======>>>>>>>\n");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("<<<<<<======【系统消息】" + channel.remoteAddress() + "退出了快乐星球======>>>>>>>\n");
        System.out.println("<<<<<<======【系统消息】" + channel.remoteAddress() + "退出了快乐星球======>>>>>>>\n");
    }

    // 发生了异常之后,一般是把连接关闭掉
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

效果截图

服务端.png
客户端01.png
客户端02.png

五、扩展了解

学习Netty能学习到什么

BIO、NIO、线程池、阻塞队列、网络编程、TCP/UDP、事件调度机制、线程模型、设计模式

还有哪些HTTP客户端

  • JDK原生的URLConnection
  • Apache的 Http Client
  • Netty的异步 Http Client
  • Spring的 RestTemplate
  • Spring Cloud Netflix中首选的Feign(集成上述:底层默认URLConnection)

谁在用Netty

  • Dubbo
  • RocketMQ
  • Elasticsearch
  • gRPC

学而不厌

相关文章

网友评论

      本文标题:网络编程之Netty为王

      本文链接:https://www.haomeiwen.com/subject/ovcnrltx.html