前言
t-io项目git
http://git.oschina.net/tywo45/t-io
简介略
版本1.7.3
这两天正好学习了下谭总的t-io的TCP相关部分,并且把showcase跑起来了。我是用maven引用的,2.0.0还未发布正式版,但是听谭总说2.0.0变化挺大的,加入了许多功能,先期待一把。
由于是在millions_chan的文章基础上梳理的源码流程,但是现在的代码和几个月之前还是有所改动的,所以这里也按照原文章的结构,改了些引用代码,作为自己阅读的笔记,特此记录。
感谢开源!
Server
从server端的start函数进去,创建一个channel group ,生成线程池,之后server端的accept操作回调就在channelGroup所拥有的线程池中执行
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(serverGroupContext.getGroupExecutor());
然后将AsynchronousServerSocketChannel与channelGroup通过open关联起来
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
将这个serverSocketChannel与要监听的IP和端口绑定起来
serverSocketChannel.bind(listenAddress, 0);
然后通过accept接受客户端的连接,将该连接回调给acceptCompletionHandler处理,acceptCompletionHandler实现了NIO中的接口CompletionHandler中的completed和failed方法,
serverSocketChannel.accept(this, acceptCompletionHandler);
在其实现方法中判断是否连接完成或者失败,然后继续接受客户端请求
serverSocketChannel.accept(aioServer, this);
如果服务器没有设置停止,则开始read数据
if (!aioServer.isWaitingStop()) {
ReadCompletionHandler<SessionContext, P, R> readCompletionHandler = channelContext.getReadCompletionHandler();
ByteBuffer readByteBuffer = readCompletionHandler.getReadByteBuffer();//ByteBuffer.allocateDirect(channelContext.getGroupContext().getReadBufferSize());
readByteBuffer.position(0);
readByteBuffer.limit(readByteBuffer.capacity());
asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, readCompletionHandler);
}
在read中将数据回调给readCompletionHandler处理,ReadCompletionHandler实现了NIO的CompletionHandler接口的completed和failed方法
completed方法
@Override
public void completed(Integer result, ByteBuffer byteBuffer) {
// GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
if (result > 0) {
if (channelContext.isTraceClient()) {
Map<String, Object> map = new HashMap<>();
map.put("p_r_buf_len", result);
channelContext.traceClient(ChannelAction.RECEIVED_BUF, null, map);
}
// ByteBuffer newByteBuffer = ByteBufferUtils.copy(readByteBuffer, 0, readByteBuffer.position());
DecodeRunnable<SessionContext, P, R> decodeRunnable = channelContext.getDecodeRunnable();
readByteBuffer.flip();
decodeRunnable.setNewByteBuffer(readByteBuffer);
decodeRunnable.run();
// decodeRunnable.addMsg(newByteBuffer);
// groupContext.getDecodeExecutor().execute(decodeRunnable);
} else if (result == 0) {
log.error("{}读到的数据长度为0", channelContext);
} else if (result < 0) {
if (result == -1) {
Aio.close(channelContext, null, "对方关闭了连接");
return;
} else {
Aio.close(channelContext, null, "读数据时返回" + result);
return;
}
}
if (AioUtils.checkBeforeIO(channelContext)) {
AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
readByteBuffer.position(0);
readByteBuffer.limit(readByteBuffer.capacity());
asynchronousSocketChannel.read(readByteBuffer, readByteBuffer, this);
}
}
其中有个isTraceClient可能是追踪数据信息的,还不太明白...
readByteBuffer.flip();
在我们写入数据后,byte buffer有个游标会一直指向写数据的最后一位,来告诉你下次写数据的时候继续往这个位置来写。flip()方法相当于把游标指向最开始,你便可以开始读取数据。
当接受到值>0时,调用解码线程DecodeRunnable,并调用run方法启动解码,run方法代码较多,先贴上,然后慢慢分析,哈哈哈哈
@Override
public void run() {
ByteBuffer byteBuffer = newByteBuffer;
if (byteBuffer != null) {
if (lastByteBuffer != null) {
byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
lastByteBuffer = null;
}
} else {
return;
}
try {
label_2: while (true) {
int initPosition = byteBuffer.position();
P packet = channelContext.getGroupContext().getAioHandler().decode(byteBuffer, channelContext);
if (packet == null)// 数据不够,组不了包
{
// if (log.isDebugEnabled())
// {
// log.debug("{},数据不够,组不了包", channelContext.toString());
// }
lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, byteBuffer.limit());
return;
} else //组包成功
{
channelContext.getStat().setLatestTimeOfReceivedPacket(SystemTimer.currentTimeMillis());
int afterDecodePosition = byteBuffer.position();
int len = afterDecodePosition - initPosition;
// if (len == 0)
// {
// String logstr = channelContext + "解码成功, " + packet.logstr() + "," + byteBuffer + " 但是却只消耗了0字节, 这有可能会导致死循环. " + ThreadUtils.stackTrace();
// log.error(logstr);
// }
channelContext.getGroupContext().getGroupStat().getReceivedPacket().incrementAndGet();
channelContext.getGroupContext().getGroupStat().getReceivedBytes().addAndGet(len);
channelContext.getStat().getReceivedPackets().incrementAndGet();
channelContext.getStat().getReceivedBytes().addAndGet(len);
channelContext.traceClient(ChannelAction.RECEIVED, packet, null);
packet.setByteCount(len);
AioListener<SessionContext, P, R> aioListener = channelContext.getGroupContext().getAioListener();
try {
if (log.isInfoEnabled()) {
log.info("{} 收到消息 {}", channelContext, packet.logstr());
}
aioListener.onAfterReceived(channelContext, packet, len);
} catch (Exception e) {
log.error(e.toString(), e);
}
handler(channelContext, packet, len);
int remainingLength = byteBuffer.limit() - byteBuffer.position();
if (remainingLength > 0)//组包后,还剩有数据
{
if (log.isDebugEnabled()) {
log.debug("{},组包后,还剩有数据:{}", channelContext, remainingLength);
}
continue label_2;
} else//组包后,数据刚好用完
{
lastByteBuffer = null;
log.debug("{},组包后,数据刚好用完", channelContext);
return;
}
}
}
} catch (AioDecodeException e) {
log.error(channelContext.toString() + "解码异常", e);
Aio.close(channelContext, e, "解码异常:" + e.getMessage());
return;
}
}
如果byteBuffer不为null,则将上次未解码处理完的数据合并起来
然后下面竟然有个label_2:标号
调用实现getAioHandler的decode方法,也就是我们写的解码函数
P packet = channelContext.getGroupContext().getAioHandler().decode(byteBuffer, channelContext);
如果组包成功,将该包返回给该run继续处理,然后回调监听AioListener的onAfterReceived包通知
然后将该包交由handler方法处理
handler(channelContext, packet, len);
如果剩余还有数据,跳转到刚才的标号继续while处理
handler方法
public static <SessionContext, P extends Packet, R> void handler(ChannelContext<SessionContext, P, R> channelContext, P packet, int byteCount) {
GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
// 两种模式处理消息与解码在同一个线程中处理和把packet丢到一个队列中,让线程池去处理 默认是在同一线程处理
PacketHandlerMode packetHandlerMode = groupContext.getPacketHandlerMode();
HandlerRunnable<SessionContext, P, R> handlerRunnable = channelContext.getHandlerRunnable();
if (packetHandlerMode == PacketHandlerMode.QUEUE) {
handlerRunnable.addMsg(packet);
groupContext.getTioExecutor().execute(handlerRunnable);
} else {
// 回调HandlerRunnable方法执行处理包
handlerRunnable.handler(packet);
}
}
在启动服务器时设置了处理handle,最终都是实现了AioHandler的handler,encode,decode方法
static ServerAioHandler<ShowcaseSessionContext, ShowcasePacket, Object> aioHandler = new ShowcaseServerAioHandler();
所以在HandlerRunnable的handler方法中回调设置的groupContext.getAioHandler().handler(packet, channelContext);进行包应用层处理
还有一个地方不明白在DecodeRunable中handler方法中
/**
* 处理消息与解码在同一个线程中处理
*/
SINGLE_THREAD(1),
/**
* 把packet丢到一个队列中,让线程池去处理
*/
QUEUE(2);
点击这两个为什么会跳到get方法
private PacketHandlerMode(int value) {
this.value = value;
}
java 枚举(括号赋值)<http://www.cnblogs.com/mimosading/p/3472578.html>
要注意的是:
1. 通过括号赋值,而且必须带有一个参构造器和一个属性跟方法,否则编译出错
2. 赋值必须都赋值或都不赋值,不能一部分赋值一部分不赋值;如果不赋值则不能写构造器,赋值编译也出错
Client
客户端使用了AsynchronousSocketChannel的open方法,同步发送则使用CountDownLatch控制
/**
*
* @param serverNode
* @param bindIp
* @param bindPort
* @param initClientChannelContext
* @param timeout 超时时间,单位秒
* @param isSyn true: 同步, false: 异步
* @return
* @throws Exception
* @author: tanyaowu
*/
private ClientChannelContext<SessionContext, P, R> connect(Node serverNode, String bindIp, Integer bindPort,
ClientChannelContext<SessionContext, P, R> initClientChannelContext, Integer timeout, boolean isSyn) throws Exception {
AsynchronousSocketChannel asynchronousSocketChannel = null;
ClientChannelContext<SessionContext, P, R> channelContext = null;
boolean isReconnect = initClientChannelContext != null;
// ClientAioListener<SessionContext, P, R> clientAioListener = clientGroupContext.getClientAioListener();
long start = SystemTimer.currentTimeMillis();
asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup);
long end = SystemTimer.currentTimeMillis();
long iv = end - start;
if (iv >= 100) {
log.error("{}, open 耗时:{} ms", channelContext, iv);
}
asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
InetSocketAddress bind = null;
if (bindPort != null && bindPort > 0) {
if (StringUtils.isNotBlank(bindIp)) {
bind = new InetSocketAddress(bindIp, bindPort);
} else {
bind = new InetSocketAddress(bindPort);
}
}
if (bind != null) {
asynchronousSocketChannel.bind(bind);
}
channelContext = initClientChannelContext;
start = SystemTimer.currentTimeMillis();
InetSocketAddress inetSocketAddress = new InetSocketAddress(serverNode.getIp(), serverNode.getPort());
ConnectionCompletionVo<SessionContext, P, R> attachment = new ConnectionCompletionVo<>(channelContext, this, isReconnect, asynchronousSocketChannel, serverNode, bindIp,
bindPort);
if (isSyn) {
Integer _timeout = timeout;
if (_timeout == null) {
_timeout = 5;
}
CountDownLatch countDownLatch = new CountDownLatch(1);
attachment.setCountDownLatch(countDownLatch);
asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientGroupContext.getConnectionCompletionHandler());
countDownLatch.await(_timeout, TimeUnit.SECONDS);
return attachment.getChannelContext();
} else {
asynchronousSocketChannel.connect(inetSocketAddress, attachment, clientGroupContext.getConnectionCompletionHandler());
return null;
}
}
处理流程相关的类分析
DecodeRunable、CloseRunnable实现runable接口,Handler和sendRunable类继承自AbstractQueueRunnable,其又继承自AbstractSynRunnable实现了SynRunnableIntf接口
public interface SynRunnableIntf extends Runnable {
public ReadWriteLock runningLock();
public boolean isNeededExecute();
public boolean isCanceled();
public void setCanceled(boolean isCanceled);
public void runTask();
}
AbstractSynRunnable在该接口基础上添加了方法:
/**
* @return the executor
*/
public Executor getExecutor() {
return executor;
}
/**
* @param executor the executor to set
*/
public void setExecutor(Executor executor) {
this.executor = executor;
}
并且主要实现了Runnable的run方法:
@Override
public final void run() {
if (isCanceled()) //任务已经被取消
{
return;
}
ReadWriteLock runningLock = runningLock();
Lock writeLock = runningLock.writeLock();
boolean trylock = writeLock.tryLock();
if (!trylock) {
return;
}
try {
runTask();
} catch (Exception e) {
log.error(e.toString(), e);
} finally {
writeLock.unlock();
if (isNeededExecute()) {
getExecutor().execute(this);
}
}
}
在run中,尝试获取runningLock,如果获取失败说明该runneable已经在执行了,可以立即退出。否则就运行runTask,最终根据是否需要继续执行决定要不要再次将该runnable提交到执行线程池中(比如处理完一个packet,发现该连接还有待处理的packet则需要继续处理)。
QueueRunnableIntf<T>接口已经取消了
t-io基础类介绍
在处理输入输出的整个流程中,都需要用到它们。两个基础的类为:
ChannelContext
GroupContext
刚开始看到时候并不是很理解这几个类的作用
ChannelContext类包含:
1.AsynchronousSocketChannel对象、与该连接有关的
2.DecodeRunnable、HandlerRunnable、SendRunnable
3.GroupContext
4.ReadCompletionHandler和WriteCompletionHandler
ChannelContext类可以理解代表一个socket连接,封装了:连接对应的AsynchronousSocketChannel对象;解码、处理接收到的数据和发送数据、关闭连接的几个Runnable;该连接所属的GroupContext引用;该连接的ReadCompletionHandler等。
IDEA中查看class的属性和方法列表,点击上面P可以折叠显示
在创建ChannelContext的中,并创建几个Runnable,先设置GroupContext,然后将自己加入group的连接管理中groupContext.connections.add(this);
public void setGroupContext(GroupContext<SessionContext, P, R> groupContext) {
this.groupContext = groupContext;
if (groupContext != null) {
decodeRunnable = new DecodeRunnable<>(this);
// closeRunnable = new CloseRunnable<>(this, null, null, groupContext.getCloseExecutor());
// handlerRunnableHighPrior = new HandlerRunnable<>(this, groupContext.getHandlerExecutorHighPrior());
handlerRunnable = new HandlerRunnable<>(this, groupContext.getTioExecutor());
// sendRunnableHighPrior = new SendRunnable<>(this, groupContext.getSendExecutorHighPrior());
sendRunnable = new SendRunnable<>(this, groupContext.getTioExecutor());
groupContext.connections.add(this);
}
}
然后设置asynchronousSocketChannel,并将连接存储到对端集合clientNodes中:
public void setAsynchronousSocketChannel(AsynchronousSocketChannel asynchronousSocketChannel) {
this.asynchronousSocketChannel = asynchronousSocketChannel;
if (asynchronousSocketChannel != null) {
try {
Node clientNode = createClientNode(asynchronousSocketChannel);
setClientNode(clientNode);
} catch (IOException e) {
log.info(e.toString(), e);
assignAnUnknownClientNode();
}
} else {
assignAnUnknownClientNode();
}
}
GroupContext管理着当前实例的所有连接connections、连接对应的用户users以及用户间的分组关系groups,在创建的时候,创建好两个线程池
public final ClientNodes<SessionContext, P, R> clientNodes = new ClientNodes<>();
public final ChannelContextSetWithLock<SessionContext, P, R> connections = new ChannelContextSetWithLock<>();
public final ChannelContextSetWithLock<SessionContext, P, R> connecteds = new ChannelContextSetWithLock<>();
public final ChannelContextSetWithLock<SessionContext, P, R> closeds = new ChannelContextSetWithLock<>();
public final Groups<SessionContext, P, R> groups = new Groups<>();
public final Users<SessionContext, P, R> users = new Users<>();
public final Ids<SessionContext, P, R> ids = new Ids<>();
管理的两个线程池
/** The group executor. */
protected SynThreadPoolExecutor tioExecutor = null;
protected ThreadPoolExecutor groupExecutor = null;
业务逻辑处理
读取数据
在读取数据的方法里,将读取到的数据byteBuffer提交到decodeRunnable的数据队列中,而后继续调用read方法读取对端发送来的数据。
数据组包
在decodeRunnable中,调用AioHandler的decode方法来获取数据包,然后提交给HandlerRunnable处理。在处理数据时主要考虑了半包和粘包的情况:
在这个循环中,首先从队列中获取数据,获取到数据后,看看是否存在粘包多出来的数据lastByteBuffer, 如果存在则将两部分数据合并。而后调用AioHandler的decode方法处理数据,如果由于半包导致解包失败,则继续从队列中获取数据,组合起来尝试解包;如果解包成功,则将多余的数据放在lastByteBuffer,并且更新各种统计信息。最后,将组好的数据包通过submit方法传递给HandlerRunnable处理。
数据处理
HandlerRunnable从数据队列中获取组好的包,调用handler方法,然后调用AioHandler的hanlder解除处理数据包:
groupContext.getAioHandler().handler(packet, channelContext);
@Override
public void runTask() {
P packet = null;
while ((packet = msgQueue.poll()) != null) {
handler(packet);
}
}
发送数据
发送数据使用Aio类的静态方法send将packet添加到SendRunnable的数据队列中,然后将sendRunable提交到线程池中运行,CountDownLatch还不是太理解用法,需要再学习下
private static <SessionContext, P extends Packet, R> Boolean send(final ChannelContext<SessionContext, P, R> channelContext, final P packet, CountDownLatch countDownLatch,
PacketSendMode packetSendMode) {
try {
if (packet == null) {
return false;
}
if (channelContext == null || channelContext.isClosed() || channelContext.isRemoved()) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
if (channelContext != null) {
log.error("{}, isClosed:{}, isRemoved:{}, stack:{} ", channelContext, channelContext.isClosed(), channelContext.isRemoved(), ThreadUtils.stackTrace());
}
return false;
}
boolean isSingleBlock = countDownLatch != null && (packetSendMode == PacketSendMode.SINGLE_BLOCK);
SendRunnable<SessionContext, P, R> sendRunnable = channelContext.getSendRunnable();
PacketWithMeta<P> packetWithMeta = null;
boolean isAdded = false;
if (countDownLatch == null) {
isAdded = sendRunnable.addMsg(packet);
} else {
packetWithMeta = new PacketWithMeta<>(packet, countDownLatch);
isAdded = sendRunnable.addMsg(packetWithMeta);
}
if (!isAdded) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
return false;
}
//SynThreadPoolExecutor synThreadPoolExecutor = channelContext.getGroupContext().getGroupExecutor();
channelContext.getGroupContext().getTioExecutor().execute(sendRunnable);
if (isSingleBlock) {
long timeout = 10;
try {
channelContext.traceBlockPacket(SynPacketAction.BEFORE_WAIT, packet, countDownLatch, null);
Boolean awaitFlag = countDownLatch.await(timeout, TimeUnit.SECONDS);
channelContext.traceBlockPacket(SynPacketAction.AFTER__WAIT, packet, countDownLatch, null);
//log.error("{} after await, packet:{}, countDownLatch:{}", channelContext, packet.logstr(), countDownLatch);
if (!awaitFlag) {
log.error("{} 同步发送超时, timeout:{}s, packet:{}", channelContext, timeout, packet.logstr());
}
} catch (InterruptedException e) {
log.error(e.toString(), e);
}
Boolean isSentSuccess = packetWithMeta.getIsSentSuccess();
return isSentSuccess;
} else {
return null;
}
} catch (Exception e) {
log.error(e.toString(), e);
return null;
} finally {
// if (isSingleBlock)
// {
// org.tio.core.GroupContext.SYN_SEND_SEMAPHORE.release();
// }
}
}
再sendRunable中的runTask中,在线程池中,会尝试一次发送所有等待发送的packet,不过对单次发送的packet设置了一个上限,而后对每个packet编码,汇总到一个ByteBuffer中:
if (queueSize > 1) {
ByteBuffer[] byteBuffers = new ByteBuffer[queueSize];
int allBytebufferCapacity = 0;
int packetCount = 0;
List<Object> packets = new ArrayList<>(queueSize);
for (int i = 0; i < queueSize; i++) {
if ((obj = msgQueue.poll()) != null) {
boolean isPacket = obj instanceof Packet;
if (isPacket) {
p = (P) obj;
packets.add(p);
} else {
packetWithMeta = (PacketWithMeta<P>) obj;
p = packetWithMeta.getPacket();
packets.add(packetWithMeta);
}
ByteBuffer byteBuffer = getByteBuffer(p, groupContext, aioHandler);
channelContext.traceClient(ChannelAction.BEFORE_SEND, p, null);
allBytebufferCapacity += byteBuffer.limit();
packetCount++;
byteBuffers[i] = byteBuffer;
} else {
break;
}
}
ByteBuffer allByteBuffer = ByteBuffer.allocate(allBytebufferCapacity);
byte[] dest = allByteBuffer.array();
for (ByteBuffer byteBuffer : byteBuffers) {
if (byteBuffer != null) {
int length = byteBuffer.limit();
int position = allByteBuffer.position();
System.arraycopy(byteBuffer.array(), 0, dest, position, length);
allByteBuffer.position(position + length);
}
}
sendByteBuffer(allByteBuffer, packetCount, packets);
} else {
if ((obj = msgQueue.poll()) != null) {
boolean isPacket = obj instanceof Packet;
if (isPacket) {
p = (P) obj;
sendPacket(p);
} else {
packetWithMeta = (PacketWithMeta<P>) obj;
p = packetWithMeta.getPacket();
sendPacket(packetWithMeta);
}
}
}
最终在sendRunnable的sendByteBuffer中完成数据发送,注意发送前需要获取一个信号量,保证同一时间对一个连接只有一个线程在调用发送动作,并在writeCompleteHandler中记录当前连接发送数据,并更新当前连接活动时间,为keepalive做准备:
public void sendByteBuffer(ByteBuffer byteBuffer, Integer packetCount, Object packets) {
if (byteBuffer == null) {
log.error("{},byteBuffer is null", channelContext);
return;
}
if (!AioUtils.checkBeforeIO(channelContext)) {
return;
}
byteBuffer.flip();
AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
WriteCompletionHandler<SessionContext, P, R> writeCompletionHandler = channelContext.getWriteCompletionHandler();
try {
// long start = SystemTimer.currentTimeMillis();
writeCompletionHandler.getWriteSemaphore().acquire();
// long end = SystemTimer.currentTimeMillis();
// long iv = end - start;
// if (iv > 100) {
// //log.error("{} 等发送锁耗时:{} ms", channelContext, iv);
// }
} catch (InterruptedException e) {
log.error(e.toString(), e);
}
WriteCompletionVo writeCompletionVo = new WriteCompletionVo(byteBuffer, packets);
asynchronousSocketChannel.write(byteBuffer, writeCompletionVo, writeCompletionHandler);
channelContext.getStat().setLatestTimeOfSentPacket(SystemTimer.currentTimeMillis());
}
关闭连接
再AIO的静态方法里面执行close线程
1.关闭AsynchronousSocketChannel
2.判断是否需要重连ReconnConf
3.更新close计数,发送close监听回调
4.必须先取消任务再清空队列
5.是否需要重新连接
ThreadPoolExecutor closePoolExecutor = channelContext.getGroupContext().getTioExecutor();
closePoolExecutor.execute(new CloseRunnable<>(channelContext, throwable, remark, isNeedRemove));
@Override
public void run() {
try {
GroupContext<SessionContext, P, R> groupContext = channelContext.getGroupContext();
AioListener<SessionContext, P, R> aioListener = groupContext.getAioListener();
try {
AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
if (asynchronousSocketChannel != null && asynchronousSocketChannel.isOpen()) {
try {
asynchronousSocketChannel.close();
} catch (Exception e) {
log.error(e.toString(), e);
}
}
} catch (Throwable e) {
log.error(e.toString(), e);
}
boolean isClientChannelContext = channelContext instanceof ClientChannelContext;
// ReconnConf<SessionContext, P, R> reconnConf = channelContext.getGroupContext().getReconnConf();
boolean isRemove = this.isNeedRemove;
if (!isRemove) {
if (isClientChannelContext) {
ClientChannelContext<SessionContext, P, R> clientChannelContext = (ClientChannelContext<SessionContext, P, R>) channelContext;
if (!ReconnConf.isNeedReconn(clientChannelContext, false)) {
isRemove = true;
}
} else {
isRemove = true;
}
}
try {
channelContext.getStat().setTimeClosed(SystemTimer.currentTimeMillis());
aioListener.onBeforeClose(channelContext, throwable, remark, isRemove);
} catch (Throwable e) {
log.error(e.toString(), e);
}
ReentrantReadWriteLock reentrantReadWriteLock = channelContext.getCloseLock();//.getLock();
WriteLock writeLock = reentrantReadWriteLock.writeLock();
boolean isLock = writeLock.tryLock();
try {
if (!isLock) {
if (isRemove) {
if (channelContext.isRemoved()) {
return;
} else {
writeLock.lock();
}
} else {
return;
}
}
channelContext.traceClient(ChannelAction.UNCONNECT, null, null);
if (channelContext.isClosed() && !isRemove) {
log.info("{}已经关闭,备注:{},异常:{}", channelContext, remark, throwable == null ? "无" : throwable.toString());
return;
}
if (channelContext.isRemoved()) {
log.info("{}已经删除,备注:{},异常:{}", channelContext, remark, throwable == null ? "无" : throwable.toString());
return;
}
//必须先取消任务再清空队列
// channelContext.getDecodeRunnable().setCanceled(true);
channelContext.getHandlerRunnable().setCanceled(true);
// channelContext.getHandlerRunnableHighPrior().setCanceled(true);
channelContext.getSendRunnable().setCanceled(true);
// channelContext.getSendRunnableHighPrior().setCanceled(true);
channelContext.getDecodeRunnable().clearMsgQueue();
channelContext.getHandlerRunnable().clearMsgQueue();
// channelContext.getHandlerRunnableHighPrior().clearMsgQueue();
channelContext.getSendRunnable().clearMsgQueue();
// channelContext.getSendRunnableHighPrior().clearMsgQueue();
log.info("{} 准备关闭连接, isNeedRemove:{}, {}", channelContext, isRemove, remark);
try {
if (isRemove) {
MaintainUtils.removeFromMaintain(channelContext);
} else {
if (!groupContext.isShortConnection()) {
groupContext.closeds.add(channelContext);
groupContext.connecteds.remove(channelContext);
if (StringUtils.isNotBlank(channelContext.getUserid())) {
try {
Aio.unbindUser(channelContext);
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
try {
Aio.unbindGroup(channelContext);
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
}
try {
channelContext.setClosed(true);
channelContext.setRemoved(isRemove);
channelContext.getGroupContext().getGroupStat().getClosed().incrementAndGet();
channelContext.getStat().setTimeClosed(SystemTimer.currentTimeMillis());
} catch (Exception e) {
log.error(e.toString(), e);
}
try {
aioListener.onAfterClose(channelContext, throwable, remark, isRemove);
} catch (Throwable e) {
log.error(e.toString(), e);
}
} catch (Throwable e) {
log.error(e.toString(), e);
} finally {
if (!isRemove && channelContext.isClosed() && (isClientChannelContext)) //不删除且没有连接上,则加到重连队列中
{
ClientChannelContext<SessionContext, P, R> clientChannelContext = (ClientChannelContext<SessionContext, P, R>) channelContext;
ReconnConf.put(clientChannelContext);
}
}
} catch (Exception e) {
log.error(throwable.toString(), e);
} finally {
writeLock.unlock();
}
} finally {
channelContext.setWaitingClose(false);
}
}
参考文章
talent-aio源码阅读小记(一)http://www.jianshu.com/p/522446599d39
talent-aio源码阅读小记(二)http://www.jianshu.com/p/6e3c4d99e72e
网友评论