美文网首页TCP/IP协议
t-io源码学习笔记

t-io源码学习笔记

作者: MisterZhai | 来源:发表于2017-08-23 20:03 被阅读87次

前言

t-io项目git

http://git.oschina.net/tywo45/t-io

简介略

版本1.7.3

这两天正好学习了下谭总的t-io的TCP相关部分,并且把showcase跑起来了。我是用maven引用的,2.0.0还未发布正式版,但是听谭总说2.0.0变化挺大的,加入了许多功能,先期待一把。

由于是在millions_chan的文章基础上梳理的源码流程,但是现在的代码和几个月之前还是有所改动的,所以这里也按照原文章的结构,改了些引用代码,作为自己阅读的笔记,特此记录。

感谢开源!

Server

从server端的start函数进去,创建一个channel group ,生成线程池,之后server端的accept操作回调就在channelGroup所拥有的线程池中执行

AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(serverGroupContext.getGroupExecutor());

然后将AsynchronousServerSocketChannel与channelGroup通过open关联起来

serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);

将这个serverSocketChannel与要监听的IP和端口绑定起来

serverSocketChannel.bind(listenAddress, 0);

然后通过accept接受客户端的连接,将该连接回调给acceptCompletionHandler处理,acceptCompletionHandler实现了NIO中的接口CompletionHandler中的completed和failed方法,

serverSocketChannel.accept(this, acceptCompletionHandler);

在其实现方法中判断是否连接完成或者失败,然后继续接受客户端请求

serverSocketChannel.accept(aioServer, this);

如果服务器没有设置停止,则开始read数据

if (!aioServer.isWaitingStop()) {
                ReadCompletionHandler<SessionContext, P, R> readCompletionHandler = channelContext.getReadCompletionHandler();
                ByteBuffer readByteBuffer = readCompletionHandler.getReadByteBuffer();//ByteBuffer.allocateDirect(channelContext.getGroupContext().getReadBufferSize());
                readByteBuffer.position(0);
                readByteBuffer.limit(readByteBuffer.capacity());
                asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, readCompletionHandler);
            }

在read中将数据回调给readCompletionHandler处理,ReadCompletionHandler实现了NIO的CompletionHandler接口的completed和failed方法

completed方法

@Override
    public void completed(Integer result, ByteBuffer byteBuffer) {
        //      GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
        if (result > 0) {
            if (channelContext.isTraceClient()) {
                Map<String, Object> map = new HashMap<>();
                map.put("p_r_buf_len", result);
                channelContext.traceClient(ChannelAction.RECEIVED_BUF, null, map);
            }

            //          ByteBuffer newByteBuffer = ByteBufferUtils.copy(readByteBuffer, 0, readByteBuffer.position());
            DecodeRunnable<SessionContext, P, R> decodeRunnable = channelContext.getDecodeRunnable();
            readByteBuffer.flip();
            decodeRunnable.setNewByteBuffer(readByteBuffer);
            decodeRunnable.run();
            //          decodeRunnable.addMsg(newByteBuffer);
            //          groupContext.getDecodeExecutor().execute(decodeRunnable);
        } else if (result == 0) {
            log.error("{}读到的数据长度为0", channelContext);
        } else if (result < 0) {
            if (result == -1) {
                Aio.close(channelContext, null, "对方关闭了连接");
                return;
            } else {
                Aio.close(channelContext, null, "读数据时返回" + result);
                return;
            }
        }

        if (AioUtils.checkBeforeIO(channelContext)) {
            AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
            readByteBuffer.position(0);
            readByteBuffer.limit(readByteBuffer.capacity());
            asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, this);
        }
    }

其中有个isTraceClient可能是追踪数据信息的,还不太明白...

readByteBuffer.flip();

在我们写入数据后,byte buffer有个游标会一直指向写数据的最后一位,来告诉你下次写数据的时候继续往这个位置来写。flip()方法相当于把游标指向最开始,你便可以开始读取数据。

当接受到值>0时,调用解码线程DecodeRunnable,并调用run方法启动解码,run方法代码较多,先贴上,然后慢慢分析,哈哈哈哈

@Override
    public void run() {
        ByteBuffer byteBuffer = newByteBuffer;
        if (byteBuffer != null) {
            if (lastByteBuffer != null) {
                byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
                lastByteBuffer = null;
            }
        } else {
            return;
        }

        try {
            label_2: while (true) {
                int initPosition = byteBuffer.position();
                P packet = channelContext.getGroupContext().getAioHandler().decode(byteBuffer, channelContext);

                if (packet == null)// 数据不够,组不了包
                {
                    //                  if (log.isDebugEnabled())
                    //                  {
                    //                      log.debug("{},数据不够,组不了包", channelContext.toString());
                    //                  }
                    lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, byteBuffer.limit());
                    return;
                } else //组包成功
                {
                    channelContext.getStat().setLatestTimeOfReceivedPacket(SystemTimer.currentTimeMillis());

                    int afterDecodePosition = byteBuffer.position();
                    int len = afterDecodePosition - initPosition;

                    //                  if (len == 0)
                    //                  {
                    //                      String logstr = channelContext + "解码成功, " + packet.logstr() + "," + byteBuffer + " 但是却只消耗了0字节, 这有可能会导致死循环. " + ThreadUtils.stackTrace();
                    //                      log.error(logstr);
                    //                  }

                    channelContext.getGroupContext().getGroupStat().getReceivedPacket().incrementAndGet();
                    channelContext.getGroupContext().getGroupStat().getReceivedBytes().addAndGet(len);

                    channelContext.getStat().getReceivedPackets().incrementAndGet();
                    channelContext.getStat().getReceivedBytes().addAndGet(len);

                    channelContext.traceClient(ChannelAction.RECEIVED, packet, null);

                    packet.setByteCount(len);
                    
                    AioListener<SessionContext, P, R> aioListener = channelContext.getGroupContext().getAioListener();
                    try {
                        if (log.isInfoEnabled()) {
                            log.info("{} 收到消息 {}", channelContext, packet.logstr());
                        }
                        aioListener.onAfterReceived(channelContext, packet, len);
                    } catch (Exception e) {
                        log.error(e.toString(), e);
                    }
                    handler(channelContext, packet, len);

                    int remainingLength = byteBuffer.limit() - byteBuffer.position();
                    if (remainingLength > 0)//组包后,还剩有数据
                    {
                        if (log.isDebugEnabled()) {
                            log.debug("{},组包后,还剩有数据:{}", channelContext, remainingLength);
                        }
                        continue label_2;
                    } else//组包后,数据刚好用完
                    {
                        lastByteBuffer = null;
                        log.debug("{},组包后,数据刚好用完", channelContext);
                        return;
                    }
                }
            }
        } catch (AioDecodeException e) {
            log.error(channelContext.toString() + "解码异常", e);
            Aio.close(channelContext, e, "解码异常:" + e.getMessage());
            return;
        }
    }


如果byteBuffer不为null,则将上次未解码处理完的数据合并起来

然后下面竟然有个label_2:标号

调用实现getAioHandler的decode方法,也就是我们写的解码函数

P packet = channelContext.getGroupContext().getAioHandler().decode(byteBuffer, channelContext);

如果组包成功,将该包返回给该run继续处理,然后回调监听AioListener的onAfterReceived包通知

然后将该包交由handler方法处理

handler(channelContext, packet, len);

如果剩余还有数据,跳转到刚才的标号继续while处理

handler方法

public static <SessionContext, P extends Packet, R> void handler(ChannelContext<SessionContext, P, R> channelContext, P packet, int byteCount) {

        GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
        // 两种模式处理消息与解码在同一个线程中处理和把packet丢到一个队列中,让线程池去处理  默认是在同一线程处理
        PacketHandlerMode packetHandlerMode = groupContext.getPacketHandlerMode();

        HandlerRunnable<SessionContext, P, R> handlerRunnable = channelContext.getHandlerRunnable();
        if (packetHandlerMode == PacketHandlerMode.QUEUE) {

            handlerRunnable.addMsg(packet);
            groupContext.getTioExecutor().execute(handlerRunnable);
        } else {
        
        // 回调HandlerRunnable方法执行处理包
            handlerRunnable.handler(packet);
        }
    }

在启动服务器时设置了处理handle,最终都是实现了AioHandler的handler,encode,decode方法

static ServerAioHandler<ShowcaseSessionContext, ShowcasePacket, Object> aioHandler = new ShowcaseServerAioHandler();

所以在HandlerRunnable的handler方法中回调设置的groupContext.getAioHandler().handler(packet, channelContext);进行包应用层处理

还有一个地方不明白在DecodeRunable中handler方法中

/**
     * 处理消息与解码在同一个线程中处理
     */
    SINGLE_THREAD(1),

    /**
     * 把packet丢到一个队列中,让线程池去处理
     */
    QUEUE(2);
    
点击这两个为什么会跳到get方法

private PacketHandlerMode(int value) {
        this.value = value;
    }
    
java 枚举(括号赋值)<http://www.cnblogs.com/mimosading/p/3472578.html>

要注意的是:

1. 通过括号赋值,而且必须带有一个参构造器和一个属性跟方法,否则编译出错
2. 赋值必须都赋值或都不赋值,不能一部分赋值一部分不赋值;如果不赋值则不能写构造器,赋值编译也出错
    

Client

客户端使用了AsynchronousSocketChannel的open方法,同步发送则使用CountDownLatch控制

/**
     * 
     * @param serverNode
     * @param bindIp
     * @param bindPort
     * @param initClientChannelContext
     * @param timeout 超时时间,单位秒
     * @param isSyn true: 同步, false: 异步
     * @return
     * @throws Exception
     * @author: tanyaowu
     */
    private ClientChannelContext<SessionContext, P, R> connect(Node serverNode, String bindIp, Integer bindPort,
            ClientChannelContext<SessionContext, P, R> initClientChannelContext, Integer timeout, boolean isSyn) throws Exception {

        AsynchronousSocketChannel asynchronousSocketChannel = null;
        ClientChannelContext<SessionContext, P, R> channelContext = null;
        boolean isReconnect = initClientChannelContext != null;
        //      ClientAioListener<SessionContext, P, R> clientAioListener = clientGroupContext.getClientAioListener();

        long start = SystemTimer.currentTimeMillis();
        asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup);
        long end = SystemTimer.currentTimeMillis();
        long iv = end - start;
        if (iv >= 100) {
            log.error("{}, open 耗时:{} ms", channelContext, iv);
        }

        asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);

        InetSocketAddress bind = null;
        if (bindPort != null && bindPort > 0) {
            if (StringUtils.isNotBlank(bindIp)) {
                bind = new InetSocketAddress(bindIp, bindPort);
            } else {
                bind = new InetSocketAddress(bindPort);
            }
        }

        if (bind != null) {
            asynchronousSocketChannel.bind(bind);
        }

        channelContext = initClientChannelContext;

        start = SystemTimer.currentTimeMillis();

        InetSocketAddress inetSocketAddress = new InetSocketAddress(serverNode.getIp(), serverNode.getPort());

        ConnectionCompletionVo<SessionContext, P, R> attachment = new ConnectionCompletionVo<>(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp,
                bindPort);

        if (isSyn) {
            Integer _timeout = timeout;
            if (_timeout == null) {
                _timeout = 5;
            }

            CountDownLatch countDownLatch = new CountDownLatch(1);
            attachment.setCountDownLatch(countDownLatch);
            asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientGroupContext.getConnectionCompletionHandler());
            countDownLatch.await(_timeout, TimeUnit.SECONDS);
            return attachment.getChannelContext();
        } else {
            asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientGroupContext.getConnectionCompletionHandler());
            return null;
        }
    }

处理流程相关的类分析

DecodeRunable、CloseRunnable实现runable接口,Handler和sendRunable类继承自AbstractQueueRunnable,其又继承自AbstractSynRunnable实现了SynRunnableIntf接口

public interface SynRunnableIntf extends Runnable {
    public ReadWriteLock runningLock();

    public boolean isNeededExecute();

    public boolean isCanceled();

    public void setCanceled(boolean isCanceled);

    public void runTask();
}

AbstractSynRunnable在该接口基础上添加了方法:

/**
     * @return the executor
     */
    public Executor getExecutor() {
        return executor;
    }

    /**
     * @param executor the executor to set
     */
    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

并且主要实现了Runnable的run方法:

@Override
    public final void run() {
        if (isCanceled()) //任务已经被取消
        {
            return;
        }

        ReadWriteLock runningLock = runningLock();
        Lock writeLock = runningLock.writeLock();
        boolean trylock = writeLock.tryLock();
        if (!trylock) {
            return;
        }

        try {
            runTask();
        } catch (Exception e) {
            log.error(e.toString(), e);
        } finally {
            writeLock.unlock();
            if (isNeededExecute()) {
                getExecutor().execute(this);
            }
        }
    }

在run中,尝试获取runningLock,如果获取失败说明该runneable已经在执行了,可以立即退出。否则就运行runTask,最终根据是否需要继续执行决定要不要再次将该runnable提交到执行线程池中(比如处理完一个packet,发现该连接还有待处理的packet则需要继续处理)。

QueueRunnableIntf<T>接口已经取消了

t-io基础类介绍

在处理输入输出的整个流程中,都需要用到它们。两个基础的类为:

ChannelContext
GroupContext

刚开始看到时候并不是很理解这几个类的作用

ChannelContext类包含:

1.AsynchronousSocketChannel对象、与该连接有关的
2.DecodeRunnable、HandlerRunnable、SendRunnable
3.GroupContext
4.ReadCompletionHandler和WriteCompletionHandler

ChannelContext类可以理解代表一个socket连接,封装了:连接对应的AsynchronousSocketChannel对象;解码、处理接收到的数据和发送数据、关闭连接的几个Runnable;该连接所属的GroupContext引用;该连接的ReadCompletionHandler等。

IDEA中查看class的属性和方法列表,点击上面P可以折叠显示

在创建ChannelContext的中,并创建几个Runnable,先设置GroupContext,然后将自己加入group的连接管理中groupContext.connections.add(this);

public void setGroupContext(GroupContext<SessionContext, P, R> groupContext) {
        this.groupContext = groupContext;

        if (groupContext != null) {
            decodeRunnable = new DecodeRunnable<>(this);
            //          closeRunnable = new CloseRunnable<>(this, null, null, groupContext.getCloseExecutor());

            //          handlerRunnableHighPrior = new HandlerRunnable<>(this, groupContext.getHandlerExecutorHighPrior());
            handlerRunnable = new HandlerRunnable<>(this, groupContext.getTioExecutor());

            //          sendRunnableHighPrior = new SendRunnable<>(this, groupContext.getSendExecutorHighPrior());
            sendRunnable = new SendRunnable<>(this, groupContext.getTioExecutor());

            groupContext.connections.add(this);
        }
    }

然后设置asynchronousSocketChannel,并将连接存储到对端集合clientNodes中:

public void setAsynchronousSocketChannel(AsynchronousSocketChannel asynchronousSocketChannel) {
        this.asynchronousSocketChannel = asynchronousSocketChannel;

        if (asynchronousSocketChannel != null) {
            try {
                Node clientNode = createClientNode(asynchronousSocketChannel);
                setClientNode(clientNode);
            } catch (IOException e) {
                log.info(e.toString(), e);
                assignAnUnknownClientNode();
            }
        } else {
            assignAnUnknownClientNode();
        }
    }

GroupContext管理着当前实例的所有连接connections、连接对应的用户users以及用户间的分组关系groups,在创建的时候,创建好两个线程池

public final ClientNodes<SessionContext, P, R> clientNodes = new ClientNodes<>();
    public final ChannelContextSetWithLock<SessionContext, P, R> connections = new ChannelContextSetWithLock<>();
    public final ChannelContextSetWithLock<SessionContext, P, R> connecteds = new ChannelContextSetWithLock<>();
    public final ChannelContextSetWithLock<SessionContext, P, R> closeds = new ChannelContextSetWithLock<>();

    public final Groups<SessionContext, P, R> groups = new Groups<>();
    public final Users<SessionContext, P, R> users = new Users<>();
    public final Ids<SessionContext, P, R> ids = new Ids<>();

管理的两个线程池

/** The group executor. */
    protected SynThreadPoolExecutor tioExecutor = null;

    protected ThreadPoolExecutor groupExecutor = null;

业务逻辑处理

读取数据

在读取数据的方法里,将读取到的数据byteBuffer提交到decodeRunnable的数据队列中,而后继续调用read方法读取对端发送来的数据。

数据组包

在decodeRunnable中,调用AioHandler的decode方法来获取数据包,然后提交给HandlerRunnable处理。在处理数据时主要考虑了半包和粘包的情况:

在这个循环中,首先从队列中获取数据,获取到数据后,看看是否存在粘包多出来的数据lastByteBuffer, 如果存在则将两部分数据合并。而后调用AioHandler的decode方法处理数据,如果由于半包导致解包失败,则继续从队列中获取数据,组合起来尝试解包;如果解包成功,则将多余的数据放在lastByteBuffer,并且更新各种统计信息。最后,将组好的数据包通过submit方法传递给HandlerRunnable处理。

数据处理

HandlerRunnable从数据队列中获取组好的包,调用handler方法,然后调用AioHandler的hanlder解除处理数据包:

groupContext.getAioHandler().handler(packet, channelContext);
@Override
    public void runTask() {
        P packet = null;
        while ((packet = msgQueue.poll()) != null) {
            handler(packet);
        }
    }

发送数据

发送数据使用Aio类的静态方法send将packet添加到SendRunnable的数据队列中,然后将sendRunable提交到线程池中运行,CountDownLatch还不是太理解用法,需要再学习下

private static <SessionContext, P extends Packet, R> Boolean send(final ChannelContext<SessionContext, P, R> channelContext, final P packet, CountDownLatch countDownLatch,
            PacketSendMode packetSendMode) {
        try {
            if (packet == null) {
                return false;
            }
            
            if (channelContext == null || channelContext.isClosed() || channelContext.isRemoved()) {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
                if (channelContext != null) {
                    log.error("{}, isClosed:{}, isRemoved:{}, stack:{} ", channelContext, channelContext.isClosed(), channelContext.isRemoved(), ThreadUtils.stackTrace());
                }
                return false;
            }

            boolean isSingleBlock = countDownLatch != null && (packetSendMode == PacketSendMode.SINGLE_BLOCK);

            SendRunnable<SessionContext, P, R> sendRunnable = channelContext.getSendRunnable();
            PacketWithMeta<P> packetWithMeta = null;
            boolean isAdded = false;
            if (countDownLatch == null) {
                isAdded = sendRunnable.addMsg(packet);
            } else {
                packetWithMeta = new PacketWithMeta<>(packet, countDownLatch);
                isAdded = sendRunnable.addMsg(packetWithMeta);
            }

            if (!isAdded) {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
                return false;
            }

            //SynThreadPoolExecutor synThreadPoolExecutor = channelContext.getGroupContext().getGroupExecutor();
            channelContext.getGroupContext().getTioExecutor().execute(sendRunnable);

            if (isSingleBlock) {
                long timeout = 10;
                try {
                    channelContext.traceBlockPacket(SynPacketAction.BEFORE_WAIT, packet, countDownLatch, null);
                    Boolean awaitFlag = countDownLatch.await(timeout, TimeUnit.SECONDS);
                    channelContext.traceBlockPacket(SynPacketAction.AFTER__WAIT, packet, countDownLatch, null);
                    //log.error("{} after await, packet:{}, countDownLatch:{}", channelContext, packet.logstr(), countDownLatch);

                    if (!awaitFlag) {
                        log.error("{} 同步发送超时, timeout:{}s, packet:{}", channelContext, timeout, packet.logstr());
                    }
                } catch (InterruptedException e) {
                    log.error(e.toString(), e);
                }

                Boolean isSentSuccess = packetWithMeta.getIsSentSuccess();
                return isSentSuccess;
            } else {
                return null;
            }
        } catch (Exception e) {
            log.error(e.toString(), e);
            return null;
        } finally {
            //          if (isSingleBlock)
            //          {
            //              org.tio.core.GroupContext.SYN_SEND_SEMAPHORE.release();
            //          }
        }

    }

再sendRunable中的runTask中,在线程池中,会尝试一次发送所有等待发送的packet,不过对单次发送的packet设置了一个上限,而后对每个packet编码,汇总到一个ByteBuffer中:

if (queueSize > 1) {
            ByteBuffer[] byteBuffers = new ByteBuffer[queueSize];
            int allBytebufferCapacity = 0;

            int packetCount = 0;
            List<Object> packets = new ArrayList<>(queueSize);
            for (int i = 0; i < queueSize; i++) {
                if ((obj = msgQueue.poll()) != null) {
                    boolean isPacket = obj instanceof Packet;
                    if (isPacket) {
                        p = (P) obj;
                        packets.add(p);
                    } else {
                        packetWithMeta = (PacketWithMeta<P>) obj;
                        p = packetWithMeta.getPacket();
                        packets.add(packetWithMeta);
                    }

                    ByteBuffer byteBuffer = getByteBuffer(p, groupContext, aioHandler);

                    channelContext.traceClient(ChannelAction.BEFORE_SEND, p, null);

                    allBytebufferCapacity += byteBuffer.limit();
                    packetCount++;
                    byteBuffers[i] = byteBuffer;
                } else {
                    break;
                }
            }

            ByteBuffer allByteBuffer = ByteBuffer.allocate(allBytebufferCapacity);
            byte[] dest = allByteBuffer.array();
            for (ByteBuffer byteBuffer : byteBuffers) {
                if (byteBuffer != null) {
                    int length = byteBuffer.limit();
                    int position = allByteBuffer.position();
                    System.arraycopy(byteBuffer.array(), 0, dest, position, length);
                    allByteBuffer.position(position + length);
                }
            }
            sendByteBuffer(allByteBuffer, packetCount, packets);
        } else {
            if ((obj = msgQueue.poll()) != null) {
                boolean isPacket = obj instanceof Packet;
                if (isPacket) {
                    p = (P) obj;
                    sendPacket(p);
                } else {
                    packetWithMeta = (PacketWithMeta<P>) obj;
                    p = packetWithMeta.getPacket();
                    sendPacket(packetWithMeta);
                }
            }
        }

最终在sendRunnable的sendByteBuffer中完成数据发送,注意发送前需要获取一个信号量,保证同一时间对一个连接只有一个线程在调用发送动作,并在writeCompleteHandler中记录当前连接发送数据,并更新当前连接活动时间,为keepalive做准备:

public void sendByteBuffer(ByteBuffer byteBuffer, Integer packetCount, Object packets) {
        if (byteBuffer == null) {
            log.error("{},byteBuffer is null", channelContext);
            return;
        }

        if (!AioUtils.checkBeforeIO(channelContext)) {
            return;
        }

        byteBuffer.flip();
        AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
        WriteCompletionHandler<SessionContext, P, R> writeCompletionHandler = channelContext.getWriteCompletionHandler();
        try {
            //          long start = SystemTimer.currentTimeMillis();
            writeCompletionHandler.getWriteSemaphore().acquire();
            //          long end = SystemTimer.currentTimeMillis();
            //          long iv = end - start;
            //          if (iv > 100) {
            //              //log.error("{} 等发送锁耗时:{} ms", channelContext, iv);
            //          }

        } catch (InterruptedException e) {
            log.error(e.toString(), e);
        }
        
        WriteCompletionVo writeCompletionVo = new WriteCompletionVo(byteBuffer, packets);
        asynchronousSocketChannel.write(byteBuffer, writeCompletionVo, writeCompletionHandler);

        channelContext.getStat().setLatestTimeOfSentPacket(SystemTimer.currentTimeMillis());
    }

关闭连接

再AIO的静态方法里面执行close线程

1.关闭AsynchronousSocketChannel
2.判断是否需要重连ReconnConf
3.更新close计数,发送close监听回调
4.必须先取消任务再清空队列
5.是否需要重新连接

ThreadPoolExecutor closePoolExecutor = channelContext.getGroupContext().getTioExecutor();
            closePoolExecutor.execute(new CloseRunnable<>(channelContext, throwable, remark, isNeedRemove));
@Override
    public void run() {
        try {
            GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
            AioListener<SessionContext, P, R> aioListener = groupContext.getAioListener();

            try {
                AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
                if (asynchronousSocketChannel != null && asynchronousSocketChannel.isOpen()) {
                    try {
                        asynchronousSocketChannel.close();
                    } catch (Exception e) {
                        log.error(e.toString(), e);
                    }
                }
            } catch (Throwable e) {
                log.error(e.toString(), e);
            }

            boolean isClientChannelContext = channelContext instanceof ClientChannelContext;
            //          ReconnConf<SessionContext, P, R> reconnConf = channelContext.getGroupContext().getReconnConf();
            boolean isRemove = this.isNeedRemove;
            if (!isRemove) {
                if (isClientChannelContext) {
                    ClientChannelContext<SessionContext, P, R> clientChannelContext = (ClientChannelContext<SessionContext, P, R>) channelContext;

                    if (!ReconnConf.isNeedReconn(clientChannelContext, false)) {
                        isRemove = true;
                    }
                } else {
                    isRemove = true;
                }
            }

            try {
                channelContext.getStat().setTimeClosed(SystemTimer.currentTimeMillis());
                aioListener.onBeforeClose(channelContext, throwable, remark, isRemove);
            } catch (Throwable e) {
                log.error(e.toString(), e);
            }

            ReentrantReadWriteLock reentrantReadWriteLock = channelContext.getCloseLock();//.getLock();
            WriteLock writeLock = reentrantReadWriteLock.writeLock();
            boolean isLock = writeLock.tryLock();

            try {
                if (!isLock) {
                    if (isRemove) {
                        if (channelContext.isRemoved()) {
                            return;
                        } else {
                            writeLock.lock();
                        }
                    } else {
                        return;
                    }
                }

                channelContext.traceClient(ChannelAction.UNCONNECT, null, null);

                if (channelContext.isClosed() && !isRemove) {
                    log.info("{}已经关闭,备注:{},异常:{}", channelContext, remark, throwable == null ? "无" : throwable.toString());
                    return;
                }

                if (channelContext.isRemoved()) {
                    log.info("{}已经删除,备注:{},异常:{}", channelContext, remark, throwable == null ? "无" : throwable.toString());
                    return;
                }

                //必须先取消任务再清空队列
                //              channelContext.getDecodeRunnable().setCanceled(true);
                channelContext.getHandlerRunnable().setCanceled(true);
                //      channelContext.getHandlerRunnableHighPrior().setCanceled(true);
                channelContext.getSendRunnable().setCanceled(true);
                //      channelContext.getSendRunnableHighPrior().setCanceled(true);

                channelContext.getDecodeRunnable().clearMsgQueue();
                channelContext.getHandlerRunnable().clearMsgQueue();
                //      channelContext.getHandlerRunnableHighPrior().clearMsgQueue();
                channelContext.getSendRunnable().clearMsgQueue();
                //      channelContext.getSendRunnableHighPrior().clearMsgQueue();

                log.info("{} 准备关闭连接, isNeedRemove:{}, {}", channelContext, isRemove, remark);

                try {
                    if (isRemove) {
                        MaintainUtils.removeFromMaintain(channelContext);
                    } else {
                        if (!groupContext.isShortConnection()) {
                            groupContext.closeds.add(channelContext);
                            groupContext.connecteds.remove(channelContext);

                            if (StringUtils.isNotBlank(channelContext.getUserid())) {
                                try {
                                    Aio.unbindUser(channelContext);
                                } catch (Throwable e) {
                                    log.error(e.toString(), e);
                                }
                            }

                            try {
                                Aio.unbindGroup(channelContext);
                            } catch (Throwable e) {
                                log.error(e.toString(), e);
                            }
                        }
                    }

                    try {
                        channelContext.setClosed(true);
                        channelContext.setRemoved(isRemove);
                        channelContext.getGroupContext().getGroupStat().getClosed().incrementAndGet();
                        channelContext.getStat().setTimeClosed(SystemTimer.currentTimeMillis());
                    } catch (Exception e) {
                        log.error(e.toString(), e);
                    }

                    try {
                        aioListener.onAfterClose(channelContext, throwable, remark, isRemove);
                    } catch (Throwable e) {
                        log.error(e.toString(), e);
                    }
                } catch (Throwable e) {
                    log.error(e.toString(), e);
                } finally {
                    if (!isRemove && channelContext.isClosed() && (isClientChannelContext)) //不删除且没有连接上,则加到重连队列中
                    {
                        ClientChannelContext<SessionContext, P, R> clientChannelContext = (ClientChannelContext<SessionContext, P, R>) channelContext;
                        ReconnConf.put(clientChannelContext);
                    }
                }

            } catch (Exception e) {
                log.error(throwable.toString(), e);
            } finally {
                writeLock.unlock();
            }
        } finally {
            channelContext.setWaitingClose(false);
        }
    }

参考文章

talent-aio源码阅读小记(一)http://www.jianshu.com/p/522446599d39

talent-aio源码阅读小记(二)http://www.jianshu.com/p/6e3c4d99e72e

相关文章

网友评论

    本文标题:t-io源码学习笔记

    本文链接:https://www.haomeiwen.com/subject/ivejdxtx.html