美文网首页
I/O-利用netty实现简单的RPC

I/O-利用netty实现简单的RPC

作者: 麦大大吃不胖 | 来源:发表于2020-12-01 10:17 被阅读0次

by shihang.mai

1. 第一版

完整代码:

/**
 * First version of the Netty RPC demo.
 *
 * <p>{@link #statServer()} starts a server on port 9090, {@link #get()} plays the
 * consumer, and {@link #proxyGet(Class)} builds a JDK dynamic proxy that ships every
 * interface call to the server as a serialized header followed by a serialized body.
 */
public class MyRPCTest {

    /**
     * Binds the demo server to port 9090 and blocks until the server channel is closed.
     * The event loop group is shut down when the method leaves.
     */
    @Test
    public void statServer(){
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        // Accepting connections and per-connection I/O share the same group.
        NioEventLoopGroup worker = boss;

        ServerBootstrap sbs = new ServerBootstrap();
        ChannelFuture bind = sbs.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        System.out.println("server accept client port:"+ch.remoteAddress().getPort());
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new ServerRequestHandler());
                    }
                }).bind(new InetSocketAddress(9090));
        try {
            bind.sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Without this the event loop threads outlive the server.
            boss.shutdownGracefully();
        }

    }

    /**
     * Simulates the consumer: starts the server on a background thread, fires 20
     * concurrent proxy calls and then waits for a key press on stdin.
     */
    @Test
    public void get(){
        // NOTE(review): nothing waits for the bind to finish, so a client thread may try
        // to connect before the server is listening; "server started..." only means the
        // thread was started.
        new Thread(()->{
                statServer();
        }).start();
        System.out.println("server started...");
        AtomicInteger num = new AtomicInteger(0);
        int size = 20;
        Thread[] threads = new Thread[size];
        for (int i = 0; i < size; i++) {
            threads[i]= new Thread(()->{
                Car car = proxyGet(Car.class);
                car.ooxx("hello"+num.incrementAndGet());
            });
        }
        for (Thread thread:threads) {
            thread.start();
        }

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a dynamic proxy for {@code interfaceInfo}. Every call on the proxy is
     * packed into a {@link MyContent}, prefixed with a {@link Myheader}, written to a
     * pooled client channel for port 9090, and then blocks until the response carrying
     * the same request id arrives.
     *
     * @param interfaceInfo the interface to proxy
     * @param <T> the interface type
     * @return a proxy whose methods all return {@code null} in this version
     */
    @SuppressWarnings("unchecked") // Proxy.newProxyInstance returns an instance implementing interfaceInfo
    public static <T>T proxyGet(Class<T> interfaceInfo){
        ClassLoader loader = interfaceInfo.getClassLoader();
        Class<?>[] methodInfo = {interfaceInfo};

        return (T)Proxy.newProxyInstance(loader, methodInfo, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                String name = interfaceInfo.getName();
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                MyContent content = new MyContent();
                content.setArgs(args);
                content.setName(name);
                content.setMethodName(methodName);
                content.setParameterTypes(parameterTypes);

                // The body is serialized first because the header records its length.
                byte[] msgBody = serialize(content);
                Myheader header = createHeader(msgBody);
                byte[] msgHeader = serialize(header);

                ClientFactory factory = ClientFactory.getFactory();
                NioSocketChannel clientChannel = factory.getClient(new InetSocketAddress(9090));

                // Register the wake-up callback before sending so a fast response cannot be missed.
                long id = header.getRequestID();
                CountDownLatch countDownLatch = new CountDownLatch(1);
                ResponseHander.addCallBack(id, countDownLatch::countDown);

                ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);
                byteBuf.writeBytes(msgHeader);
                byteBuf.writeBytes(msgBody);
                try {
                    clientChannel.writeAndFlush(byteBuf).sync();
                } catch (Throwable t) {
                    // No response will ever arrive for this id; do not leave the callback behind.
                    ResponseHander.removeCB(id);
                    throw t;
                }

                countDownLatch.await();
                return null;
            }
        });
    }

    /**
     * Builds the header for a request body.
     *
     * @param msg the serialized body; its length is stored as the data length
     * @return a header with flag {@code 0x14141414} and a random non-negative request id
     */
    public static   Myheader createHeader(byte[] msg) {
        Myheader header = new Myheader();
        int size = msg.length;
        int f = 0x14141414;
        //0x14 0001 0100
        // Masking the sign bit always yields a non-negative id; Math.abs(Long.MIN_VALUE) stays negative.
        long requestID = UUID.randomUUID().getLeastSignificantBits() & Long.MAX_VALUE;
        header.setFlag(f);
        header.setDataLen(size);
        header.setRequestID(requestID);
        return header;
    }

    /** Serializes {@code value} with Java object serialization into a fresh byte array. */
    private static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oout = new ObjectOutputStream(out)) {
            oout.writeObject(value);
        }
        return out.toByteArray();
    }

}



/**
 * First-version server handler: logs the header and body of the incoming bytes and
 * echoes the received bytes back to the client.
 *
 * <p>It handles at most one header+body per read and does not reassemble split
 * messages.
 */
class ServerRequestHandler extends ChannelInboundHandlerAdapter{
    /** Number of bytes this version assumes a serialized {@link Myheader} occupies. */
    private static final int HEADER_LEN = 107;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        // Copy taken before anything is consumed, so the echo contains header and body.
        ByteBuf sendBuf = buf.copy();
        boolean handedOff = false;
        try {
            System.out.println("channel start:"+buf.readableBytes());
            if(buf.readableBytes()>=HEADER_LEN){
                byte[] bytes = new byte[HEADER_LEN];
                // Consume the header so that the next read starts at the body; a
                // non-advancing getBytes would make the body read return header bytes.
                buf.readBytes(bytes);
                ByteArrayInputStream in = new ByteArrayInputStream(bytes);
                ObjectInputStream oin = new ObjectInputStream(in);
                Myheader header = (Myheader) oin.readObject();
                System.out.println("server response id:"+header.getRequestID());

                if(buf.readableBytes()>=header.getDataLen()){
                    byte [] data = new byte[(int)header.getDataLen()];
                    buf.readBytes(data);
                    ByteArrayInputStream din = new ByteArrayInputStream(data);
                    ObjectInputStream doin = new ObjectInputStream(din);
                    MyContent content= (MyContent)doin.readObject();
                    System.out.println(content.getName());
                }else{
                    System.out.println("channel else:"+buf.readableBytes());

                }

            }

            handedOff = true;
            // Waiting with sync() on the event loop thread can fail with a
            // BlockingOperationException; report write failures through a listener instead.
            ctx.writeAndFlush(sendBuf).addListener(f -> {
                if (!f.isSuccess()) {
                    ctx.fireExceptionCaught(f.cause());
                }
            });
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages.
            buf.release();
            if (!handedOff) {
                sendBuf.release();
            }
        }
    }
}

/**
 * Registry of callbacks waiting for a response, keyed by request id.
 */
class ResponseHander{
    static final ConcurrentHashMap<Long,Runnable> mapping = new ConcurrentHashMap<>();

    /**
     * Registers {@code cb} for {@code requestID}. An already registered callback for
     * the same id is kept and {@code cb} is ignored.
     */
    public static void addCallBack(long requestID,Runnable cb){
        mapping.putIfAbsent(requestID,cb);
    }

    /**
     * Runs and unregisters the callback for {@code requestID}. Does nothing when no
     * callback is registered, e.g. for a duplicate or unknown response.
     */
    public static void runCallBack(long requestID){
        // remove() hands the callback to exactly one caller, so it runs at most once
        // and an unknown id no longer ends in a NullPointerException.
        Runnable runnable = mapping.remove(requestID);
        if(runnable != null){
            runnable.run();
        }
    }

    /** Unregisters the callback for {@code requestID} without running it. */
    public static void removeCB(long requestID){
        mapping.remove(requestID);
    }


}

/**
 * Sample remote service interface used by the consumer side of the demo.
 */
interface Car {

    /**
     * Sample remote operation.
     *
     * @param msg the single argument of the call
     */
    void ooxx(String msg);
}

/**
 * Second sample service interface.
 */
interface Fly {

    /**
     * Sample operation.
     *
     * @param msg the single argument of the call
     */
    void xxoo(String msg);
}

/**
 * Serializable message body: describes one method call (interface name, method name,
 * parameter types and arguments).
 */
class MyContent implements Serializable {

    /** Name of the called interface. */
    String name;
    /** Name of the called method. */
    String methodName;
    /** Declared parameter types of the called method. */
    Class<?>[] parameterTypes;
    /** Arguments of the call. */
    Object[] args;

    // ---- readers ----

    public String getName() {
        return this.name;
    }

    public String getMethodName() {
        return this.methodName;
    }

    public Class<?>[] getParameterTypes() {
        return this.parameterTypes;
    }

    public Object[] getArgs() {
        return this.args;
    }

    // ---- writers ----

    public void setName(String name) {
        this.name = name;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }
}

/**
 * Serializable message header: a flag word, the request id and the length of the body
 * that follows.
 */
class Myheader implements Serializable {

    /** Flag word; 32 bits leave room for a lot of information. */
    int flag;
    /** Id that ties a response to its request. */
    long requestID;
    /** Length in bytes of the serialized body. */
    long dataLen;

    // ---- readers ----

    public int getFlag() {
        return this.flag;
    }

    public long getRequestID() {
        return this.requestID;
    }

    public long getDataLen() {
        return this.dataLen;
    }

    // ---- writers ----

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public void setRequestID(long requestID) {
        this.requestID = requestID;
    }

    public void setDataLen(long dataLen) {
        this.dataLen = dataLen;
    }
}

/**
 * Singleton that hands out client channels from a small per-address pool and creates
 * a channel when the chosen pool slot is empty or no longer active.
 */
class ClientFactory{
    private ClientFactory(){}

    /** Number of channel slots per remote address. */
    int poolSize = 1;

    /** Event loop group shared by all client channels; created on first use. */
    NioEventLoopGroup clientWorker;

    Random rand = new Random();

    private static final ClientFactory factory = new ClientFactory();

    public static ClientFactory getFactory(){
        return factory;
    }

    /** One pool per remote address. */
    ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();

    /**
     * Returns a channel connected to {@code address}, picking a random pool slot and
     * connecting it if needed.
     *
     * @param address the remote address
     * @return an active channel, or {@code null} if the calling thread was interrupted
     *         while connecting
     */
    public synchronized NioSocketChannel getClient(InetSocketAddress address){
        ClientPool clientPool = outboxs.computeIfAbsent(address, key -> new ClientPool(poolSize));

        // Index by the pool's real size so a later change of poolSize cannot overrun it.
        int i = rand.nextInt(clientPool.Clients.length);
        NioSocketChannel existing = clientPool.Clients[i];
        if(null!=existing&&existing.isActive()){
            return existing;
        }

        synchronized (clientPool.lock[i]){
            return clientPool.Clients[i] = create(address);
        }


    }

    /** Opens a channel to {@code address} and blocks until the connect attempt finishes. */
    private NioSocketChannel create(InetSocketAddress address){
        // One group is reused for every connection; creating a group per connection
        // leaks its threads each time a channel is re-created. Only getClient calls
        // this method and it is synchronized, so the lazy initialization is safe.
        if(clientWorker == null){
            clientWorker = new NioEventLoopGroup(poolSize);
        }
        Bootstrap bs = new Bootstrap();
        ChannelFuture connect = bs.group(clientWorker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new ClientResponses());
                    }
                }).connect(address);
        try {
            return (NioSocketChannel) connect.sync().channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }

        return null;

    }

}

/**
 * First-version client handler: reads the header of a response and runs the callback
 * registered for its request id.
 *
 * <p>Only the first header of each read is looked at; any remaining bytes are dropped.
 */
class ClientResponses extends ChannelInboundHandlerAdapter{
    /** Number of bytes this version assumes a serialized {@link Myheader} occupies. */
    private static final int HEADER_LEN = 107;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            if(buf.readableBytes()>=HEADER_LEN){
                byte[] bytes = new byte[HEADER_LEN];
                buf.readBytes(bytes);
                ByteArrayInputStream in = new ByteArrayInputStream(bytes);
                ObjectInputStream oin = new ObjectInputStream(in);
                Myheader header = (Myheader) oin.readObject();
                System.out.println("client response id:"+header.getRequestID());

                ResponseHander.runCallBack(header.getRequestID());
            }
        } finally {
            // The message is not passed on, so this handler has to release it.
            buf.release();
        }
    }
}

/**
 * Fixed-size set of channel slots with one lock object per slot.
 */
class ClientPool {

    /** Channel slots; every slot starts out {@code null}. */
    NioSocketChannel[] Clients;

    /** Lock objects, one per slot of {@link #Clients}. */
    Object[] lock;

    /**
     * @param size number of slots
     */
    ClientPool(int size) {
        this.Clients = new NioSocketChannel[size];
        this.lock = new Object[size];
        java.util.Arrays.setAll(this.lock, slot -> new Object());
    }
}
Netty RPC
  1. 主线程调用远程接口,走动态代理,invoke()。

  2. 在invoke()里,组装ByteBuf(header+content),利用Client(NioSocketChannel)发送组装好ByteBuf到Server端

  3. 而Client是通过ClientFactory调用getClient(address)从连接池中获得

  4. 每一个NioSocketChannel的pipeline中都注册了处理器ClientResponses,该处理器在收到服务端响应后会调用

    ResponseHander.runCallBack(header.requestID)
    
  5. 在Server端注册ServerRequestHandler,当接收到信息后,往Client返回ByteBuf

  6. 在主逻辑中,为了发出请求后,主逻辑等待返回结果后,再往下执行。创建CountDownLatch,并加入回调逻辑,主线程await阻塞

    CountDownLatch countDownLatch = new CountDownLatch(1);
    ResponseHander.addCallBack(id, new Runnable() {
      @Override
      public void run() {
        countDownLatch.countDown();
      }
    });
    
  7. 当Client发送消息到Server端,Server端收到消息,执行ServerRequestHandler逻辑,往客户端发送消息,客户端收到消息,那么调用countDownLatch.countDown();主线程恢复

第一版bug
Netty RPC第一版bug
  1. 服务端的一次channelRead事件可能接收到多个消息,即同一个ByteBuf中含有多个header+content组合,而第一版每次只处理其中的第一个
  2. 服务端一次channelRead事件接收到的ByteBuf中含有多个header+content组合时,最后一个组合很可能不完整(content被截断),其剩余部分会出现在下一次channelRead事件的ByteBuf开头

2. 第二版

具体改动见下方完整代码中注释标注的步骤1、2、3、4

  1. bug1

    解决方法:循环读取

    ServerRequestHandler

    //1.
    if(buf.readableBytes()>=107)
    //2.
    buf.readBytes(bytes);
    

    ->

    //1.
    while(buf.readableBytes()>=107)
    //2.
    buf.getBytes(buf.readerIndex(),bytes);
    //2.1读取content前
    buf.readBytes(107);
    
  2. Bug2

    解决方法:

    • 将本次不能处理的整个header+content拼接下一次的ByteBuf(ServerDecode extends ByteToMessageDecoder自带)
    • 增加解码ServerDecode类extends ByteToMessageDecoder,将ServerRequestHandler逻辑转移到该类,并在服务端的pipeline中增加该类

完整代码:

MyRPCTest

/**
 * Second version of the Netty RPC demo.
 *
 * <p>The server pipeline now starts with {@link ServerDecode}, and a proxy call
 * returns the string carried by the response.
 */
public class MyRPCTest {

    /**
     * Binds the demo server to port 9090 and blocks until the server channel is closed.
     * The event loop group is shut down when the method leaves.
     */
    @Test
    public void statServer(){
        NioEventLoopGroup boss = new NioEventLoopGroup(50);
        // Accepting connections and per-connection I/O share the same group.
        NioEventLoopGroup worker = boss;

        ServerBootstrap sbs = new ServerBootstrap();
        ChannelFuture bind = sbs.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        System.out.println("server accept client port:"+ch.remoteAddress().getPort());
                        ChannelPipeline p = ch.pipeline();
                        //4. one more handler: the decoder runs before the request handler
                        p.addLast(new ServerDecode());
                        p.addLast(new ServerRequestHandler());
                    }
                }).bind(new InetSocketAddress(9090));
        try {
            bind.sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Without this the event loop threads outlive the server.
            boss.shutdownGracefully();
        }

    }

    /**
     * Simulates the consumer: starts the server on a background thread, fires 50
     * concurrent proxy calls, prints each result and then waits for a key press.
     */
    @Test
    public void get(){
        // NOTE(review): nothing waits for the bind to finish, so a client thread may try
        // to connect before the server is listening.
        new Thread(()->{
                statServer();
        }).start();
        System.out.println("server started...");
        AtomicInteger num = new AtomicInteger(0);
        int size = 50;
        Thread[] threads = new Thread[size];
        for (int i = 0; i < size; i++) {
            threads[i]= new Thread(()->{
                Car car = proxyGet(Car.class);
                String arg = "hello" + num.incrementAndGet();
                String res = car.ooxx(arg);
                System.out.println("client over msg:"+res+" src arg:"+arg);
            });
        }
        for (Thread thread:threads) {
            thread.start();
        }

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a dynamic proxy for {@code interfaceInfo}. Every call on the proxy is
     * packed into a {@link MyContent}, prefixed with a {@link Myheader}, written to a
     * pooled client channel for port 9090, and then blocks until the response with the
     * same request id completes the registered future.
     *
     * @param interfaceInfo the interface to proxy
     * @param <T> the interface type
     * @return a proxy whose methods return the string carried by the response
     */
    @SuppressWarnings("unchecked") // Proxy.newProxyInstance returns an instance implementing interfaceInfo
    public static <T>T proxyGet(Class<T> interfaceInfo){
        ClassLoader loader = interfaceInfo.getClassLoader();
        Class<?>[] methodInfo = {interfaceInfo};

        return (T)Proxy.newProxyInstance(loader, methodInfo, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                String name = interfaceInfo.getName();
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                MyContent content = new MyContent();
                content.setArgs(args);
                content.setName(name);
                content.setMethodName(methodName);
                content.setParameterTypes(parameterTypes);

                // The body is serialized first because the header records its length.
                byte[] msgBody = serialize(content);
                Myheader header = createHeader(msgBody);
                byte[] msgHeader = serialize(header);

                ClientFactory factory = ClientFactory.getFactory();
                NioSocketChannel clientChannel = factory.getClient(new InetSocketAddress(9090));

                // Register the future before sending so a fast response cannot be missed.
                long id = header.getRequestID();
                CompletableFuture<String> res = new CompletableFuture<>();
                ResponseMappingCallback.addCallBack(id, res);

                ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);
                byteBuf.writeBytes(msgHeader);
                byteBuf.writeBytes(msgBody);
                try {
                    clientChannel.writeAndFlush(byteBuf).sync();
                } catch (Throwable t) {
                    // No response will ever arrive for this id; do not leave the future behind.
                    ResponseMappingCallback.removeCB(id);
                    throw t;
                }
                // Blocks until the response handler completes the future.
                return res.get();
            }
        });
    }

    /**
     * Builds the header for a request body.
     *
     * @param msg the serialized body; its length is stored as the data length
     * @return a header with flag {@code 0x14141414} and a random non-negative request id
     */
    public static   Myheader createHeader(byte[] msg) {
        Myheader header = new Myheader();
        int size = msg.length;
        int f = 0x14141414;
        //0x14 0001 0100
        // Masking the sign bit always yields a non-negative id; Math.abs(Long.MIN_VALUE) stays negative.
        long requestID = UUID.randomUUID().getLeastSignificantBits() & Long.MAX_VALUE;
        header.setFlag(f);
        header.setDataLen(size);
        header.setRequestID(requestID);
        return header;
    }

    /** Serializes {@code value} with Java object serialization into a fresh byte array. */
    private static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oout = new ObjectOutputStream(out)) {
            oout.writeObject(value);
        }
        return out.toByteArray();
    }

}

/**
 * Decoder added in the second version. It turns the inbound byte stream into
 * {@link Packmsg} objects, one per complete header+body pair, and is installed in both
 * the server and the client pipeline.
 *
 * <p>NOTE(review): header and body are read with Java native deserialization straight
 * from the network; that is unsafe for untrusted peers.
 */
class ServerDecode extends ByteToMessageDecoder{

    /** Number of bytes this version assumes a serialized {@link Myheader} occupies. */
    private static final int HEADER_LEN = 108;
    /** Flag value the client puts into request headers. */
    private static final int FLAG_REQUEST = 0x14141414;
    /** Flag value the server puts into response headers. */
    private static final int FLAG_RESPONSE = 0x14141424;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> out) throws Exception {
        //1. while instead of if: one read event may carry several packets
        while(buf.readableBytes()>=HEADER_LEN){
            byte[] bytes = new byte[HEADER_LEN];
            //2. getBytes instead of readBytes: the reader index stays put, so an
            //   incomplete packet is left in buf, header included, for the next call
            buf.getBytes(buf.readerIndex(),bytes);
            ByteArrayInputStream in = new ByteArrayInputStream(bytes);
            ObjectInputStream oin = new ObjectInputStream(in);
            Myheader header = (Myheader) oin.readObject();
            long dataLen = header.getDataLen();
            // The header is still counted in readableBytes(), so it has to be
            // subtracted before comparing against the body length.
            if(buf.readableBytes()-HEADER_LEN < dataLen){
                break;
            }
            //3. move the reader index to the start of the body; skipBytes does not
            //   allocate, whereas readBytes(int) returns a new buffer that must be released
            buf.skipBytes(HEADER_LEN);
            byte [] data = new byte[(int)dataLen];
            buf.readBytes(data);
            //A. requests and responses share this decoder and are decoded the same way;
            //   a packet with any other flag is consumed and dropped
            int flag = header.getFlag();
            if(flag==FLAG_REQUEST || flag==FLAG_RESPONSE){
                ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
                MyContent content= (MyContent)doin.readObject();
                out.add(new Packmsg(header,content));
            }
        }
    }
}

/**
 * Second-version server handler: receives a decoded {@link Packmsg}, builds a reply
 * that names the I/O thread and the executing thread, and writes it back with the
 * request's id and the response flag.
 */
class ServerRequestHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final Packmsg request = (Packmsg) msg;
        final String ioThread = Thread.currentThread().getName();
        // Variant 1: ctx.executor().execute(...) runs the task on the channel's own
        // event loop, so the I/O thread and the executing thread are the same one.
        // Variant 2 (used here): pick an event loop from the parent group, so the
        // executing thread can differ from the I/O thread.
        ctx.executor().parent().next().execute(() -> {
            String workerThread = Thread.currentThread().getName();
            MyContent reply = new MyContent();
            String text = "io threadName:" + ioThread + " exec threadName:" + workerThread + " from args:" + request.content.getArgs()[0];
            reply.setRes(text);
            byte[] body = SerDerUtil.ser(reply);

            // The reply header reuses the request id so the client can match it.
            Myheader replyHeader = new Myheader();
            replyHeader.setRequestID(request.header.getRequestID());
            replyHeader.setFlag(0x14141424);
            replyHeader.setDataLen(body.length);
            byte[] head = SerDerUtil.ser(replyHeader);

            ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(head.length + body.length);
            payload.writeBytes(head);
            payload.writeBytes(body);
            ctx.writeAndFlush(payload);
        });
    }
}

/**
 * Registry of futures waiting for a response, keyed by request id.
 */
class ResponseMappingCallback {
    static final ConcurrentHashMap<Long,CompletableFuture<? super String>> mapping = new ConcurrentHashMap<>();

    /**
     * Registers {@code cb} for {@code requestID}. An already registered future for the
     * same id is kept and {@code cb} is ignored.
     */
    public static void addCallBack(long requestID,CompletableFuture<? super String> cb){
        mapping.putIfAbsent(requestID,cb);
    }

    /**
     * Completes and unregisters the future waiting for {@code msg} with the result
     * string of its content. Does nothing when no future is registered for the id,
     * e.g. for a duplicate or unknown response.
     */
    public static void runCallBack(Packmsg msg){
        // remove() hands the future to exactly one caller and avoids a
        // NullPointerException for ids nobody is waiting on.
        CompletableFuture<? super String> cf = mapping.remove(msg.header.getRequestID());
        if(cf != null){
            cf.complete(msg.content.getRes());
        }
    }

    /** Unregisters the future for {@code requestID} without completing it. */
    public static void removeCB(long requestID){
        mapping.remove(requestID);
    }


}

/**
 * Sample remote service interface used by the consumer side of the demo.
 */
interface Car {

    /**
     * Sample remote operation.
     *
     * @param msg the single argument of the call
     * @return the string produced for the call
     */
    String ooxx(String msg);
}

/**
 * Second sample service interface.
 */
interface Fly {

    /**
     * Sample operation.
     *
     * @param msg the single argument of the call
     */
    void xxoo(String msg);
}

/**
 * Serializable message body. A request carries the call description (interface name,
 * method name, parameter types, arguments); a response carries the result string.
 */
class MyContent implements Serializable {

    /** Name of the called interface. */
    String name;
    /** Name of the called method. */
    String methodName;
    /** Declared parameter types of the called method. */
    Class<?>[] parameterTypes;
    /** Arguments of the call. */
    Object[] args;
    /** Result string of the call. */
    String res;

    // ---- readers ----

    public String getName() {
        return this.name;
    }

    public String getMethodName() {
        return this.methodName;
    }

    public Class<?>[] getParameterTypes() {
        return this.parameterTypes;
    }

    public Object[] getArgs() {
        return this.args;
    }

    public String getRes() {
        return this.res;
    }

    // ---- writers ----

    public void setName(String name) {
        this.name = name;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public void setRes(String res) {
        this.res = res;
    }
}

/**
 * Serializable message header: a flag word, the request id and the length of the body
 * that follows.
 */
class Myheader implements Serializable {

    /** Flag word; 32 bits leave room for a lot of information. */
    int flag;
    /** Id that ties a response to its request. */
    long requestID;
    /** Length in bytes of the serialized body. */
    long dataLen;

    // ---- readers ----

    public int getFlag() {
        return this.flag;
    }

    public long getRequestID() {
        return this.requestID;
    }

    public long getDataLen() {
        return this.dataLen;
    }

    // ---- writers ----

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public void setRequestID(long requestID) {
        this.requestID = requestID;
    }

    public void setDataLen(long dataLen) {
        this.dataLen = dataLen;
    }
}

/**
 * Singleton that hands out client channels from a small per-address pool and creates
 * a channel when the chosen pool slot is empty or no longer active.
 */
class ClientFactory{
    private ClientFactory(){}

    /** Number of channel slots per remote address. */
    int poolSize = 10;

    /** Event loop group shared by all client channels; created on first use. */
    NioEventLoopGroup clientWorker;

    Random rand = new Random();

    private static final ClientFactory factory = new ClientFactory();

    public static ClientFactory getFactory(){
        return factory;
    }

    /** One pool per remote address. */
    ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();

    /**
     * Returns a channel connected to {@code address}, picking a random pool slot and
     * connecting it if needed.
     *
     * @param address the remote address
     * @return an active channel, or {@code null} if the calling thread was interrupted
     *         while connecting
     */
    public synchronized NioSocketChannel getClient(InetSocketAddress address){
        ClientPool clientPool = outboxs.computeIfAbsent(address, key -> new ClientPool(poolSize));

        // Index by the pool's real size so a later change of poolSize cannot overrun it.
        int i = rand.nextInt(clientPool.Clients.length);
        NioSocketChannel existing = clientPool.Clients[i];
        if(null!=existing&&existing.isActive()){
            return existing;
        }

        synchronized (clientPool.lock[i]){
            return clientPool.Clients[i] = create(address);
        }


    }

    /** Opens a channel to {@code address} and blocks until the connect attempt finishes. */
    private NioSocketChannel create(InetSocketAddress address){
        // One group is reused for every connection; creating a group per connection
        // leaks its threads each time a channel is re-created. Only getClient calls
        // this method and it is synchronized, so the lazy initialization is safe.
        if(clientWorker == null){
            clientWorker = new NioEventLoopGroup(poolSize);
        }
        Bootstrap bs = new Bootstrap();
        ChannelFuture connect = bs.group(clientWorker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Same decoder as on the server side, then the response handler.
                        p.addLast(new ServerDecode());
                        p.addLast(new ClientResponses());
                    }
                }).connect(address);
        try {
            return (NioSocketChannel) connect.sync().channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }

        return null;

    }

}

/**
 * Second-version client handler: forwards every decoded {@link Packmsg} to
 * {@link ResponseMappingCallback}.
 */
class ClientResponses extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The decoder earlier in the pipeline emits Packmsg instances.
        ResponseMappingCallback.runCallBack((Packmsg) msg);
    }
}

/**
 * Fixed-size set of channel slots with one lock object per slot.
 */
class ClientPool {

    /** Channel slots; every slot starts out {@code null}. */
    NioSocketChannel[] Clients;

    /** Lock objects, one per slot of {@link #Clients}. */
    Object[] lock;

    /**
     * @param size number of slots
     */
    ClientPool(int size) {
        this.Clients = new NioSocketChannel[size];
        this.lock = new Object[size];
        java.util.Arrays.setAll(this.lock, slot -> new Object());
    }
}

Packmsg

/**
 * One decoded message: a header together with its content.
 */
public class Packmsg {

    /** Header of the message. */
    Myheader header;
    /** Content of the message. */
    MyContent content;

    /**
     * @param header header of the message
     * @param content content of the message
     */
    public Packmsg(Myheader header, MyContent content) {
        this.header = header;
        this.content = content;
    }

    public Myheader getHeader() {
        return this.header;
    }

    public MyContent getContent() {
        return this.content;
    }

    public void setHeader(Myheader header) {
        this.header = header;
    }

    public void setContent(MyContent content) {
        this.content = content;
    }
}

SerDerUtil

/**
 * Serialization helper based on Java object serialization.
 */
public class SerDerUtil {

    /**
     * Serializes {@code msg} into a fresh byte array.
     *
     * <p>Each call uses its own buffer, so concurrent callers neither share state nor
     * wait on a common lock.
     *
     * @param msg the object to serialize
     * @return the serialized bytes, never {@code null}
     * @throws java.io.UncheckedIOException if {@code msg} cannot be serialized, e.g.
     *         because it is not {@link java.io.Serializable}
     */
    public static byte[] ser(Object msg){
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oout = new ObjectOutputStream(buffer)) {
            oout.writeObject(msg);
        } catch (IOException e) {
            // Fail here with the real cause instead of returning null, which callers
            // dereference without a check.
            String type = (msg == null) ? "null" : msg.getClass().getName();
            throw new java.io.UncheckedIOException("cannot serialize " + type, e);
        }
        return buffer.toByteArray();
    }
}
Netty RPC第二版
  1. 主线程调用远程接口,走动态代理,invoke()。

  2. 在invoke()里,组装ByteBuf(header+content),利用Client(NioSocketChannel)发送组装好ByteBuf到Server端

  3. 而Client是通过ClientFactory调用getClient(address)从连接池中获得

  4. 每一个NioSocketChannel的pipeline中都注册了处理器ServerDecode和ClientResponses,

    • 引入ServerDecode是为了解决Bug2,并且把header+content-->Packmsg
    • ClientResponses这个处理器会调用
    Packmsg responsepkg = (Packmsg) msg;
    ResponseMappingCallback.runCallBack(responsepkg);
    
  5. 在Server端注册ServerDecode和ServerRequestHandler,当接收到信息后,往Client返回ByteBuf

    • 引入ServerDecode是为了解决Bug2,并且把header+content-->Packmsg

    • ServerRequestHandler(已截取过代码)

      这里Server接收到数据后,有4种写法:

      • 在当前IO线程直接做业务处理,接收完就走业务处理 --> 处理业务线程 = IO线程

      • 在当前IO线程自行抛出线程new Thread()处理业务 --> 处理业务线程 != IO线程

      • 在当前IO线程用当前EventLoop执行(因为EventLoop也是一个执行器),和上面不同的是,这里是将接收到的信息变为task,然后task由EventLoop后续执行,单线程异步的-->处理业务线程 = IO线程

        // Excerpt of the server handler showing only where the work is submitted: here
        // the task goes to ctx.executor(). requestPkg is not declared in this excerpt;
        // in the full listing it is the Packmsg cast from msg.
        class ServerRequestHandler extends ChannelInboundHandlerAdapter{
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // Captured on the thread that delivers the read event.
                String ioThreadName = Thread.currentThread().getName();
                ctx.executor().execute(new Runnable() {
                    @Override
                    public void run() {
                        // Captured on the thread that runs the submitted task.
                        String execThreadName = Thread.currentThread().getName();
                        String s = "io threadName:" + ioThreadName + " exec threadName:" + execThreadName + " from args:" + requestPkg.content.getArgs()[0];
        
                    }
                });
            }
        }
        
      • 在当前IO线程中从所属的EventLoopGroup里选择一个EventLoop去执行,这样可以把业务均匀分布到各个selector线程上执行 --> 处理业务线程 != IO线程
 
   ```java
   // Excerpt of the server handler showing only where the work is submitted: here the
   // task goes to an executor picked from the parent group of ctx.executor().
   // requestPkg is not declared in this excerpt; in the full listing it is the Packmsg
   // cast from msg.
   class ServerRequestHandler extends ChannelInboundHandlerAdapter{
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
             // Captured on the thread that delivers the read event.
             String ioThreadName = Thread.currentThread().getName();
             ctx.executor().parent().next().execute(new Runnable() {
                 @Override
                 public void run() {
                     // Captured on the thread that runs the submitted task.
                     String execThreadName = Thread.currentThread().getName();
                     String s = "io threadName:" + ioThreadName + " exec threadName:" + execThreadName + " from args:" + requestPkg.content.getArgs()[0];
     
                 }
             });
         }
     }
   ```
  6. 在主逻辑中,为了让主线程在发出请求后等待返回结果再往下执行,创建CompletableFuture并将其注册为回调,主线程调用get阻塞

    CompletableFuture<String> res = new CompletableFuture();
    ResponseMappingCallback.addCallBack(id, res);
    
  7. 当Client发送消息到Server端,Server端收到消息后执行ServerRequestHandler逻辑并往客户端发送响应;客户端收到响应后调用CompletableFuture.complete,主线程的get随之返回

相关文章

网友评论

      本文标题:I/O-利用netty实现简单的RPC

      本文链接:https://www.haomeiwen.com/subject/ksroiktx.html