美文网首页
Netty 入门案例

Netty 入门案例

作者: yongguang423 | 来源:发表于2018-09-29 06:39 被阅读12次
/**
 * 1. 双线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. 绑定服务监听端口并启动服务
 */
package com.bjsxt.socket.netty.first;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server4HelloWorld {
    // 监听线程组,监听客户端请求
    private EventLoopGroup acceptorGroup = null;
    // 处理客户端相关操作线程组,负责处理与客户端的数据通讯
    private EventLoopGroup clientGroup = null;
    // 服务启动相关配置信息
    private ServerBootstrap bootstrap = null;
    public Server4HelloWorld(){
        init();
    }
    private void init(){
        // 初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量。
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        // 初始化服务的配置
        bootstrap = new ServerBootstrap();
        // 绑定线程组
        bootstrap.group(acceptorGroup, clientGroup);
        // 设定通讯模式为NIO, 同步非阻塞
        bootstrap.channel(NioServerSocketChannel.class);
        // 设定缓冲区大小, 缓存区的单位是字节。
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
        bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
            .option(ChannelOption.SO_RCVBUF, 16*1024)
            .option(ChannelOption.SO_KEEPALIVE, true);
    }
    /**
     * 监听处理逻辑。
     * @param port 监听端口。
     * @param acceptorHandlers 处理器, 如何处理客户端请求。
     * @return
     * @throws InterruptedException
     */
    public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
        
        /*
         * childHandler是服务的Bootstrap独有的方法。是用于提供处理对象的。
         * 可以一次性增加若干个处理逻辑。是类似责任链模式的处理方式。
         * 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A-》B顺序依次处理。
         * 
         * ChannelInitializer - 用于提供处理器的一个模型对象。
         *  其中定义了一个方法,initChannel方法。
         *   方法是用于初始化处理逻辑责任链条的。
         *   可以保证服务端的Bootstrap只初始化一次处理器,尽量提供处理逻辑的重用。
         *   避免反复的创建处理器对象。节约资源开销。
         */
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        // bind方法 - 绑定监听端口的。ServerBootstrap可以绑定多个监听端口。 多次调用bind方法即可
        // sync - 开始监听逻辑。 返回一个ChannelFuture。 返回结果代表的是监听成功后的一个对应的未来结果
        // 可以使用ChannelFuture实现后续的服务器和客户端的交互。
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }
    
    /**
     * shutdownGracefully - 方法是一个安全关闭的方法。可以保证不放弃任何一个已接收的客户端请求。
     */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }
    
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4HelloWorld server = null;
        try{
            server = new Server4HelloWorld();
            future = server.doAccept(9999,new Server4HelloWorldHandler());
            System.out.println("server started.");
            
            // 关闭连接的。
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            if(null != future){
                try {
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            if(null != server){
                server.release();
            }
        }
    }
    
}

/**
 * @Sharable注解 - 
 *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
 *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
 *  如果handler是一个Sharable的,一定避免定义可写的实例变量。
 *  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new XxxHandler());
            }
        });
 */
package com.bjsxt.socket.netty.first;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;

@Sharable
public class Server4HelloWorldHandler extends ChannelHandlerAdapter {
    
    /**
     * 业务处理逻辑
     * 用于处理读取数据请求的逻辑。
     * ctx - 上下文对象。其中包含于客户端建立连接的所有资源。 如: 对应的Channel
     * msg - 读取到的数据。 默认类型是ByteBuf,是Netty自定义的。是对ByteBuffer的封装。 不需要考虑复位问题。
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取读取的数据, 是一个缓冲。
        ByteBuf readBuffer = (ByteBuf) msg;
        // 创建一个字节数组,用于保存缓存中的数据。
        byte[] tempDatas = new byte[readBuffer.readableBytes()];
        // 将缓存中的数据读取到字节数组中。
        readBuffer.readBytes(tempDatas);
        String message = new String(tempDatas, "UTF-8");
        System.out.println("from client : " + message);
        if("exit".equals(message)){
            ctx.close();
            return;
        }
        String line = "server message to client!";
        // 写操作自动释放缓存,避免内存溢出问题。
        ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
        // 注意,如果调用的是write方法。不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行。
        // ctx.write(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
        // ctx.flush();
    }

    /**
     * 异常处理逻辑, 当客户端异常退出的时候,也会运行。
     * ChannelHandlerContext关闭,也代表当前与客户端连接的资源关闭。
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        // cause.printStackTrace();
        ctx.close();
    }

}

/**
 * 1. 单线程组
 * 2. Bootstrap配置启动信息
 * 3. 注册业务处理Handler
 * 4. connect连接服务,并发起请求
 */
package com.bjsxt.socket.netty.first;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * 因为客户端是请求的发起者,不需要监听。
 * 只需要定义唯一的一个线程组即可。
 */
public class Client4HelloWorld {
    
    // 处理请求和处理服务端响应的线程组
    private EventLoopGroup group = null;
    // 客户端启动相关配置信息
    private Bootstrap bootstrap = null;
    
    public Client4HelloWorld(){
        init();
    }
    
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // 绑定线程组
        bootstrap.group(group);
        // 设定通讯模式为NIO
        bootstrap.channel(NioSocketChannel.class);
    }
    
    public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
        /*
         * 客户端的Bootstrap没有childHandler方法。只有handler方法。
         * 方法含义等同ServerBootstrap中的childHandler
         * 在客户端必须绑定处理器,也就是必须调用handler方法。
         * 服务器必须绑定处理器,必须调用childHandler方法。
         */
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(handlers);
            }
        });
        // 建立连接。
        ChannelFuture future = this.bootstrap.connect(host, port).sync();
        return future;
    }
    
    public void release(){
        this.group.shutdownGracefully();
    }
    
    public static void main(String[] args) {
        Client4HelloWorld client = null;
        ChannelFuture future = null;
        try{
            client = new Client4HelloWorld();
            future = client.doRequest("localhost", 9999, new Client4HelloWorldHandler());
            
            Scanner s = null;
            while(true){
                s = new Scanner(System.in);
                System.out.print("enter message send to server (enter 'exit' for close client) > ");
                String line = s.nextLine();
                if("exit".equals(line)){
                    // addListener - 增加监听,当某条件满足的时候,触发监听器。
                    // ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接。
                    future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")))
                        .addListener(ChannelFutureListener.CLOSE);
                    break;
                }
                future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                TimeUnit.SECONDS.sleep(1);
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                try {
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if(null != client){
                client.release();
            }
        }
    }
    
}

package com.bjsxt.socket.netty.first;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class Client4HelloWorldHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            ByteBuf readBuffer = (ByteBuf) msg;
            byte[] tempDatas = new byte[readBuffer.readableBytes()];
            readBuffer.readBytes(tempDatas);
            System.out.println("from server : " + new String(tempDatas, "UTF-8"));
        }finally{
            // 用于释放缓存。避免内存溢出
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        // cause.printStackTrace();
        ctx.close();
    }

    /*@Override // 断开连接时执行
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive method run...");
    }

    @Override // 连接通道建立成功时执行
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive method run...");
    }

    @Override // 每次读取完成时执行
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete method run...");
    }*/

}

相关文章

网友评论

      本文标题:Netty 入门案例

      本文链接:https://www.haomeiwen.com/subject/fqxvoftx.html