美文网首页
尚硅谷Netty 案例

尚硅谷Netty 案例

作者: 手扶拖拉机_6e4d | 来源:发表于2021-06-14 16:43 被阅读0次
  • 导入netty的library包


    image.png

下载所有的netty包 io.netty:netty-all

image.png
选择4.1.20的版本: io.netty:netty-all:4.1.20.Final
image.png
  • Netty demo1

package cn.netty.project1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        /*
            bossGroup和workerGroup含有的子线程(NioEventLoop)个数默认等于cp核数 * 2
         */
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            // 创建服务器的启动对象,配置参数
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
// 注意:可以通过一个集合管理SocketChannel,在推送消息时,可以将业务加入到各个channel对应的NIOEventLoop的taskQueue或者scheduleTaskQueue
                            System.out.println("客户端SocketChannel hashCode=" + socketChannel.hashCode());
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });

            System.out.println("...服务器is ready...");

            //绑定一个端口并且同步生成了一个ChannelFuture对象
            // 启动服务器并启动监听
            ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();

            // 对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

package cn.netty.project1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.nio.ByteBuffer;

/**
 说明:
    自定义一个Handler需要继承netty规定好的某个HandlerAdapter适配器
    这时候我们自定义的Handler才能称之为handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件:可以读取客户端发送的消息
     * @param ctx : 上下文对象,含有管道pipeline,通道channel,地址
     * @param msg: 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server ctx=" + ctx);

        /*
            将客户端发送的数据msg转成byteBuffer
         */
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送的消息=" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址=" + ctx.channel().remoteAddress()); // 拿到客户端地址
    }

    /**
     * 读取数据完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 将数据写入缓存并刷新
        // 对发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hell 客户端", CharsetUtil.UTF_8));
    }

    /**
     * 处理异常,关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
package cn.netty.project1;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        // 客户端需要一个事件循环组
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try{
            // 创建客户端启动对象
            Bootstrap bootstrap = new Bootstrap();
            // 设置相关参数
            bootstrap.group(nioEventLoopGroup) //设置线程组
                    .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("客户端is ok...");

            //启动客户端连接服务端
            // 关于channel要分析,涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();

            // 给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            nioEventLoopGroup.shutdownGracefully();
        }
    }
}
package cn.netty.project1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

/**
    说明:
    1.Inbound代表入栈
 */
public class NettyClientHandler extends  ChannelInboundHandlerAdapter{ //SimpleChannelInboundHandler<HttpContent>

    /**
     * 通道准备就绪就会调用该方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));
    }

    /*
        当通道有读取事件时就会触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;

        System.out.println("服务器回复的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务器端输出:
...服务器is ready...
server ctx=ChannelHandlerContext(NettyServerHandler#0, [id: 0x9bb8b3b5, L:/127.0.0.1:6668 - R:/127.0.0.1:65274])
客户端发送的消息=hello server
客户端地址=/127.0.0.1:65274

客户端输出:
客户端is ok...
服务器回复的消息:hell 客户端
服务器地址:/127.0.0.1:6668

  • TaskQueue自定义任务
  • TaskQueue自定义任务

任务队列中的task有3种经典使用场景
1.用户程序自定义的普通任务


image.png

2.用户自定义定时任务
3.非当前Reactor线程调用Channel的各种方法
例如在推送系统的业务线程里面,根据用户的标识找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景,最终的write会提交到任务队列中后被异步消费

demo:自定义普通任务

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 模拟耗时操作(异步执行->提交到该channel对应的NIOEventLoop的taskQueue中)
        Thread.sleep(10000); // 睡眠10秒
        ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!", CharsetUtil.UTF_8));
        System.out.println("go on...");
    }

服务器端输出:
服务器is ready...
go on...

客户端输出:
客户端is ok...
服务器回复的消息:first hello java!second hello 客户端
服务器地址:/127.0.0.1:6668

把耗时操作放入taskQueue
会先输出“second hello 客户端”, 再输出“first hello java!”

 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10000); // 睡眠10秒
                    ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!", CharsetUtil.UTF_8));
                } catch (Exception e){
                    System.out.println("捕获异常=" + e.getMessage());
                }
            }
        });
    }

客户端输出:
客户端is ok...
服务器回复的消息:second hello 客户端
服务器地址:/127.0.0.1:6668
服务器回复的消息:first hello java!
服务器地址:/127.0.0.1:6668

2.自定义scheduleTask

 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 用户自定义定时任务(提交到scheduleTaskQueue)
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000); // 睡眠5秒
                    ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
                } catch (Exception e){
                    System.out.println("捕获异常=" + e.getMessage());
                }
            }
        }, 5, TimeUnit.SECONDS);

        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10 * 1000); // 睡眠5秒
                    ctx.writeAndFlush(Unpooled.copiedBuffer("third hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
                } catch (Exception e){
                    System.out.println("捕获异常=" + e.getMessage());
                }
            }
        }, 10, TimeUnit.SECONDS);

        System.out.println("go on...");   // 在这一行打断点
    }

ctx->pipeline->channel->eventLoop->scheduledTaskQueue


image.png

客户端输出:
客户端is ok...
服务器回复的消息:second hello 客户端当前时间=23:30:26
服务器地址:/127.0.0.1:6668
服务器回复的消息:first hello java!当前时间=23:30:36
服务器地址:/127.0.0.1:6668
服务器回复的消息:third hello java!当前时间=23:30:46
服务器地址:/127.0.0.1:6668

方案再说明:

  • 1.Netty抽象出两组线程池,BossGroup专门负责接受客户端连接,WorkerGroup专门负责网络读写操作
  • 2.NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道
  • 3.NioEventLoop内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责
  • 4.NioEventLoopGroup下包含多个NioEventLoop
    每个NioEventLoop中包含一个Selector,一个taskQueue
    每个NioEventLoop的Selector上可以注册监听多个NioChannel
  • 5.每个NioChannel只会绑定在唯一的NioEventLoop上
  • 6.每个NioChannel都绑定有一个自己的ChannelPipeLine

群聊系统
编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通信(非阻塞)
实现多人群聊
服务器端:监测用户上线、离线,并实现消息转发功能
客户端:通过channel可以无阻塞发送消息给其他所有用户,同时可以接受其他用户发送的消息(由服务器转发得到)


image.png
//GroupChatServer
package cn.netty.groupchat2;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class GroupChatServer {

    private int port;

    public GroupChatServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        new GroupChatServer(7011).run();
    }

    public void run() throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline =  socketChannel.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });

            System.out.println("netty 服务器启动");
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //监听关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

// GroupChatServerHandler
package cn.netty.groupchat2;


import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    //使用一个hashmap 管理
    //public static Map<String, Channel> channels = new HashMap<String,Channel>();

    //定义一个channle 组,管理所有的channel
    //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        //获取到当前channel
        Channel channel = channelHandlerContext.channel();
        //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
        channelGroup.forEach((ch)->{
            if (channel != ch){  //不是当前的channel,转发消息
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
            } else { //回显自己发送的消息给自己
                ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
            }
        });
    }


    /**
     * handlerAdded 表示连接建立,一旦连接,第一个被执行,将当前channel 加入到  channelGroup
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);
    }

    /**
     * 表示channel 处于不活动状态, 提示 xx离线了
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
        System.out.println("channelGroup size" + channelGroup.size());
    }

    /**
     * 表示channel 处于活动状态, 提示 xx上线
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线了~");
    }

    /**
     * 断开连接, 将xx客户离开信息推送给当前在线的客户
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channel.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
        System.out.println("channelGroup size" + channelGroup.size());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

//GroupChatClient
package cn.netty.groupchat2;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class GroupChatClient {
    private int port ;
    private String host ;

    public GroupChatClient(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void run() throws Exception{
        NioEventLoopGroup group = new NioEventLoopGroup();

        try (Scanner scanner = new Scanner(System.in)){
            Bootstrap bootstrap = new Bootstrap().group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress()+ "--------");

            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg + "\r\n");
            }

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception{
        new GroupChatClient( 7011, "127.0.0.1").run();
    }
}
// GroupChatClientHandler
package cn.netty.groupchat2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

相关文章

网友评论

      本文标题:尚硅谷Netty 案例

      本文链接:https://www.haomeiwen.com/subject/izefsltx.html