概述
zk是CS架构,用户需要通过客户端API跟服务端交互。
客户端主要包含以下几个模块:
- 通信模块:负责跟服务端进行通信,zk支持两种事件驱动编程模型,一种是Java NIO,一种是Netty,默认使用NIO;
- 请求处理模块:负责请求处理的流程;包括同步请求处理,异步请求处理,watch事件注册和处理等;
- 启动模块:负责初始化客户端;包括创建客户端NIO连接注册事件,启动请求处理线程,启动异步事件处理线程等;
本节先来分析下zk客户端涉及到的核心类
客户端核心类
一、ZooKeeper
zk客户端库的核心类,将用户请求数据封装成RequestHeader、Request对象,然后委托ClientCnxn
来与服务端进行通信,相应的返回结果会存储在Response,ReplyHeader对象中。
1.1、ZooKeeper
- ZooKeeper中方法基本都有多个重载的版本,其中带有参数AsyncCallback用于异步处理响应;
- getData/getChildren/exists方法可以传入Watcher,用于注册监听事件;
以getData为例分析:
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) {
final String clientPath = path;
// 节点路径校验
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// DataWatchRegistration可以获取dataWatches(节点数据上注册的watcher)
wcb = new DataWatchRegistration(watcher, clientPath);
}
// 加上客户端跟路径 chroot
final String serverPath = prependChroot(clientPath);
// 请求头
RequestHeader h = new RequestHeader();
// OpCode -> getData
h.setType(ZooDefs.OpCode.getData);
// 请求体 (路径、watch)
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
// 响应体
GetDataResponse response = new GetDataResponse();
// 委托cnxn处理,同步返回void,响应结果异步回调 DataCallback.processResult处理,可以通过ctx传递参数
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, wcb);
}
1.
校验节点路径是否合法;
2.
watcher不为空,构造DataWatchRegistration可以获取dataWatches(Map存放节点上注册的监听数据变更的watcher);
3.
serverPath拼接加上客户端根路径 chroot(启动客户端时服务端地址后带的路径,例如:127.0.0.1:2181/test1);
4.
构造请求头/请求体/响应头/响应体;
5.
委托ClientCnxn处理,响应结果异步回调 DataCallback.processResult处理,可以通过ctx传递参数;
1.2、States
States客户端连接状态枚举:
-
CONNECTING
:正在连接,开始建立连接时置为该状态; -
ASSOCIATING
:该状态暂时没用到; -
CONNECTED
:已连接; -
CONNECTEDREADONLY
:已建立只读连接; -
CLOSED
:已关闭; -
AUTH_FAILED
:无权限; -
NOT_CONNECTED
:未连接,默认状态;
1.3、ZKWatchManager
ZKWatchManager通过3个Map管理节点上注册的事件;
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
-
dataWatches
:节点数据变更的监听;对应通过DataWatchRegistration进行注册监听器和获取监听器; -
existWatches
:节点是否还存在的监听;对应ExistsWatchRegistration进行注册监听器和获取监听器; -
childWatches
:子节点变更的监听;对应ChildWatchRegistration进行注册监听器和获取监听器;
二、ClientCnxn
- ClientCnxn负责客户端与服务端交互的主要逻辑,起到承上启下的作用,承上接受ZooKeeper的委托,启下通过内部类SendThread委托给ClientCnxnSocket管理底层IO连接。
- ClientCnxn通过两个Thread(
SendThread、EventThread
),三个队列(outgoingQueue、pendingQueue、waitingEvents
)实现了请求的同步异步处理流程以及监听事件的处理流程;
ClientCnxn重点方法如下:
public class ClientCnxn {
// 已发送并正在等待响应的数据包
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
// 需要发送的数据包
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
final SendThread sendThread;
final EventThread eventThread;
// 统一生成xid,synchronized进行同步
synchronized public int getXid();
// 提交客户端请求,提交之后wait等待响应后唤醒
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException;
// 通过sendThread发送请求包,通过cb异步处理响应
public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)
throws IOException;
// 请求数据包入队 需要发送的队列
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration);
}
2.1、Packet
Packet负责封装RPC请求和响应,消息格式如下:
static class Packet {
// 请求头
RequestHeader requestHeader;
// 响应头
ReplyHeader replyHeader;
// 请求体
Record request;
// 响应体
Record response;
// 请求buffer
ByteBuffer bb;
// 客户端路径(相对于chroot的路径)
String clientPath;
// 服务端路径(包含chroot的路径)
String serverPath;
// 该packet是否处理完成
boolean finished;
// 该packet的异步回调
AsyncCallback cb;
// 需要传递到异步回调中的数据
Object ctx;
// 监听注册
WatchRegistration watchRegistration;
// 是否只读 默认false
public boolean readOnly;
}
请求.png
-
xid
是客户端发送请求的序号,用来包装请求的FIFO; -
type
是请求类型,例如:CreateRequest等
-
xid
是客户端发送请求的序号; -
zxid
是zk最新的事务id; -
err
是错误码,例如:Code.OK和Code.NONODE,表示处理请求的结果状态;
2.2、SendThread
SendThread继承Thread,是zk客户端专门负责IO处理的类;
主要包含如下方法:
-
startConnect
:创建客户端连接; -
sendPing
:定时发送心跳包; -
onConnected
:连接建立后的回调方法 -
run
:请求的主要流程处理;把pendingQueue, outgoingQueue传递到clientCnxnSocket中进行具体逻辑处理;
2.3、EventThread
EventThread同样继承Thread,是zk客户端专门负责处理watch事件和异步回调的类;
run方法中不断的从waitingEvents这个队列中取出Object,识别出其具体类型Watcher或者AsyncCallback,并分别调用process和processResult接口方法来实现对事件的触发和回调;
三、ClientCnxnSocket
ClientCnxnSocket是负责进行底层通信的抽象类,zk默认提供两种通信方式:
-
ClientCnxnSocketNIO
:使用JDK原生NIO api实现; -
ClientCnxnSocketNetty
:直接使用Netty实现;
两种方式都是对之前NIO或Netty的客户端代码进行简单封装,包括open - connect - register - select - read等;
小结
本节主要先简单了解下zk客户端涉及的核心类,先有个整体印象方便后面客户端启动流程以及客户端请求处理流程的学习;
----------over-----------
网友评论