概述
本节分析下客户端消息的处理,重点关注Watcher及DataCallback;本节以getData的同步和异步方法为例进行分析;
RPC方法流程.png
处理流程
客户端请求处理流程.png1.
示例代码
public class DemoTest implements Watcher, AsyncCallback.DataCallback {
private static Stat stat = new Stat();
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
DemoTest demoTest = new DemoTest();
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000, demoTest);
// 同步
byte[] bytes = zooKeeper.getData("/test1", demoTest, stat);
// 异步
zooKeeper.getData("/test1", demoTest, demoTest, "异步回调需要传递的数据");
}
@Override
public void process(WatchedEvent event) {
System.out.println("接收到watch通知:" + event);
}
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("结果回调:" + path + "----" + ctx);
}
}
- 实现Watcher接口,重写process方法用于watch事件回调;
- 实现AsyncCallback接口,重写processResult方法用于异步结果回调;
2.
queuePacket构造packet入队outgoingQueue
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration){
Packet packet = null;
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
// AsyncCallback默认为空
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// 数据包入队 需要发送的队列
outgoingQueue.add(packet);
}
}
// selector.wakeup();
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
-
new Packet
构造Packet(包含Request/Response/Watcher等); -
outgoingQueue.add(packet)
入队outgoingQueue,SendThread.run中进行处理; -
wakeupCnxn()
唤醒多路复用器立即进行select;
3.
同步处理流程阻塞
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
// packet响应后置为true,否则 wait
while (!packet.finished) {
packet.wait();
}
}
return r;
}
-
packet.wait()
阻塞等待packet响应后进行notify,然后同步返回;
4.
异步处理流程结束
- 异步流程同步结束,等待服务端响应后回调processResult;
5.
doTransport
-
doTransport
先write请求包再read响应包,具体流程跟启动流程类似,参考上一节的分析;
6.
readResponse处理响应
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
// 反序列化响应头
replyHdr.deserialize(bbia, "header");
......
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response to the first request!
* 由于请求是按顺序处理的,因此响应也要按顺序处理
*/
try {
// 比对响应xid 跟 原请求xid是否相等,保证顺序性
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
// 设置响应头
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
// 反序列化响应体
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
// packet响应反序列化后,处理watch注册
finishPacket(packet);
}
}
-
replyHdr.deserialize
反序列化ReplyHeader; -
pendingQueue.remove()
从pendingQueue中移除Packet; -
packet.requestHeader.getXid() != replyHdr.getXid()
比对响应xid 跟 原请求xid是否相等,不相等抛出异常,保证请求处理的顺序性; -
packet.response.deserialize
反序列化响应体;
7.
finishPacket
private void finishPacket(Packet p) {
// submitRequest时的wcb,watcher不为空时p.watchRegistration != null
if (p.watchRegistration != null) {
// 注册watcher
p.watchRegistration.register(p.replyHeader.getErr());
}
// 异步回调AsyncCallback为空,需要同步返回
if (p.cb == null) {
synchronized (p) {
// finished置为true,
p.finished = true;
// 唤醒submitRequest时线程
p.notifyAll();
}
} else {
// 异步返回,通过eventThread线程处理
p.finished = true;
// 加入阻塞队列waitingEvents (LinkedBlockingQueue)
eventThread.queuePacket(p);
}
}
-
p.watchRegistration.register
注册watcher,分别放到ZKWatchManager的三个Map中,用于在服务端触发相应事件时回调自定义的process方法;
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
-
p.notifyAll()
p.cb == null即异步回调AsyncCallback为空,唤醒submitRequest时线程,同步返回; -
eventThread.queuePacket(p)
加入阻塞队列waitingEvents (LinkedBlockingQueue线程安全,不需要synchronized),触发EventThread.run方法,异步返回;
8.
processEvent处理事件
private void processEvent(Object event) {
try {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
......
else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
-
p.response instanceof GetDataResponse
判断当前事件时什么类型; -
cb.processResult
回调processResult,执行用户自定义逻辑;
-------over-------
网友评论