美文网首页
Netty 实现简单的通讯

Netty 实现简单的通讯

作者: LinJF | 来源:发表于2019-09-26 10:54 被阅读0次

一、netty入门

一,Netty是什么

1,Netty是由JBOSS提供的一个java开源框架。

2,Netty是JAR包,一般使用ALL-IN-ONE的JAR包就可以开发了。

3,Netty不需要运行在Tomcat这类服务器中,他是单独构建一个服务器。

4,Netty可以构建HTTP服务器,socket服务器,websocket服务器等。

5,Netty其实是对JDK1.4以后提供的NIO的封装,NIO就是new i/o,JDK1.7推出了NIO2。

为什么要学netty?

这里借用知乎上一个回答:

作为一个学Java的,如果没有研究过Netty,那么你对Java语言的使用和理解仅仅停留在表面水平,会点SSH,写几个MVC,访问数据库和缓存,这些只是初等Java程序员干的事。如果你要进阶,想了解Java服务器的深层高阶知识,Netty绝对是一个必须要过的门槛。有了Netty,你可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。如果你想知道Nginx是怎么写出来的,如果你想知道Tomcat和Jetty是如何实现的,如果你也想实现一个简单的Redis服务器,那都应该好好理解一下Netty,它们高性能的原理都是类似的。
我们回顾一下传统的HTTP服务器的原理

1.创建一个ServerSocket,监听并绑定一个端口
2.一系列客户端来请求这个端口
3.服务器使用Accept,获得一个来自客户端的Socket连接对象
4.启动一个新线程处理连接
     1.读Socket,得到字节流
     2.解码协议,得到Http请求对象
     3.处理Http请求,得到一个结果,封装成一个HttpResponse对象
     4.编码协议,将结果序列化字节流
     5.写Socket,将字节流发给客户端
5.继续循环步骤3

HTTP服务器之所以称为HTTP服务器,是因为编码解码协议是HTTP协议,如果协议是Redis协议,那它就成了Redis服务器,如果协议是WebSocket,那它就成了WebSocket服务器,等等。使用Netty你就可以定制编解码协议,实现自己的特定协议的服务器。

二,Netty服务架构图

以下官方图展示了Netty基本提供的服务:

Netty服务架构图.png

主要提供的就是HTTP服务器,socket服务器,websocket服务器。

三,Netty原理架构图

从下面的原创图,可以基本看出Netty构建服务器的原理:

Netty原理架构图.png

1,组件名词解释

(1),Bootstrap / ServerBootstrap(建立连接):

Netty引导组件,简化NIO的开发步骤,是一个Netty程序的开始,作用是配置和串联各个组件。

(2)EventLoopGroup(事件循环组):

是EventLoop组合,可以包含多个EventLoop。创建一个EventLoopGroup的时候,内部包含的方法就会创建一个子对象EventLoop。

(3)EventLoop(事件循环):

循环服务Channel,可以包含多个Channel。

(4)Channel(通道):

代表一个Scoket连接,或者其他IO操作组件。 Channel是通讯的载体

(5)ChannelInitializer(初始化连接):

主要提供了一个传输通道ChannelPipeline。

(6)ChannelPipeline(传输通道):

主要是管理各种ChannelHandler业务控制器,提供一个链式管理模式。

(7)ChannelHandler(业务控制器):

主要业务写入的地方,由开发人员写入,Netty也提供了很多写好的控制器和适配器,可以直接引用。

(8)ChannelInboundHandler(通道传入控制器):

继承至ChannelHandler,在传输通道中对传入事件进行控制。

(9)ChannelOutboundHandler(通道传出控制器):

继承至ChannelHandler,在传输通道中对传出事件进行控制。

(10)Decoder(解码):

网络传输都是byte传输,所以Netty首先接收到的是byte,需要进行解码,编程JAVA对象。

Netty提供了很多解码器,包括服务架构图(最上)显示的Google Protobuf编码,这是Google提供的跨平台的小体积编码方式,在Netty中可以直接解码。

(11)Encoder(编码):

和解码类似,在传出服务器的时候,需要编码成byte传输给客户端。

(12)Future / ChannelFuture(消息返回,图上没有):

Netty提供的返回结果,类似回调函数,告知你执行结果是什么。

2,构建服务器基本方法

从原理架构图可以看出,构建一个Netty服务器基本需要3个步骤:

1),ServerBootstrap(建立连接):构建一个Scoket或者其他连接,通过事件循环建立通道。

2),ChannelInitializer(初始化连接):构建传输通道,用于管理控制器。

3),ChannelHandler(业务控制器):构建业务控制器,最基本的是解码,编码,信息传入/传出。

同样,构建一个客户端,同样也是这些步骤,不过建立连接是使用Bootstrap,并且使用一个事件循环,而服务器一般使用两个事件循环。

四,应用场景

互联网行业
游戏行业
大数据领域
企业软件
通信行业

五,接下来以实现一个简单的及时通讯作为示例来帮助理解

pom依赖

                <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
        </dependency>

Server 服务端

SimpleChatServerHandler.java

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;


//让我们从 handler (处理器)的实现开始,handler 是由 Netty 生成用来处理 I/O 事件的。

//SimpleChatServerHandler 继承自 SimpleChannelInboundHandler,这个类实现了ChannelInboundHandler接口,
// ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要
// 继承 SimpleChannelInboundHandler 类而不是你自己去实现接口方法。
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    //覆盖了 handlerAdded() 事件处理方法。每当从服务端收到新的客户端连接时,客户端的 Channel 存入ChannelGroup列表中,
    // 并通知列表中的其他客户端 Channel
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            //writeAndFlush()方法分为两步, 先 write 再 flush
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        }
        channels.add(ctx.channel());
    }
    //覆盖了 handlerRemoved() 事件处理方法。每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,
    // 并通知列表中的其他客户端 Channel
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");
        }
        channels.remove(ctx.channel());
    }

    //覆盖了 channelRead0() 事件处理方法。每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。
    // 其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String s) throws Exception { // (4)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
            } else {
                channel.writeAndFlush("[txbb]" + s + "\n");
            }
        }
    }

    //覆盖了 channelActive() 事件处理方法。服务端监听到客户端活动
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");
    }

    //覆盖了 channelInactive() 事件处理方法。服务端监听到客户端不活动
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线");
    }

    //exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在
    // 处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。然而
    // 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

SimpleChatServerInitializer.java

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


//SimpleChatServerInitializer 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等
public class SimpleChatServerInitializer extends
        ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());//解码
        pipeline.addLast("encoder", new StringEncoder());//编码
        pipeline.addLast("handler", new SimpleChatServerHandler());

        System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上");
    }
}

SimpleChatServer.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleChatServer {

    private int port;

    public SimpleChatServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        // 配置nio线程组 创建两个线程组

        // NioEventLoopGroup是用于处理I / O操作的多线程事件循环器,Netty提供了很多不同的EventLoopGroup的
        // 实现处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2第一个经常被称为'boss',
        // 使用接收进来的连接。第二个经常被称为'worker',使用处理已经被接收的连接,一旦'boss'接收到连接,
        // 如何知道多少个线程已经被使用,如何映射到已经创建的Channel上都需要依赖于EventLoopGroup的实现,
        // 并且可以通过构造函数来配置他们的关系
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)    主线程,连接线成  bossGroup主要用来轮询获取客户端的连接,其实就是一个死循环  是负责处理TCP/IP连接的
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // 处理线程组  workergroup中取出一个管道channel来建立连接  是负责处理Channel(通道)的I/O事件
        try {
            //ServerBootstrap是一个启动NIO服务的辅助启动类。您可以在这个服务中直接使用Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)  // server端引导类,管理两个线程组
                    //此处我们指定使用NioServerSocketChannel类来说明一个新的Channel如何接收进来的连接。
                    .channel(NioServerSocketChannel.class) // (3)
                    //此处的事件处理类经常会被使用处理过一个最近的已经接收到的Channel。SimpleChatServerInitializer继承自ChannelInitializer是一个特殊的处理类,
                    // 他的目的是帮助用户配置一个新的Channel。也许你想通过增加一些处理类类SimpleChatServerHandler来配置一个新的Channel或其对应的ChannelPipeline来
                    // 实现你的网络程序。当你的程序变的复杂时,可能你会增加更多的处理类到pipline上,然后提取这些匿名类到最顶层的类上。
                    // 绑定客户端连接时候触发操作
                    .childHandler(new SimpleChatServerInitializer())  //(4)

                    //option()是提供给NioServerSocketChannel以接收进来的连接。childOption()是提供给由父管道ServerChannel接收到的连接,
                    // 在这个例子中也是NioServerSocketChannel。

                    //您可以设置此处指定的Channel实现的配置参数。我们正在编写一个TCP / IP的服务端,因此我们被允许设置socket的参数选项tcpNoDelay和keepAlive。
                    // 请参考ChannelOption和详细的ChannelConfig实现的接口文档最初可以对ChannelOption的有一个大概的认识。

                    // SO_BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存
                    // 放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                    //初始化服务端可连接队列,指定了队列的大小128
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    //保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            System.out.println("SimpleChatServer 启动了");

            //剩下的就是绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的8080端口。当然现在你可以多次调用bind()方法(基于不同绑定地址)
            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服务器  socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
            f.channel().closeFuture().sync();

        } finally {
            // 退出,释放线程池资源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("SimpleChatServer 关闭了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new SimpleChatServer(port).run();

    }
}

Client 客户端

SimpleChatClientInitializer.java

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());//解码
        pipeline.addLast("encoder", new StringEncoder());//编码
        pipeline.addLast("handler", new SimpleChatClientHandler());
    }
}

SimpleChatClientHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

//客户端的处理类比较简单,只需要将读到的信息打印出来即可
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }
}

Client 客户端1

SimpleChatClient.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

//客户端1
public class SimpleChatClient {
    public static void main(String[] args) throws Exception{
        new SimpleChatClient("localhost", 8080).run();
    }

    private final String host;
    private final int port;

    public SimpleChatClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();// 处理线程组  workergroup中取出一个管道channel来建立连接
        try {
            Bootstrap bootstrap  = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleChatClientInitializer());

            // 绑定 ip 端口 创建连接 调用sync()方法会阻塞直到服务器完成绑定  然后服务器再获取通道channel
            Channel channel = bootstrap.connect(host, port).sync().channel();

            //定义向服务器发送的内容  system.in  控制台输入   in.readLine() 获取值 每次读一行。换句话说,用户输入一行内容,然后回车,这些内容一次性读取进来。
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

            while(true){
                //writeAndFlush()方法分为两步, 先 write 再 flush
                channel.writeAndFlush(in.readLine() + "\r\n");
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 退出,释放线程池资源
            group.shutdownGracefully();
        }

    }
}

Client 客户端2

SimpleChatServerHandler.java

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

//客户端2
public class SimpleChatClient {
    public static void main(String[] args) throws Exception{
        new SimpleChatClient("localhost", 8080).run();
    }

    private final String host;
    private final int port;

    public SimpleChatClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();// 处理线程组  workergroup中取出一个管道channel来建立连接
        try {
            Bootstrap bootstrap  = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleChatClientInitializer());

            // 绑定 ip 端口 创建连接 调用sync()方法会阻塞直到服务器完成绑定  然后服务器再获取通道channel
            Channel channel = bootstrap.connect(host, port).sync().channel();

            //定义向服务器发送的内容  system.in  控制台输入   in.readLine() 获取值 每次读一行。换句话说,用户输入一行内容,然后回车,这些内容一次性读取进来。
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

            while(true){
                //writeAndFlush()方法分为两步, 先 write 再 flush
                channel.writeAndFlush(in.readLine() + "\r\n");
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 退出,释放线程池资源
            group.shutdownGracefully();
        }

    }
}

如果对传输数据有要求的话 ,可以参考下protobuf 的封装。

相关文章

网友评论

      本文标题:Netty 实现简单的通讯

      本文链接:https://www.haomeiwen.com/subject/gntbuctx.html