zookeeper例子
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("地址",
5000, null);
byte[] dataArr = zk.getData("/sunpy/data", null, new Stat());
System.out.println(new String(dataArr));
}
客户端入口
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, sessionTimeout, watcher});
this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
this.hostProvider = hostProvider;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
// 创建SendThread、EventThread线程
this.cnxn = this.createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, watcher, this.getClientCnxnSocket(), canBeReadOnly);
// 运行SendThread、EventThread线程
this.cnxn.start();
}
hostProvider:客户端地址列表管理器。
ClientCxn:客户端核心线程,内部包含SendThread和EventThread。
SendTread:I/O线程,负责zookeeper客户端与服务器之间的网络IO通信。
EventThread:事件线程,负责对服务端事件处理。
客户端实现逻辑
1. 启动客户端,初始化
调用createConnection方法
创建SendThread网络通信线程。
创建EventThread事件处理线程。
SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
super(makeThreadName("-SendThread()"));
// 同步改变zookeeper的state状态
changeZkState(States.CONNECTING);
// 使用传入的ClientCnxnSocket对象进行初始化
this.clientCnxnSocket = clientCnxnSocket;
// 将网络通信线程设置为守护线程
setDaemon(true);
}
EventThread() {
super(makeThreadName("-EventThread"));
// 将事件处理线程设置为守护线程
setDaemon(true);
}
2. 建立连接
启动SendThread线程、EventThread线程。
SendThread线程实现逻辑:
// 进入SendThread线程类,执行线程实现的逻辑:
@Override
public void run() {
// 传入参数:发送Packet的队列outgoingQueue
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
// 判断当前zookeeper的状态,state不为CLOSED和AUTH_FAILED
while (state.isAlive()) {
try {
// 校验ClientCnxnSocket(网络通信Sokcet)的状态
if (!clientCnxnSocket.isConnected()) {
// 关闭退出
// 获取服务器地址重新连接
}
// 校验状态已连接
if (state.isConnected()) {
// 使用sasl权限认证校验
if (zooKeeperSaslClient != null) {
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
// 认证初始化
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (sendAuthEvent) {
// 认证成功或者失败,将Event事件加入到EventThread中的WaitingEvents队列中
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
// 发送数据时间-接听到数据时间
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else { //状态未连接
// 创建连接时间-接听到数据时间
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
// 接听到数据,读超时,抛出会话异常
}
if (state.isConnected()) {
// 写超时,发送ping心跳包测试
}
if (state == States.CONNECTEDREADONLY) {
// 只读模式操作
}
/*
* 传输数据
* 采用NIO方式,ClientCnxnSocketNIO实现,那么就使用Selector通过感兴趣的key,来获取响应的通道传输
* 采用Netty方式,ClientCnxnSocketNetty实现
*/
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
}
}
// 清空outgoingQueue队列中的packet
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
// 关闭ClientCnxnSocket
clientCnxnSocket.close();
// 如果当前状态为未关闭且有权限,那么向事件处理对象线程中添加事件对象
if (state.isAlive()) {
// 将WatchEvent事件添加到waitingEvents队列中
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
// 否则向事件处理对象线程中添加事件对象
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
}
doTransport方法传入pendingQueue用来接收响应的数据。
3. 网络IO操作
zookeeper提供了NIO(ClientCnxnSocketNIO)和Netty(ClientCnxnSocketNetty)两种具体实现。
// NIO实现ClientCnxnSocketNIO
@Override
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
// 获取Selector感兴趣的key
synchronized (this) {
selected = selector.selectedKeys();
}
// 遍历所有key
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// 建立连接的key
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
}
// 读写操作的key
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
//清除感兴趣的key
selected.clear();
}
NIO的读写操作,隐藏在doIO方法
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
// 感兴趣键为可读
if (sockKey.isReadable()) {
// 将服务器传过来的数据写入到缓冲区
int rc = sock.read(incomingBuffer);
// 缓冲区不为空
if (!incomingBuffer.hasRemaining()) {
// 缓冲区调整指针,开始写
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else { // 读取缓冲区数据
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// 感兴趣键为可写
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb); // 获取outgoingQueue队列中Packet向服务器写
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) { // outgoingQueue队列空了,就不用写了,否则继续写
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
enableWrite();
}
}
}
4. 事件处理线程操作
EventThread线程执行逻辑:
// 进入EventThread线程类,执行线程实现的逻辑:
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
// 从waitingEvents队列中获取event事件(阻塞方式)
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else { // event事件未死就可以执行event
processEvent(event); // 执行watch监控事件
}
if (wasKilled) { // 事件死亡结束死循环
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {}
}
服务器地址列表设计
zookeeper实现的服务器默认地址列表:
createDefaultHostProvider(connectString)
// default hostprovider
private static HostProvider createDefaultHostProvider(String connectString) {
return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
}
private void init(Collection<InetSocketAddress> serverAddresses, long randomnessSeed, Resolver resolver) {
this.sourceOfRandomness = new Random(randomnessSeed);
this.resolver = resolver;
if (serverAddresses.isEmpty()) {
throw new IllegalArgumentException("A HostProvider may not be empty!");
}
this.serverAddresses = shuffle(serverAddresses);
currentIndex = -1;
lastIndex = -1;
}
可以看到都是通过InetSocketAddress存储地址。
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
ConnectStringParser实现方式,就是对于集群地址,采用分割字符串方式,形成多个String,然后包装成InetSocketAddress对象,最后存放在serverAddresses中。
参考
《从Paxos到Zookeeper分布式一致性原理与实战》
网友评论