摘要
本节讲解ClientCnxnSocket以及ClientCnxnSocketNIO
涉及一些NIO的知识,希望自行了解。
相关源码分析很少,没有什么参照
主要讲解
ClientCnxnSocket抽象类结构
readConnectResult方法 读取server的connect的response
readLength方法 读取buffer长度并给incomingBuffer分配对应大小空间
ClientCnxnSocketNIO实现
findSendablePacket函数
根据sasl以及outgoingQueue情况获取发送的Packet
doIO函数
读就绪时,读取response
写就绪时,从findSendablePacket找到可发送的Packet
doTransport函数
如果是连接就绪,调用sendThread连接操作
若读写就绪,调用doIO函数
connect,createSock,registerAndConnect函数
完成client到server的socket连接(仅为网络连接,并没有和server进行IO,更没有得到server的connect的response)
简介
ClientCnxnSocket定义了底层Socket通信的接口.默认是现实ClientCnxnSocketNIO.
类图如下
ClientCnxnSocket类图源码
抽象类ClientCnxnSocket
主要讲解变量以及已经实现的方法
属性
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);
protected boolean initialized;//是否初始化
/**
* This buffer is only used to read the length of the incoming message.
*/
protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);//仅仅用来读取 incoming message的长度
/**
* After the length is read, a new incomingBuffer is allocated in
* readLength() to receive the full message.
*/
protected ByteBuffer incomingBuffer = lenBuffer;
protected long sentCount = 0;//send次数
protected long recvCount = 0;//接收次数
protected long lastHeard;//上次接收时间
protected long lastSend;//上次发送时间
protected long now;//当前时间
protected ClientCnxn.SendThread sendThread;//客户端通信的发送线程
/**
* The sessionId is only available here for Log and Exception messages.
* Otherwise the socket doesn't need to know it.
*/
protected long sessionId;//仅仅用来辅助log和Exception记录用的
方法
void introduce(ClientCnxn.SendThread sendThread, long sessionId) {//设置sendThread以及sessionId
this.sendThread = sendThread;
this.sessionId = sessionId;
}
void updateNow() {//更新now时间
now = System.currentTimeMillis();
}
int getIdleRecv() {//获取接收的闲置时间
return (int) (now - lastHeard);
}
int getIdleSend() {//获取发送的闲置时间
return (int) (now - lastSend);
}
long getSentCount() {//发送次数
return sentCount;
}
long getRecvCount() {//接收次数
return recvCount;
}
void updateLastHeard() {//更新最后一次监听的时间
this.lastHeard = now;
}
void updateLastSend() {//更新最后一次发送的时间
this.lastSend = now;
}
void updateLastSendAndHeard() {//同时更新最后一次监听和发送的时间
this.lastSend = now;
this.lastHeard = now;
}
protected void readLength() throws IOException {//读取incoming message的length
int len = incomingBuffer.getInt();
if (len < 0 || len >= ClientCnxn.packetLen) {//默认长度[0,4M]之间
throw new IOException("Packet len" + len + " is out of range!");
}
incomingBuffer = ByteBuffer.allocate(len);//分配对应长度的空间
}
void readConnectResult() throws IOException {//读取connect的response
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
for (byte b : incomingBuffer.array()) {
buf.append(Integer.toHexString(b) + ",");
}
buf.append("]");
LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
+ buf.toString());
}
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");//反序列化,看是否是只读的
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);//sendThread完成connect时一些参数验证以及zk state更新以及事件处理
}
主要就是各种次数,时间的设置以及获取
其次就是注意readLength和readConnectResult方法即可
子类ClientCnxnSocketNIO
类图如下
ClientCnxnSocketNIO类图属性
private static final Logger LOG = LoggerFactory
.getLogger(ClientCnxnSocketNIO.class);
private final Selector selector = Selector.open();
private SelectionKey sockKey;
主要就是NIO的东西
方法
按照一定的顺序来讲
client连接时
org.apache.zookeeper.ClientCnxn.SendThread#run
org.apache.zookeeper.ClientCnxn.SendThread#startConnect
org.apache.zookeeper.ClientCnxnSocketNIO#connect
@Override
void connect(InetSocketAddress addr) throws IOException {//参数是某一个zk server的地址
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);//注册SelectionKey到zk server
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;//还没有初始化,connect ok了但是还读到server的response
/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
里面调用了createSock和registerAndConnect方法,如下
/**
* create a socket channel.
* @return the created socket channel
* @throws IOException
*/
SocketChannel createSock() throws IOException {//创建SocketChannel
SocketChannel sock;
sock = SocketChannel.open();
sock.configureBlocking(false);//非阻塞
sock.socket().setSoLinger(false, -1);
sock.socket().setTcpNoDelay(true);
return sock;
}
/**
* register with the selection and connect
* @param sock the {@link SocketChannel}
* @param addr the address of remote host
* @throws IOException
*/
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);//注册,监听connect事件
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {//如果立即建立了连接
sendThread.primeConnection();//client把watches和authData等数据发过去,并更新SelectionKey为读写
}
}
这里注意一点
registerAndConnect中如果立即connect就调用sendThread.primeConnection();
如果没有立即connect上,那么就在下面介绍的doTransport中等待SocketChannel finishConnect再调用
client 和 server的网络交互
主要函数
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);//找到就绪的keys个数
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {//如果就绪的是connect事件,这个出现在registerAndConnect函数没有立即连接成功
if (sc.finishConnect()) {//如果次数完成了连接
updateLastSendAndHeard();//更新时间
sendThread.primeConnection();//client把watches和authData等数据发过去,并更新SelectionKey为读写
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {//如果就绪的是读或者写事件
doIO(pendingQueue, outgoingQueue, cnxn);//利用pendingQueue和outgoingQueue进行IO
}
}
if (sendThread.getZkState().isConnected()) {//如果zk的state是已连接
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {//如果有可以发送的packet
enableWrite();//允许写
}
}
}
selected.clear();//清空
}
参数pendingQueue 以及 outgoingQueue简单介绍如下
outgoingQueue 是请求发送队列,是client存储需要被发送到server端的Packet队列
pendingQueue是已经从client发送,但是要等待server响应的packet队列
后面章节再细讲
主要调用了doIO 以及 findSendablePacket方法
doIO方法如下
/**
* @return true if a packet was received
* @throws InterruptedException
* @throws IOException
*/
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {//若读就绪
int rc = sock.read(incomingBuffer);//读出len
if (rc < 0) {//如果<0,表示读到末尾了,这种情况出现在连接关闭的时候
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {//如果还有数据
incomingBuffer.flip();//切换到读模式
if (incomingBuffer == lenBuffer) {
recvCount++;//接收次数+1
readLength();//获取len并给incomingBuffer分配对应空间
} else if (!initialized) {//如果client和server的连接还没有初始化
readConnectResult();//读取connect 回复
enableRead();//启用读
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {//如果有可以发送的packet
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();//允许写,因为有要发送的packet
}
lenBuffer.clear();
incomingBuffer = lenBuffer;//还原incomingBuffer
updateLastHeard();
initialized = true;//client和server连接初始化完成
} else { //如果已连接,并且已经给incomingBuffer分配了对应len的空间
sendThread.readResponse(incomingBuffer);//读取response
lenBuffer.clear();
incomingBuffer = lenBuffer;//还原incomingBuffer
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {//若写就绪
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());//找到可以发送的Packet
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();//如果packet还没有生成byteBuffer,那就生成byteBuffer
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);//从待发送队列中取出该packet
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);//加入待回复的队列
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();//如果没有要发的,就禁止写
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
流程接见如下
主要分为读或者写两个case
读:
没有初始化就完成初始化
读取len再给incomingBuffer分配对应空间
读取对应的response
写:
找到可以发送的Packet
如果Packet的byteBuffer没有创建,那么就创建
byteBuffer写入socketChannel
把Packet从outgoingQueue中取出来,放到pendingQueue中
相关读写的处理
主要注意,读的时候是分两次读的
第一次只读len,然后给incomingBuffer分配对应的空间
第二次再把剩下的内容读完
findSendablePacket方法如下
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
boolean clientTunneledAuthenticationInProgress) {//bool参数是表示 如果当前client和server在处理sasl的权限
synchronized (outgoingQueue) {
if (outgoingQueue.isEmpty()) {//如果没有要发送的
return null;
}
if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish
|| !clientTunneledAuthenticationInProgress) {//如果有要发送的 或者 没有在处理sasl的权限
return outgoingQueue.getFirst();
}
// Since client's authentication with server is in progress,
// send only the null-header packet queued by primeConnection().
// This packet must be sent so that the SASL authentication process
// can proceed, but all other packets should wait until
// SASL authentication completes.
ListIterator<Packet> iter = outgoingQueue.listIterator();
while (iter.hasNext()) {
Packet p = iter.next();
if (p.requestHeader == null) {//如果在处理sasl的权限,那么只有requestHeader为null的Packet可以被发送
// We've found the priming-packet. Move it to the beginning of the queue.
iter.remove();
outgoingQueue.add(0, p);
return p;
} else {
// Non-priming packet: defer it until later, leaving it in the queue
// until authentication completes.
if (LOG.isDebugEnabled()) {
LOG.debug("deferring non-priming packet: " + p +
"until SASL authentication completes.");
}
}
}
// no sendable packet found.
return null;
}
}
主要流程简介如下
如果没有要发送的就返回null
如果有要发送的或者client没有在处理sasl的权限,那么就拿队列第一个
如果在处理sasl,那么遍历队列,把没有requestHeader为null的放到队头,返回该packet
这个地方主要涉及到sasl验证,并不是很了解这个机制,没有深究
其他函数
@Override
boolean isConnected() {//这个只是说SelectionKey有没有初始化,来标示,并不是真正的Connected
return sockKey != null;
}
部分函数表格列举
函数 | 备注 |
---|---|
void cleanup() | socketChannel关闭,SelectionKey置空 |
void close() | selector关闭 |
SocketAddress getRemoteSocketAddress() | 获取远端地址 |
SocketAddress getLocalSocketAddress() | 获取本地地址 |
synchronized void wakeupCnxn() | 唤醒selector |
void testableCloseSocket() | 测试socket关闭 |
synchronized void enableWrite() | 开启写 |
public synchronized void disableWrite() | 禁止写 |
synchronized private void enableRead() | 开启读 |
synchronized void enableReadWriteOnly() | 仅允许读写 |
Selector getSelector() | 获取selector |
void sendPacket(Packet p) | 发送packet |
思考
何时调用sendThread.primeConnection();以及里面干了什么
如果瞬间连上,就直接调用
否则的话就等到sc.finishConnect()再调用
这个函数完成了一些watches和authData的传递以及允许更改SelectionKey,允许clientCnxnSocket可读写,
org.apache.zookeeper.ClientCnxnSocket#initialized意义
参数指的是zk client收到的zk server的正确response之后,才算初始化成功
不是说NIO中的connect上了就算成功
两者的区别在于NIO的SelectionKey
前者已经从connect变化到了write和read
后者仅限于connect
org.apache.zookeeper.ClientCnxnSocketNIO#doIO处理读就绪的时候,为什么分两次
第一次只读len,然后给incomingBuffer分配对应的空间
第二次再把剩下的内容读完
唯一能够想到的优点就是节省空间了
请求发送与接收 流程图
请求发送与接收吐槽以及问题
1.方法没有注释,甚至是错的注释
如错误的方法注释 org.apache.zookeeper.ClientCnxnSocketNIO#doIO
2.ClientCnxnSocketNIO中connect以及state相关的函数太多了,有点绕
3.SelectionKey中,读写一会开一会关的目的是什么,代码看起来很麻烦
为什么不一直允许读写,单个开关弄来弄去让人疑惑,除非close
是有场景需要禁止读后者禁止写么,还是这样会提升性能?
4.org.apache.zookeeper.ClientCnxnSocketNIO#isConnected用SelectionKey是否初始化判断是否Connected
不太合理,有可能刚初始化但是还没有connect呢???
备注
sendThread在下面两节中讲到,是client完成和server通信的线程
sessionId也会在后面讲会话的时候进行讲解
pendingQueue和outingQueue之后再讲解
网友评论