美文网首页TCP/IP协议
t-io源码学习笔记

t-io源码学习笔记

作者: MisterZhai | 来源:发表于2017-08-23 20:03 被阅读87次

    前言

    t-io项目git

    http://git.oschina.net/tywo45/t-io

    简介略

    版本1.7.3

    这两天正好学习了下谭总的t-io的TCP相关部分,并且把showcase跑起来了。我是用maven引用的,2.0.0还未发布正式版,但是听谭总说2.0.0变化挺大的,加入了许多功能,先期待一把。

    由于是在millions_chan的文章基础上梳理的源码流程,但是现在的代码和几个月之前还是有所改动的,所以这里也按照原文章的结构,改了些引用代码,作为自己阅读的笔记,特此记录。

    感谢开源!

    Server

    从server端的start函数进去,创建一个channel group ,生成线程池,之后server端的accept操作回调就在channelGroup所拥有的线程池中执行

    AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(serverGroupContext.getGroupExecutor());
    

    然后将AsynchronousServerSocketChannel与channelGroup通过open关联起来

    serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
    

    将这个serverSocketChannel与要监听的IP和端口绑定起来

    serverSocketChannel.bind(listenAddress, 0);
    

    然后通过accept接受客户端的连接,将该连接回调给acceptCompletionHandler处理,acceptCompletionHandler实现了NIO中的接口CompletionHandler中的completed和failed方法,

    serverSocketChannel.accept(this, acceptCompletionHandler);
    

    在其实现方法中判断是否连接完成或者失败,然后继续接受客户端请求

    serverSocketChannel.accept(aioServer, this);
    

    如果服务器没有设置停止,则开始read数据

    if (!aioServer.isWaitingStop()) {
                    ReadCompletionHandler<SessionContext, P, R> readCompletionHandler = channelContext.getReadCompletionHandler();
                    ByteBuffer readByteBuffer = readCompletionHandler.getReadByteBuffer();//ByteBuffer.allocateDirect(channelContext.getGroupContext().getReadBufferSize());
                    readByteBuffer.position(0);
                    readByteBuffer.limit(readByteBuffer.capacity());
                    asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, readCompletionHandler);
                }
    

    在read中将数据回调给readCompletionHandler处理,ReadCompletionHandler实现了NIO的CompletionHandler接口的completed和failed方法

    completed方法

    @Override
        public void completed(Integer result, ByteBuffer byteBuffer) {
            //      GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
            if (result > 0) {
                if (channelContext.isTraceClient()) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("p_r_buf_len", result);
                    channelContext.traceClient(ChannelAction.RECEIVED_BUF, null, map);
                }
    
                //          ByteBuffer newByteBuffer = ByteBufferUtils.copy(readByteBuffer, 0, readByteBuffer.position());
                DecodeRunnable<SessionContext, P, R> decodeRunnable = channelContext.getDecodeRunnable();
                readByteBuffer.flip();
                decodeRunnable.setNewByteBuffer(readByteBuffer);
                decodeRunnable.run();
                //          decodeRunnable.addMsg(newByteBuffer);
                //          groupContext.getDecodeExecutor().execute(decodeRunnable);
            } else if (result == 0) {
                log.error("{}读到的数据长度为0", channelContext);
            } else if (result < 0) {
                if (result == -1) {
                    Aio.close(channelContext, null, "对方关闭了连接");
                    return;
                } else {
                    Aio.close(channelContext, null, "读数据时返回" + result);
                    return;
                }
            }
    
            if (AioUtils.checkBeforeIO(channelContext)) {
                AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
                readByteBuffer.position(0);
                readByteBuffer.limit(readByteBuffer.capacity());
                asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, this);
            }
        }
    

    其中有个isTraceClient可能是追踪数据信息的,还不太明白...

    readByteBuffer.flip();

    在我们写入数据后,byte buffer有个游标会一直指向写数据的最后一位,来告诉你下次写数据的时候继续往这个位置来写。flip()方法相当于把游标指向最开始,你便可以开始读取数据。

    当接受到值>0时,调用解码线程DecodeRunnable,并调用run方法启动解码,run方法代码较多,先贴上,然后慢慢分析,哈哈哈哈

    @Override
        public void run() {
            ByteBuffer byteBuffer = newByteBuffer;
            if (byteBuffer != null) {
                if (lastByteBuffer != null) {
                    byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
                    lastByteBuffer = null;
                }
            } else {
                return;
            }
    
            try {
                label_2: while (true) {
                    int initPosition = byteBuffer.position();
                    P packet = channelContext.getGroupContext().getAioHandler().decode(byteBuffer, channelContext);
    
                    if (packet == null)// 数据不够,组不了包
                    {
                        //                  if (log.isDebugEnabled())
                        //                  {
                        //                      log.debug("{},数据不够,组不了包", channelContext.toString());
                        //                  }
                        lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, byteBuffer.limit());
                        return;
                    } else //组包成功
                    {
                        channelContext.getStat().setLatestTimeOfReceivedPacket(SystemTimer.currentTimeMillis());
    
                        int afterDecodePosition = byteBuffer.position();
                        int len = afterDecodePosition - initPosition;
    
                        //                  if (len == 0)
                        //                  {
                        //                      String logstr = channelContext + "解码成功, " + packet.logstr() + "," + byteBuffer + " 但是却只消耗了0字节, 这有可能会导致死循环. " + ThreadUtils.stackTrace();
                        //                      log.error(logstr);
                        //                  }
    
                        channelContext.getGroupContext().getGroupStat().getReceivedPacket().incrementAndGet();
                        channelContext.getGroupContext().getGroupStat().getReceivedBytes().addAndGet(len);
    
                        channelContext.getStat().getReceivedPackets().incrementAndGet();
                        channelContext.getStat().getReceivedBytes().addAndGet(len);
    
                        channelContext.traceClient(ChannelAction.RECEIVED, packet, null);
    
                        packet.setByteCount(len);
                        
                        AioListener<SessionContext, P, R> aioListener = channelContext.getGroupContext().getAioListener();
                        try {
                            if (log.isInfoEnabled()) {
                                log.info("{} 收到消息 {}", channelContext, packet.logstr());
                            }
                            aioListener.onAfterReceived(channelContext, packet, len);
                        } catch (Exception e) {
                            log.error(e.toString(), e);
                        }
                        handler(channelContext, packet, len);
    
                        int remainingLength = byteBuffer.limit() - byteBuffer.position();
                        if (remainingLength > 0)//组包后,还剩有数据
                        {
                            if (log.isDebugEnabled()) {
                                log.debug("{},组包后,还剩有数据:{}", channelContext, remainingLength);
                            }
                            continue label_2;
                        } else//组包后,数据刚好用完
                        {
                            lastByteBuffer = null;
                            log.debug("{},组包后,数据刚好用完", channelContext);
                            return;
                        }
                    }
                }
            } catch (AioDecodeException e) {
                log.error(channelContext.toString() + "解码异常", e);
                Aio.close(channelContext, e, "解码异常:" + e.getMessage());
                return;
            }
        }
    
    
    

    如果byteBuffer不为null,则将上次未解码处理完的数据合并起来

    然后下面竟然有个label_2:标号

    调用实现getAioHandler的decode方法,也就是我们写的解码函数

    P packet = channelContext.getGroupContext().getAioHandler().decode(byteBuffer, channelContext);
    

    如果组包成功,将该包返回给该run继续处理,然后回调监听AioListener的onAfterReceived包通知

    然后将该包交由handler方法处理

    handler(channelContext, packet, len);
    

    如果剩余还有数据,跳转到刚才的标号继续while处理

    handler方法

    public static <SessionContext, P extends Packet, R> void handler(ChannelContext<SessionContext, P, R> channelContext, P packet, int byteCount) {
    
            GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
            // 两种模式处理消息与解码在同一个线程中处理和把packet丢到一个队列中,让线程池去处理  默认是在同一线程处理
            PacketHandlerMode packetHandlerMode = groupContext.getPacketHandlerMode();
    
            HandlerRunnable<SessionContext, P, R> handlerRunnable = channelContext.getHandlerRunnable();
            if (packetHandlerMode == PacketHandlerMode.QUEUE) {
    
                handlerRunnable.addMsg(packet);
                groupContext.getTioExecutor().execute(handlerRunnable);
            } else {
            
            // 回调HandlerRunnable方法执行处理包
                handlerRunnable.handler(packet);
            }
        }
    
    

    在启动服务器时设置了处理handle,最终都是实现了AioHandler的handler,encode,decode方法

    static ServerAioHandler<ShowcaseSessionContext, ShowcasePacket, Object> aioHandler = new ShowcaseServerAioHandler();
    

    所以在HandlerRunnable的handler方法中回调设置的groupContext.getAioHandler().handler(packet, channelContext);进行包应用层处理

    还有一个地方不明白在DecodeRunable中handler方法中

    /**
         * 处理消息与解码在同一个线程中处理
         */
        SINGLE_THREAD(1),
    
        /**
         * 把packet丢到一个队列中,让线程池去处理
         */
        QUEUE(2);
        
    点击这两个为什么会跳到get方法
    
    private PacketHandlerMode(int value) {
            this.value = value;
        }
        
    java 枚举(括号赋值)<http://www.cnblogs.com/mimosading/p/3472578.html>
    
    要注意的是:
    
    1. 通过括号赋值,而且必须带有一个参构造器和一个属性跟方法,否则编译出错
    2. 赋值必须都赋值或都不赋值,不能一部分赋值一部分不赋值;如果不赋值则不能写构造器,赋值编译也出错
        
    
    

    Client

    客户端使用了AsynchronousSocketChannel的open方法,同步发送则使用CountDownLatch控制

    /**
         * 
         * @param serverNode
         * @param bindIp
         * @param bindPort
         * @param initClientChannelContext
         * @param timeout 超时时间,单位秒
         * @param isSyn true: 同步, false: 异步
         * @return
         * @throws Exception
         * @author: tanyaowu
         */
        private ClientChannelContext<SessionContext, P, R> connect(Node serverNode, String bindIp, Integer bindPort,
                ClientChannelContext<SessionContext, P, R> initClientChannelContext, Integer timeout, boolean isSyn) throws Exception {
    
            AsynchronousSocketChannel asynchronousSocketChannel = null;
            ClientChannelContext<SessionContext, P, R> channelContext = null;
            boolean isReconnect = initClientChannelContext != null;
            //      ClientAioListener<SessionContext, P, R> clientAioListener = clientGroupContext.getClientAioListener();
    
            long start = SystemTimer.currentTimeMillis();
            asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup);
            long end = SystemTimer.currentTimeMillis();
            long iv = end - start;
            if (iv >= 100) {
                log.error("{}, open 耗时:{} ms", channelContext, iv);
            }
    
            asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    
            InetSocketAddress bind = null;
            if (bindPort != null && bindPort > 0) {
                if (StringUtils.isNotBlank(bindIp)) {
                    bind = new InetSocketAddress(bindIp, bindPort);
                } else {
                    bind = new InetSocketAddress(bindPort);
                }
            }
    
            if (bind != null) {
                asynchronousSocketChannel.bind(bind);
            }
    
            channelContext = initClientChannelContext;
    
            start = SystemTimer.currentTimeMillis();
    
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverNode.getIp(), serverNode.getPort());
    
            ConnectionCompletionVo<SessionContext, P, R> attachment = new ConnectionCompletionVo<>(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp,
                    bindPort);
    
            if (isSyn) {
                Integer _timeout = timeout;
                if (_timeout == null) {
                    _timeout = 5;
                }
    
                CountDownLatch countDownLatch = new CountDownLatch(1);
                attachment.setCountDownLatch(countDownLatch);
                asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientGroupContext.getConnectionCompletionHandler());
                countDownLatch.await(_timeout, TimeUnit.SECONDS);
                return attachment.getChannelContext();
            } else {
                asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientGroupContext.getConnectionCompletionHandler());
                return null;
            }
        }
    

    处理流程相关的类分析

    DecodeRunable、CloseRunnable实现runable接口,Handler和sendRunable类继承自AbstractQueueRunnable,其又继承自AbstractSynRunnable实现了SynRunnableIntf接口

    public interface SynRunnableIntf extends Runnable {
        public ReadWriteLock runningLock();
    
        public boolean isNeededExecute();
    
        public boolean isCanceled();
    
        public void setCanceled(boolean isCanceled);
    
        public void runTask();
    }
    
    

    AbstractSynRunnable在该接口基础上添加了方法:

    /**
         * @return the executor
         */
        public Executor getExecutor() {
            return executor;
        }
    
        /**
         * @param executor the executor to set
         */
        public void setExecutor(Executor executor) {
            this.executor = executor;
        }
    
    

    并且主要实现了Runnable的run方法:

    @Override
        public final void run() {
            if (isCanceled()) //任务已经被取消
            {
                return;
            }
    
            ReadWriteLock runningLock = runningLock();
            Lock writeLock = runningLock.writeLock();
            boolean trylock = writeLock.tryLock();
            if (!trylock) {
                return;
            }
    
            try {
                runTask();
            } catch (Exception e) {
                log.error(e.toString(), e);
            } finally {
                writeLock.unlock();
                if (isNeededExecute()) {
                    getExecutor().execute(this);
                }
            }
        }
    
    

    在run中,尝试获取runningLock,如果获取失败说明该runneable已经在执行了,可以立即退出。否则就运行runTask,最终根据是否需要继续执行决定要不要再次将该runnable提交到执行线程池中(比如处理完一个packet,发现该连接还有待处理的packet则需要继续处理)。

    QueueRunnableIntf<T>接口已经取消了

    t-io基础类介绍

    在处理输入输出的整个流程中,都需要用到它们。两个基础的类为:

    ChannelContext
    GroupContext
    

    刚开始看到时候并不是很理解这几个类的作用

    ChannelContext类包含:

    1.AsynchronousSocketChannel对象、与该连接有关的
    2.DecodeRunnable、HandlerRunnable、SendRunnable
    3.GroupContext
    4.ReadCompletionHandler和WriteCompletionHandler

    ChannelContext类可以理解代表一个socket连接,封装了:连接对应的AsynchronousSocketChannel对象;解码、处理接收到的数据和发送数据、关闭连接的几个Runnable;该连接所属的GroupContext引用;该连接的ReadCompletionHandler等。

    IDEA中查看class的属性和方法列表,点击上面P可以折叠显示

    在创建ChannelContext的中,并创建几个Runnable,先设置GroupContext,然后将自己加入group的连接管理中groupContext.connections.add(this);

    public void setGroupContext(GroupContext<SessionContext, P, R> groupContext) {
            this.groupContext = groupContext;
    
            if (groupContext != null) {
                decodeRunnable = new DecodeRunnable<>(this);
                //          closeRunnable = new CloseRunnable<>(this, null, null, groupContext.getCloseExecutor());
    
                //          handlerRunnableHighPrior = new HandlerRunnable<>(this, groupContext.getHandlerExecutorHighPrior());
                handlerRunnable = new HandlerRunnable<>(this, groupContext.getTioExecutor());
    
                //          sendRunnableHighPrior = new SendRunnable<>(this, groupContext.getSendExecutorHighPrior());
                sendRunnable = new SendRunnable<>(this, groupContext.getTioExecutor());
    
                groupContext.connections.add(this);
            }
        }
    

    然后设置asynchronousSocketChannel,并将连接存储到对端集合clientNodes中:

    public void setAsynchronousSocketChannel(AsynchronousSocketChannel asynchronousSocketChannel) {
            this.asynchronousSocketChannel = asynchronousSocketChannel;
    
            if (asynchronousSocketChannel != null) {
                try {
                    Node clientNode = createClientNode(asynchronousSocketChannel);
                    setClientNode(clientNode);
                } catch (IOException e) {
                    log.info(e.toString(), e);
                    assignAnUnknownClientNode();
                }
            } else {
                assignAnUnknownClientNode();
            }
        }
    

    GroupContext管理着当前实例的所有连接connections、连接对应的用户users以及用户间的分组关系groups,在创建的时候,创建好两个线程池

    public final ClientNodes<SessionContext, P, R> clientNodes = new ClientNodes<>();
        public final ChannelContextSetWithLock<SessionContext, P, R> connections = new ChannelContextSetWithLock<>();
        public final ChannelContextSetWithLock<SessionContext, P, R> connecteds = new ChannelContextSetWithLock<>();
        public final ChannelContextSetWithLock<SessionContext, P, R> closeds = new ChannelContextSetWithLock<>();
    
        public final Groups<SessionContext, P, R> groups = new Groups<>();
        public final Users<SessionContext, P, R> users = new Users<>();
        public final Ids<SessionContext, P, R> ids = new Ids<>();
    

    管理的两个线程池

    /** The group executor. */
        protected SynThreadPoolExecutor tioExecutor = null;
    
        protected ThreadPoolExecutor groupExecutor = null;
    

    业务逻辑处理

    读取数据

    在读取数据的方法里,将读取到的数据byteBuffer提交到decodeRunnable的数据队列中,而后继续调用read方法读取对端发送来的数据。

    数据组包

    在decodeRunnable中,调用AioHandler的decode方法来获取数据包,然后提交给HandlerRunnable处理。在处理数据时主要考虑了半包和粘包的情况:

    在这个循环中,首先从队列中获取数据,获取到数据后,看看是否存在粘包多出来的数据lastByteBuffer, 如果存在则将两部分数据合并。而后调用AioHandler的decode方法处理数据,如果由于半包导致解包失败,则继续从队列中获取数据,组合起来尝试解包;如果解包成功,则将多余的数据放在lastByteBuffer,并且更新各种统计信息。最后,将组好的数据包通过submit方法传递给HandlerRunnable处理。

    数据处理

    HandlerRunnable从数据队列中获取组好的包,调用handler方法,然后调用AioHandler的hanlder解除处理数据包:

    groupContext.getAioHandler().handler(packet, channelContext);
    
    @Override
        public void runTask() {
            P packet = null;
            while ((packet = msgQueue.poll()) != null) {
                handler(packet);
            }
        }
    

    发送数据

    发送数据使用Aio类的静态方法send将packet添加到SendRunnable的数据队列中,然后将sendRunable提交到线程池中运行,CountDownLatch还不是太理解用法,需要再学习下

    private static <SessionContext, P extends Packet, R> Boolean send(final ChannelContext<SessionContext, P, R> channelContext, final P packet, CountDownLatch countDownLatch,
                PacketSendMode packetSendMode) {
            try {
                if (packet == null) {
                    return false;
                }
                
                if (channelContext == null || channelContext.isClosed() || channelContext.isRemoved()) {
                    if (countDownLatch != null) {
                        countDownLatch.countDown();
                    }
                    if (channelContext != null) {
                        log.error("{}, isClosed:{}, isRemoved:{}, stack:{} ", channelContext, channelContext.isClosed(), channelContext.isRemoved(), ThreadUtils.stackTrace());
                    }
                    return false;
                }
    
                boolean isSingleBlock = countDownLatch != null && (packetSendMode == PacketSendMode.SINGLE_BLOCK);
    
                SendRunnable<SessionContext, P, R> sendRunnable = channelContext.getSendRunnable();
                PacketWithMeta<P> packetWithMeta = null;
                boolean isAdded = false;
                if (countDownLatch == null) {
                    isAdded = sendRunnable.addMsg(packet);
                } else {
                    packetWithMeta = new PacketWithMeta<>(packet, countDownLatch);
                    isAdded = sendRunnable.addMsg(packetWithMeta);
                }
    
                if (!isAdded) {
                    if (countDownLatch != null) {
                        countDownLatch.countDown();
                    }
                    return false;
                }
    
                //SynThreadPoolExecutor synThreadPoolExecutor = channelContext.getGroupContext().getGroupExecutor();
                channelContext.getGroupContext().getTioExecutor().execute(sendRunnable);
    
                if (isSingleBlock) {
                    long timeout = 10;
                    try {
                        channelContext.traceBlockPacket(SynPacketAction.BEFORE_WAIT, packet, countDownLatch, null);
                        Boolean awaitFlag = countDownLatch.await(timeout, TimeUnit.SECONDS);
                        channelContext.traceBlockPacket(SynPacketAction.AFTER__WAIT, packet, countDownLatch, null);
                        //log.error("{} after await, packet:{}, countDownLatch:{}", channelContext, packet.logstr(), countDownLatch);
    
                        if (!awaitFlag) {
                            log.error("{} 同步发送超时, timeout:{}s, packet:{}", channelContext, timeout, packet.logstr());
                        }
                    } catch (InterruptedException e) {
                        log.error(e.toString(), e);
                    }
    
                    Boolean isSentSuccess = packetWithMeta.getIsSentSuccess();
                    return isSentSuccess;
                } else {
                    return null;
                }
            } catch (Exception e) {
                log.error(e.toString(), e);
                return null;
            } finally {
                //          if (isSingleBlock)
                //          {
                //              org.tio.core.GroupContext.SYN_SEND_SEMAPHORE.release();
                //          }
            }
    
        }
    

    再sendRunable中的runTask中,在线程池中,会尝试一次发送所有等待发送的packet,不过对单次发送的packet设置了一个上限,而后对每个packet编码,汇总到一个ByteBuffer中:

    if (queueSize > 1) {
                ByteBuffer[] byteBuffers = new ByteBuffer[queueSize];
                int allBytebufferCapacity = 0;
    
                int packetCount = 0;
                List<Object> packets = new ArrayList<>(queueSize);
                for (int i = 0; i < queueSize; i++) {
                    if ((obj = msgQueue.poll()) != null) {
                        boolean isPacket = obj instanceof Packet;
                        if (isPacket) {
                            p = (P) obj;
                            packets.add(p);
                        } else {
                            packetWithMeta = (PacketWithMeta<P>) obj;
                            p = packetWithMeta.getPacket();
                            packets.add(packetWithMeta);
                        }
    
                        ByteBuffer byteBuffer = getByteBuffer(p, groupContext, aioHandler);
    
                        channelContext.traceClient(ChannelAction.BEFORE_SEND, p, null);
    
                        allBytebufferCapacity += byteBuffer.limit();
                        packetCount++;
                        byteBuffers[i] = byteBuffer;
                    } else {
                        break;
                    }
                }
    
                ByteBuffer allByteBuffer = ByteBuffer.allocate(allBytebufferCapacity);
                byte[] dest = allByteBuffer.array();
                for (ByteBuffer byteBuffer : byteBuffers) {
                    if (byteBuffer != null) {
                        int length = byteBuffer.limit();
                        int position = allByteBuffer.position();
                        System.arraycopy(byteBuffer.array(), 0, dest, position, length);
                        allByteBuffer.position(position + length);
                    }
                }
                sendByteBuffer(allByteBuffer, packetCount, packets);
            } else {
                if ((obj = msgQueue.poll()) != null) {
                    boolean isPacket = obj instanceof Packet;
                    if (isPacket) {
                        p = (P) obj;
                        sendPacket(p);
                    } else {
                        packetWithMeta = (PacketWithMeta<P>) obj;
                        p = packetWithMeta.getPacket();
                        sendPacket(packetWithMeta);
                    }
                }
            }
    

    最终在sendRunnable的sendByteBuffer中完成数据发送,注意发送前需要获取一个信号量,保证同一时间对一个连接只有一个线程在调用发送动作,并在writeCompleteHandler中记录当前连接发送数据,并更新当前连接活动时间,为keepalive做准备:

    public void sendByteBuffer(ByteBuffer byteBuffer, Integer packetCount, Object packets) {
            if (byteBuffer == null) {
                log.error("{},byteBuffer is null", channelContext);
                return;
            }
    
            if (!AioUtils.checkBeforeIO(channelContext)) {
                return;
            }
    
            byteBuffer.flip();
            AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
            WriteCompletionHandler<SessionContext, P, R> writeCompletionHandler = channelContext.getWriteCompletionHandler();
            try {
                //          long start = SystemTimer.currentTimeMillis();
                writeCompletionHandler.getWriteSemaphore().acquire();
                //          long end = SystemTimer.currentTimeMillis();
                //          long iv = end - start;
                //          if (iv > 100) {
                //              //log.error("{} 等发送锁耗时:{} ms", channelContext, iv);
                //          }
    
            } catch (InterruptedException e) {
                log.error(e.toString(), e);
            }
            
            WriteCompletionVo writeCompletionVo = new WriteCompletionVo(byteBuffer, packets);
            asynchronousSocketChannel.write(byteBuffer, writeCompletionVo, writeCompletionHandler);
    
            channelContext.getStat().setLatestTimeOfSentPacket(SystemTimer.currentTimeMillis());
        }
    

    关闭连接

    再AIO的静态方法里面执行close线程

    1.关闭AsynchronousSocketChannel
    2.判断是否需要重连ReconnConf
    3.更新close计数,发送close监听回调
    4.必须先取消任务再清空队列
    5.是否需要重新连接

    ThreadPoolExecutor closePoolExecutor = channelContext.getGroupContext().getTioExecutor();
                closePoolExecutor.execute(new CloseRunnable<>(channelContext, throwable, remark, isNeedRemove));
    
    @Override
        public void run() {
            try {
                GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
                AioListener<SessionContext, P, R> aioListener = groupContext.getAioListener();
    
                try {
                    AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
                    if (asynchronousSocketChannel != null && asynchronousSocketChannel.isOpen()) {
                        try {
                            asynchronousSocketChannel.close();
                        } catch (Exception e) {
                            log.error(e.toString(), e);
                        }
                    }
                } catch (Throwable e) {
                    log.error(e.toString(), e);
                }
    
                boolean isClientChannelContext = channelContext instanceof ClientChannelContext;
                //          ReconnConf<SessionContext, P, R> reconnConf = channelContext.getGroupContext().getReconnConf();
                boolean isRemove = this.isNeedRemove;
                if (!isRemove) {
                    if (isClientChannelContext) {
                        ClientChannelContext<SessionContext, P, R> clientChannelContext = (ClientChannelContext<SessionContext, P, R>) channelContext;
    
                        if (!ReconnConf.isNeedReconn(clientChannelContext, false)) {
                            isRemove = true;
                        }
                    } else {
                        isRemove = true;
                    }
                }
    
                try {
                    channelContext.getStat().setTimeClosed(SystemTimer.currentTimeMillis());
                    aioListener.onBeforeClose(channelContext, throwable, remark, isRemove);
                } catch (Throwable e) {
                    log.error(e.toString(), e);
                }
    
                ReentrantReadWriteLock reentrantReadWriteLock = channelContext.getCloseLock();//.getLock();
                WriteLock writeLock = reentrantReadWriteLock.writeLock();
                boolean isLock = writeLock.tryLock();
    
                try {
                    if (!isLock) {
                        if (isRemove) {
                            if (channelContext.isRemoved()) {
                                return;
                            } else {
                                writeLock.lock();
                            }
                        } else {
                            return;
                        }
                    }
    
                    channelContext.traceClient(ChannelAction.UNCONNECT, null, null);
    
                    if (channelContext.isClosed() && !isRemove) {
                        log.info("{}已经关闭,备注:{},异常:{}", channelContext, remark, throwable == null ? "无" : throwable.toString());
                        return;
                    }
    
                    if (channelContext.isRemoved()) {
                        log.info("{}已经删除,备注:{},异常:{}", channelContext, remark, throwable == null ? "无" : throwable.toString());
                        return;
                    }
    
                    //必须先取消任务再清空队列
                    //              channelContext.getDecodeRunnable().setCanceled(true);
                    channelContext.getHandlerRunnable().setCanceled(true);
                    //      channelContext.getHandlerRunnableHighPrior().setCanceled(true);
                    channelContext.getSendRunnable().setCanceled(true);
                    //      channelContext.getSendRunnableHighPrior().setCanceled(true);
    
                    channelContext.getDecodeRunnable().clearMsgQueue();
                    channelContext.getHandlerRunnable().clearMsgQueue();
                    //      channelContext.getHandlerRunnableHighPrior().clearMsgQueue();
                    channelContext.getSendRunnable().clearMsgQueue();
                    //      channelContext.getSendRunnableHighPrior().clearMsgQueue();
    
                    log.info("{} 准备关闭连接, isNeedRemove:{}, {}", channelContext, isRemove, remark);
    
                    try {
                        if (isRemove) {
                            MaintainUtils.removeFromMaintain(channelContext);
                        } else {
                            if (!groupContext.isShortConnection()) {
                                groupContext.closeds.add(channelContext);
                                groupContext.connecteds.remove(channelContext);
    
                                if (StringUtils.isNotBlank(channelContext.getUserid())) {
                                    try {
                                        Aio.unbindUser(channelContext);
                                    } catch (Throwable e) {
                                        log.error(e.toString(), e);
                                    }
                                }
    
                                try {
                                    Aio.unbindGroup(channelContext);
                                } catch (Throwable e) {
                                    log.error(e.toString(), e);
                                }
                            }
                        }
    
                        try {
                            channelContext.setClosed(true);
                            channelContext.setRemoved(isRemove);
                            channelContext.getGroupContext().getGroupStat().getClosed().incrementAndGet();
                            channelContext.getStat().setTimeClosed(SystemTimer.currentTimeMillis());
                        } catch (Exception e) {
                            log.error(e.toString(), e);
                        }
    
                        try {
                            aioListener.onAfterClose(channelContext, throwable, remark, isRemove);
                        } catch (Throwable e) {
                            log.error(e.toString(), e);
                        }
                    } catch (Throwable e) {
                        log.error(e.toString(), e);
                    } finally {
                        if (!isRemove && channelContext.isClosed() && (isClientChannelContext)) //不删除且没有连接上,则加到重连队列中
                        {
                            ClientChannelContext<SessionContext, P, R> clientChannelContext = (ClientChannelContext<SessionContext, P, R>) channelContext;
                            ReconnConf.put(clientChannelContext);
                        }
                    }
    
                } catch (Exception e) {
                    log.error(throwable.toString(), e);
                } finally {
                    writeLock.unlock();
                }
            } finally {
                channelContext.setWaitingClose(false);
            }
        }
    

    参考文章

    talent-aio源码阅读小记(一)http://www.jianshu.com/p/522446599d39

    talent-aio源码阅读小记(二)http://www.jianshu.com/p/6e3c4d99e72e

    相关文章

      网友评论

        本文标题:t-io源码学习笔记

        本文链接:https://www.haomeiwen.com/subject/ivejdxtx.html