
客户端连接
通过Zookeeper客户端类库连接org.apache.zookeeper.ZooKeeper
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 100*1000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("conn");
}
});
1、ZooKeeper类的创建
org.apache.zookeeper.ZooKeeper#ZooKeeper


通过createDefaultHostProvider创建静态服务提供类StaticHostProvider


2、ClientCnxn类的创建

org.apache.zookeeper.ZooKeeper#createConnection



其中最重要的是watchManager、sendThread、eventThread的初始化
3、ClientCnxn类的启动
ClientCnxn类的启动便是sendThread和eventThread线程的启动

在具体分析两个线程之前,需要了解pendingQueue和outgoingQueue队列表示的意义。
outgoingQueue:表示需要发送的数据包
pendingQueue:表示已经发送正在等待响应的数据包
SendThread
1、run
run方法内部有一个while方法,只要不为false将会一直运行
org.apache.zookeeper.ClientCnxn.SendThread#run

org.apache.zookeeper.ZooKeeper.States#isAlive
如果state状态不是CLOSED或AUTH_FAILED将会一直循环

2、建立socket连接
如果clientCnxnSocket客户端socket没有连接即sockKey = null,将会调用连接方法

org.apache.zookeeper.ClientCnxn.SendThread#startConnect

org.apache.zookeeper.ClientCnxnSocketNIO#connect

①、创建socket对象
org.apache.zookeeper.ClientCnxnSocketNIO#createSock

②、注册连接事件
org.apache.zookeeper.ClientCnxnSocketNIO#registerAndConnect

再尝试连接,如果immediateConnect返回true则会调用primeConnection方法。这里连接失败返回fasle
③、incomingBuffer和lenBuffer缓存重置

3、AuthFailed


把eventOfDeath添加到waitingEvents

4、发送心跳ping

org.apache.zookeeper.ClientCnxn.SendThread#sendPing

即是把ping包添加到outgoingQueue
5、只读模式,寻找读/写服务器

6、doTransport

org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
selector.select查询有哪些事件。如果没有事件便会阻塞waitTimeOut时间

6.1、连接事件准备就绪
如果是连接事件准备就绪,则会调用primeConnection方法

org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
用来设置会话session,watches和authentication
①、watch

用来判断这些dataWatches、existWatches、childWatches、persistentWatches、persistentRecursiveWatches等watch是否为空


用来判断persistentWatches持久化的watch与persistentRecursiveWatches持久化的递归Watche是否为空。
最后只要任意watch不为空,都会添加到outgoingQueue队列中

②、authInfo
添加权限的authInfo请求数据包

③、添加空的数据包

④、更改感兴趣的事件类型


org.apache.zookeeper.ClientCnxnSocketNIO#doIO
6.2、处理读写事件

①、读事件(得到服务端响应数据)

首先读取4字节长度的数据到incomingBuffer
org.apache.zookeeper.ClientCnxnSocket#readLength
把incomingBuffer设置为还要读取的长度

最后通过readResponse读取响应结果

org.apache.zookeeper.ClientCnxn.SendThread#readResponse

包括PING_XID(pings)、AUTHPACKET_XID(AuthPacket)、WATCHER_EVENT(watch事件)

finishPacket

org.apache.zookeeper.ClientCnxn#finishPacket

org.apache.zookeeper.ClientCnxn.EventThread#queuePacket

②、写事件(发送数据到服务端)

a、findSendablePacket
从outgoingQueue队列中获取第一个并封装成Packet数据包,对于关于authentication权限的数据包,只发送header非空的数据

b、createBB缓存数据
org.apache.zookeeper.ClientCnxn.Packet#createBB
设置数据包Packet.ByteBuffer bb缓存数据

c、通过socker网络传输到服务端
对于发送过的数据并且不是ping或auth的数据包要放到pendingQueue队列中等待响应

d、禁止写数据
如果outgoingQueue是空或没有初始化将会禁止写数据(即禁止向服务端发送数据)

outgoingQueue队列什么时候添加
提交请求时,会把当前数据包添加outgoingQueue
1、getData
org.apache.zookeeper.ZooKeeper#getData

org.apache.zookeeper.ClientCnxn#submitRequest


org.apache.zookeeper.ClientCnxn#queuePacket

2、create
org.apache.zookeeper.ZooKeeper#create

也会调用到submitRequest方法
所以,只要最终调用到queuePacket方法都会添加,zookeeper客户端的增删改查等操作都会添加到outgoingQueue队列中
EventThread
从waitingEvents队列中获取响应数据,阻塞读取数据
org.apache.zookeeper.ClientCnxn.EventThread#run

org.apache.zookeeper.ClientCnxn.EventThread#processEvent
1、watcher
调用watcher.process方法

2、LocalCallback
调用异步执行方法AsyncCallback.processResult方法(LocalCallback事件)

3、response
根据response的类型调用异步执行方法

总结:
客户端的连接通过ZooKeeper创建,创建ZooKeeper类将会创建并启动ClientCnxn(客户端上下文类)。创建ClientCnxn类会引起sendThread、sendThread线程的初始化和启动。
SendThread:创建socket连接、获取命令数据发送给服务端(ping、auth、创建删除节点)、读取服务端响应数据
EventThread:从waitingEvents队列中获取数据。执行监听器watch事件、执行Packet数据包的异步调用
网友评论