美文网首页
初尝Netty(二):简单聊天

初尝Netty(二):简单聊天

作者: 请不要问我是谁 | 来源:发表于2019-01-06 19:57 被阅读0次

    客户端连接服务器后服务器显示客户端上线,客户端断开服务器连接后服务器显示客户端下线,客户端发送一条消息,服务器和其他客户端都能接收到消息。
    基本框架依然与第一个Echo程序相同,服务器与客户端都包含三部分代码。

    MyServer

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * Entry point of the chat server. Binds a Netty {@link ServerBootstrap} to TCP port 8899
     * and keeps running until the listening channel is closed.
     *
     * @author kun
     */
    public class MyServer {
        /**
         * Starts the server and blocks the calling thread until the server channel closes.
         * Both event loop groups are always shut down before the method returns.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            // First group is passed as the parent (acceptor) group, second as the child group
            // that serves the accepted connections.
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new MyServerInitializer());
                // Wait for the bind to complete, then wait for the server channel to be closed.
                ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the interruption is not lost to outer code.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    MyServerInitializer

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.Delimiters;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    /**
     * @author kun
     */
    public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            # 将特殊的分隔符作为消息分隔符,回车换行符是他的一种
            pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
            pipeline.addLast(new MyServerHandler());
        }
    }
    

    MyServerHandler

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    /**
     * @author kun
     */
    public class MyServerHandler extends SimpleChannelInboundHandler<String> {
        # 记录已经连接到服务器的通道,用来广播
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.forEach(ch->{
                if (channel!=ch){
                    ch.writeAndFlush(channel.remoteAddress()+"发送消息:"+msg+"\n");
                }else {
                    ch.writeAndFlush("[自己]"+msg+"\n");
                }
            });
            System.out.println(msg);
        }
        # 在ChannelHandler添加到ChannelPipeline时触发
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[服务器]-"+channel.remoteAddress()+"加入\n");
            channelGroup.add(channel);
        }
        # ChannelHandler从ChannelPipeline中被删除的时候触发
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[服务器]-"+channel.remoteAddress()+"离开\n");
            System.out.println("剩余客户端:"+channelGroup.size());
        }
        # 通道开启时触发
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            System.out.println(channel.remoteAddress()+"上线");
        }
        # 通道关闭时触发
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            System.out.println(channel.remoteAddress()+"下线");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    MyClient

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    
    /**
     * @author kun
     */
    public class MyClient {
        public static void main(String[] args) {
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup)
                        .channel(NioSocketChannel.class)
                        .handler(new MyClientInitializer());
                Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                # 死循环,将命令行的输入发送到服务器
                for (;;){
                    channel.writeAndFlush(br.readLine()+"\r\n");
                }
            } catch (InterruptedException | IOException e) {
                e.printStackTrace();
            }finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    

    MyClientInitializer

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.Delimiters;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    /**
     * Sets up the client channel pipeline: delimiter-based framing on line endings,
     * UTF-8 string decoding and encoding, and finally the client message handler.
     *
     * @author kun
     */
    public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
        /**
         * Appends the client handlers to the pipeline of the given channel, in order.
         *
         * @param ch the channel whose pipeline is being set up
         */
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    // Frames are delimited by line endings and limited to 4096 bytes.
                    .addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                    .addLast(new StringDecoder(CharsetUtil.UTF_8))
                    .addLast(new StringEncoder(CharsetUtil.UTF_8))
                    .addLast(new MyClientHandler());
        }
    }
    

    MyClientHandler

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * Client-side inbound handler that prints every received text message to standard output.
     *
     * @author kun
     */
    public class MyClientHandler extends SimpleChannelInboundHandler<String> {
        /**
         * Shows one received message on the console.
         *
         * @param ctx context of the channel the message arrived on (unused)
         * @param msg the received text message
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            display(msg);
        }

        /** Prints a single line of chat text to standard output. */
        private static void display(String text) {
            System.out.println(text);
        }
    }
    

    结果
    先开启服务端,再依次开启三个客户端

    服务器
    客户端1
    客户端2
    客户端3

    相关文章

      网友评论

          本文标题:初尝Netty(二):简单聊天

          本文链接:https://www.haomeiwen.com/subject/kwferqtx.html