A cluster handles two kinds of requests: transactional and non-transactional. Non-transactional requests are processed much as in standalone mode; the node that receives one can serve the data locally. Transactional requests must be forwarded to the Leader, which runs a voting round: only after more than half of the Followers have ACKed, and the change has been synchronized, is the operation result returned.
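The "more than half" rule is the heart of this: counting the leader's own implicit vote, a proposal commits once a strict majority of the voting servers has ACKed it. A minimal sketch of that rule (MajorityCheck is illustrative; the real implementation is org.apache.zookeeper.server.quorum.flexible.QuorumMaj):
import java.util.HashSet;
import java.util.Set;

public class MajorityCheck {
    private final int half; // half the ensemble size, rounded down

    public MajorityCheck(int ensembleSize) {
        this.half = ensembleSize / 2;
    }

    // true once the set of server ids that ACKed a proposal is a strict majority
    public boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half;
    }

    public static void main(String[] args) {
        MajorityCheck check = new MajorityCheck(5); // a 5-server ensemble
        Set<Long> acks = new HashSet<>();
        acks.add(1L);                               // the leader's own vote
        acks.add(2L);
        System.out.println(check.containsQuorum(acks)); // false: 2 of 5
        acks.add(3L);
        System.out.println(check.containsQuorum(acks)); // true: 3 of 5
    }
}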
Regardless of mode or node type, client requests are handled by a subclass of ServerCnxnFactory, NIOServerCnxnFactory by default; what differs is the zkServer instance inside the processing chain. In standalone mode it is a plain ZooKeeperServer; the other node types use subclasses of ZooKeeperServer. The UML class diagram of the ZooKeeperServer subclasses:
[Figure: UML class diagram of the ZooKeeperServer subclasses and their request processors (zookeeper请求处理的各个processor.jpg)]
1 With all these ZooKeeperServer subclasses, when a transactional request arrives, which methods handle it?
1.1 In org.apache.zookeeper.server.quorum.QuorumPeer#run, the server's role is determined first.
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        // ... n lines omitted ...
        // while peerState is LOOKING, run the leader election and vote for a leader
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            // after the election this server is OBSERVING, so an
            // ObserverZooKeeperServer is instantiated
            setObserver(makeObserver(logFactory));
            // sync with the leader
            observer.observeLeader();
        } // ... catch/finally omitted ...
        break;
    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            // after the election this server is FOLLOWING, so a
            // FollowerZooKeeperServer is instantiated
            setFollower(makeFollower(logFactory));
            // sync with the leader
            follower.followLeader();
        } // ... catch/finally omitted ...
        break;
    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        } // ... catch/finally omitted ...
        break;
    }
}
The code above shows that once the election succeeds, each server's state is settled, and with it the concrete ZooKeeperServer instance it will run.
1.2 Each ZooKeeperServer instance has a different processing chain.
The chain of responsibility is built by the setupRequestProcessors method.
Take the FollowerZooKeeperServer instance, for example:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
So firstProcessor is FollowerRequestProcessor, followed by CommitProcessor and then FinalRequestProcessor. The server also separately composes a SyncRequestProcessor, whose successor is SendAckRequestProcessor.
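All of these processors implement the same small contract, org.apache.zookeeper.server.RequestProcessor, and each stage holds a reference to its successor. A minimal self-contained sketch of that chain-of-responsibility shape (StageProcessor and the Request stand-in are illustrative, not real ZooKeeper classes):
class Request { /* stand-in for org.apache.zookeeper.server.Request */ }

interface RequestProcessor {
    void processRequest(Request request) throws Exception;
    void shutdown();
}

// Each stage does its own work, then forwards to its successor.
class StageProcessor implements RequestProcessor {
    private final String name;
    private final RequestProcessor next;

    StageProcessor(String name, RequestProcessor next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void processRequest(Request request) throws Exception {
        System.out.println(name + " handling request");
        if (next != null) {
            next.processRequest(request); // hand off down the chain
        }
    }

    @Override
    public void shutdown() {
        if (next != null) {
            next.shutdown();
        }
    }
}
Wiring new StageProcessor("FollowerRequestProcessor", new StageProcessor("CommitProcessor", new StageProcessor("FinalRequestProcessor", null))) mirrors the main follower chain built above.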
1.3 So when exactly is this processing chain built?
case OBSERVING:
    try {
        // instantiate ObserverZooKeeperServer
        setObserver(makeObserver(logFactory));
        // sync with the leader and finish initializing the
        // ObserverZooKeeperServer, including building its chain
        observer.observeLeader();
    } // ... catch/finally omitted ...
    break;
case FOLLOWING:
    try {
        LOG.info("FOLLOWING");
        // instantiate FollowerZooKeeperServer
        setFollower(makeFollower(logFactory));
        // sync with the leader and finish initializing the
        // FollowerZooKeeperServer, including building its chain
        follower.followLeader();
    } // ... catch/finally omitted ...
    break;
case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    } // ... catch/finally omitted ...
    break;
}
At line 469 of org.apache.zookeeper.server.quorum.Learner#syncWithLeader, zk.startup() is called; at line 431 of org.apache.zookeeper.server.quorum.Leader#lead, startZkServer() is called.
Both paths reach the startup() method below, which builds the chain of responsibility via setupRequestProcessors.
Since each server type extends ZooKeeperServer (see the class diagram above) and overrides setupRequestProcessors,
the call dispatches to the setupRequestProcessors of whichever ZooKeeperServer subclass is actually running.
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors();
registerJMX();
setState(State.RUNNING);
notifyAll();
}
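Because startup() lives once in ZooKeeperServer while setupRequestProcessors() is overridden per role, this is a textbook template method. A stripped-down illustration (FollowerServer and LeaderServer are shortened stand-ins for the real hierarchy):
abstract class Server {
    public synchronized void startup() {
        // ... session tracker setup elided ...
        setupRequestProcessors(); // dispatches to the subclass override
        // ... JMX registration and state change elided ...
    }

    protected abstract void setupRequestProcessors();
}

class FollowerServer extends Server {
    @Override
    protected void setupRequestProcessors() {
        System.out.println("FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor");
    }
}

class LeaderServer extends Server {
    @Override
    protected void setupRequestProcessors() {
        System.out.println("PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor"
                + " -> ToBeAppliedRequestProcessor -> FinalRequestProcessor");
    }
}
Calling new FollowerServer().startup() and new LeaderServer().startup() therefore builds entirely different chains from the same startup() code.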
2 Where does a request enter the server?
"org.apache.zookeeper.server.NIOServerCnxnFactory#run"
当读写事件就绪时,NIOServerCnxn对象进行IO任务。
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
}
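For context, NIOServerCnxnFactory#run is essentially the standard single-threaded java.nio selector loop. A generic, runnable sketch of that pattern (the port and handler bodies are illustrative; the real factory also accepts connections and attaches an NIOServerCnxn to each SelectionKey):
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

public class SelectorLoop {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(2181)); // illustrative port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey k = it.next();
                it.remove();
                if (k.isAcceptable()) {
                    // accept, configure non-blocking, register for OP_READ,
                    // and attach the per-connection handler to the key
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    // hand the key to the attached connection object,
                    // as NIOServerCnxnFactory does with c.doIO(k)
                }
            }
        }
    }
}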
"org.apache.zookeeper.server.ZooKeeperServer#processPacket"
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
submitRequest then validates the packet and dispatches it:
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
"丢给对应的firstProcessor去处理,事物逻辑。
而对于不同的角色,比如说leader,对应的是LeaderZooKeeperServer
,而 follower对应的是FollowerZookeeperServer"
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
}
After submitRequest, the request has been handed to the concrete chain of responsibility. Different roles run different ZooKeeperServer instances, each with its own firstProcessor.
The overall flow is roughly:
[Figure: ZooKeeper transaction processing flow (zookeeper事务处理的事务流程.png)]
3 When a client request reaches the leader, what does the transaction flow look like?
From LeaderZooKeeperServer#setupRequestProcessors you can see that the leader's firstProcessor is PrepRequestProcessor. Its processRequest method merely adds the request to the submittedRequests blocking queue; the actual work happens in the run method.
3.1 PrepRequestProcessor attaches a transaction header to transactional requests; non-transactional requests only get a checkSession
while (true) {
    Request request = submittedRequests.take();
    pRequest(request);
}
pRequest then dispatches on the request type:
switch (request.type) {
case OpCode.create:
CreateRequest createRequest = new CreateRequest();
"如果是 create,delete ,set等改变内存数据库,zkDatabase的请求,转化成事物请求"
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
break;
"省略n行代码====================="
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
"因为zookeeper的事物请求都是leader处理的,
所以他的分布式唯一id,
只要在leader侧ks.getNextZxid(),
唯一即可。AtomicLong 类型,保证事物请求并发时,线程安全。"
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
break;
"对内存数据库zkDatabase不做改变的,就checkSession"
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
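An aside on the zxid handed out above: ZooKeeper packs the election epoch into the high 32 bits of the zxid and a per-epoch counter into the low 32 bits (the real helpers live in org.apache.zookeeper.server.util.ZxidUtils; ZxidDemo below is an illustrative rewrite):
public class ZxidDemo {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }
    static long getEpoch(long zxid) { return zxid >> 32; }
    static long getCounter(long zxid) { return zxid & 0xffffffffL; }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42); // epoch 5, 42nd transaction of that epoch
        System.out.println(Long.toHexString(zxid));                    // 50000002a
        System.out.println(getEpoch(zxid) + " / " + getCounter(zxid)); // 5 / 42
    }
}
This is also why uniqueness only needs to be enforced on the leader: a newly elected leader bumps the epoch, so its zxids can never collide with those of a previous epoch.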
3.2 ProposalRequestProcessor: splitting transactional from non-transactional requests
ProposalRequestProcessor#processRequest
if (request instanceof LearnerSyncRequest) {
    zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
    // every request is forwarded to the next processor (CommitProcessor);
    // for non-transactional requests that is all that happens
    nextProcessor.processRequest(request);
    // a non-null hdr marks a transactional request: delegate to the
    // leader, which broadcasts a PROPOSAL message
    if (request.hdr != null) {
        // We need to sync and get consensus on any transactions
        try {
            zks.getLeader().propose(request);
        } catch (XidRolloverException e) {
            throw new RequestProcessorException(e.getMessage(), e);
        }
        // and hand it to syncProcessor as well, so the leader itself
        // persists the transaction to its txnLog first
        syncProcessor.processRequest(request);
    }
}
3.2.2 First, how a transactional message is flushed to the txnLog: syncProcessor's persistence loop (3.2.1 below covers the PROPOSAL broadcast).
public void run() {
try {
int logCount = 0;
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
"不让所有的zookeeperServer一起发起快照"
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
"第一轮循环是toFlush为空,进入下面的si!=null 的判断"
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
"调用FileTxnLog#append,追加transaction log 到日志文件"
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
setRandRoll(r.nextInt(snapCount/2));
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
"第一轮最后,添加到toFlush,第二轮就会flush,也就是hit the disk "
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
The flush method:
private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
"这里的commit才是真正的hit the disk , "
"而前面的append只是加入到groupcommit的数组中"
"LinkedList<FileOutputStream> streamsToFlush"
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
if (nextProcessor != null) {
"这里进入AckRequestProcessor"
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
Tip:
A server may only ACK a proposal after the transaction log has been forced to disk. Concretely, the server calls ZKDatabase's commit method, which ultimately calls FileChannel.force, so the transaction is guaranteed durable before the ACK goes out. One caveat: modern disks have a write cache that buffers data on its way to the platters. With the write cache enabled, a forced flush returning does not guarantee the data is on the platters; it may only have reached the cache. To guarantee data is durable when FileChannel.force() returns, the disk write cache must be disabled; operating systems offer various ways to do so.
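A minimal sketch of the append-then-force pattern described above (the file name is illustrative; ZooKeeper's FileTxnLog batches multiple streams in streamsToFlush):
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class ForceDemo {
    public static void main(String[] args) throws IOException {
        try (FileOutputStream fos = new FileOutputStream(new File("txn.log"), true)) {
            FileChannel channel = fos.getChannel();
            // append: the bytes may still sit in the OS page cache
            channel.write(ByteBuffer.wrap("txn-record".getBytes()));
            // force(false): block until the data (not necessarily file
            // metadata) reaches the device; with an enabled disk write
            // cache this only guarantees it reached that cache
            channel.force(false);
        }
    }
}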
3.2.1 Broadcasting the PROPOSAL message
Leader#propose
public Proposal propose(Request request) throws XidRolloverException {
    // ... n lines omitted ...
    // packetType = PROPOSAL
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }
        lastProposed = p.packet.getZxid();
        // ConcurrentMap<Long, Proposal> outstandingProposals holds the
        // proposals that are still in flight
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }
    return p;
}
void sendPacket(QuorumPacket qp) {
    // HashSet<LearnerHandler> forwardingFollowers;
    // send the PROPOSAL to every follower: a LearnerHandler is the
    // leader's communication link to one learner
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}
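queuePacket itself only enqueues: each LearnerHandler pairs a packet queue with a dedicated sender thread, so a slow follower cannot stall the leader's propose path. A hedged sketch of that pattern (PacketSender and the byte[] payload are stand-ins for LearnerHandler's queuedPackets and QuorumPacket):
import java.util.concurrent.LinkedBlockingQueue;

class PacketSender extends Thread {
    private final LinkedBlockingQueue<byte[]> queuedPackets = new LinkedBlockingQueue<>();

    // called from the leader's thread; never blocks on the network
    void queuePacket(byte[] packet) {
        queuedPackets.add(packet);
    }

    @Override
    public void run() {
        try {
            while (true) {
                byte[] packet = queuedPackets.take();
                // ... write packet to this follower's socket ...
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutting down
        }
    }
}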
3.3 AckRequestProcessor: checks whether ackSet holds a majority; if it does, COMMIT and INFORM messages are sent
public void processRequest(Request request) {
QuorumPeer self = leader.self;
if(self != null)
leader.processAck(self.getId(), request.zxid, null);
else
LOG.error("Null QuorumPeer");
}
Leader#processAck
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    // ... n lines omitted ...
    // the ProposalRequestProcessor already put this proposal into the map
    Proposal p = outstandingProposals.get(zxid);
    // the leader adds its own sid to the ackSet: it always votes for its
    // own proposal, so in a three-server ensemble one more ACK is enough
    p.ackSet.add(sid);
    // check for a majority
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        if (zxid != lastCommitted + 1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!",
                    Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }
        if (p.request == null) {
            LOG.warn("Going to commmit null request for proposal: {}", p);
        }
        commit(zxid);
        inform(p);
        zk.commitProcessor.commit(p.request);
        if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
}
A question at this point: what if the quorum check runs while a majority of ACKs has not yet arrived? Is the proposal simply deemed failed? No: processAck is invoked once per incoming ACK, and until containsQuorum passes, the proposal just stays in outstandingProposals waiting for the next ACK.
3.4 While follower.followLeader() runs, the follower receives the PROPOSAL message
Follower#followLeader
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}
Follower#processPacket
// handling the PROPOSAL message
case Leader.PROPOSAL:
    TxnHeader hdr = new TxnHeader();
    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
    if (hdr.getZxid() != lastQueued + 1) {
        LOG.warn("Got zxid 0x"
                + Long.toHexString(hdr.getZxid())
                + " expected 0x"
                + Long.toHexString(lastQueued + 1));
    }
    lastQueued = hdr.getZxid();
    // fzk.logRequest enters FollowerZooKeeperServer#logRequest, which hands
    // the request to syncProcessor; SyncRequestProcessor#flush then passes
    // it on to the next processor (SendAckRequestProcessor)
    fzk.logRequest(hdr, txn);
    break;
3.5 SendAckRequestProcessor: the follower sends an ACK back to the leader (note that observers never send one)
public void processRequest(Request si) {
if(si.type != OpCode.sync){
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
null);
try {
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug("Ignoring error closing the connection", e1);
}
}
}
}
3.6 LearnerHandler receives the ACK
LearnerHandler#run, around line 576:
switch (qp.getType()) {
case Leader.ACK:
    if (this.learnerType == LearnerType.OBSERVER) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received ACK from Observer " + this.sid);
        }
    }
    syncLimitCheck.updateAck(qp.getZxid());
    // this sid identifies the follower that sent the ACK
    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
    break;
Leader#processAck then adds that follower's sid to the corresponding Proposal's ackSet.
Leader#processAck
// once a majority is reached
if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
    if (zxid != lastCommitted + 1) {
        LOG.warn("Commiting zxid 0x{} from {} not first!",
                Long.toHexString(zxid), followerAddr);
        LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
    }
    outstandingProposals.remove(zxid);
    if (p.request != null) {
        // proposals about to be applied
        toBeApplied.add(p);
    }
    if (p.request == null) {
        LOG.warn("Going to commmit null request for proposal: {}", p);
    }
    // send COMMIT to the followers and INFORM to the observers
    commit(zxid);
    inform(p);
    zk.commitProcessor.commit(p.request);
    if (pendingSyncs.containsKey(zxid)) {
        for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
            sendSync(r);
        }
    }
}
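For reference, commit and inform are thin broadcast wrappers. A hedged reconstruction from the 3.4.x source, shown in the same fragment style as the excerpts above (simplified, error handling elided):
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    // COMMIT carries only the zxid: followers already hold the proposal
    // from the PROPOSAL phase and just need permission to apply it
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp); // to forwardingFollowers
}

public void inform(Proposal proposal) {
    // INFORM carries the full transaction payload, because observers
    // never received the original PROPOSAL
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
            proposal.packet.getData(), null);
    sendObserverPacket(qp); // to observingLearners
}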
3.6.1 The follower receives the COMMIT message and commits
Follower#processPacket
case Leader.COMMIT:
    fzk.commit(qp.getZxid());
fzk.commit enqueues the committed request for the CommitProcessor. CommitProcessor#run:
public void run() {
try {
Request nextPending = null;
while (!finished) {
"第一轮循环toProcess长度是0 ,committedRequests长度是1 "
int len = toProcess.size();
for (int i = 0; i < len; i++) {
nextProcessor.processRequest(toProcess.get(i));
}
toProcess.clear();
synchronized (this) {
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() == 0) {
wait();
continue;
}
// First check and see if the commit came in for the pending
// request
"第一轮循环toProcess长度是0 ,committedRequests长度是1 走这里,加入到toProcess "
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() > 0) {
Request r = committedRequests.remove();
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {
// we want to send our version of the request.
// the pointer to the connection in the request
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
toProcess.add(nextPending);
nextPending = null;
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r);
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {
continue;
}
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
default:
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
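The essence of the loop above: a transactional request parks as nextPending until its COMMIT arrives (matched by sessionId and cxid), while reads that are not queued behind a pending write pass straight through to the next processor. A toy model of just that ordering rule (illustrative only, not the real CommitProcessor):
import java.util.LinkedList;

class CommitGate {
    static final class Req {
        final long sessionId; final int cxid; final boolean isWrite;
        Req(long sessionId, int cxid, boolean isWrite) {
            this.sessionId = sessionId; this.cxid = cxid; this.isWrite = isWrite;
        }
    }

    private Req nextPending;                              // write awaiting its commit
    final LinkedList<Req> toProcess = new LinkedList<>(); // ready for the next processor

    // requests arrive here in client order
    void queue(Req r) {
        if (nextPending == null && r.isWrite) {
            nextPending = r;      // hold the write until its COMMIT arrives
        } else if (nextPending == null) {
            toProcess.add(r);     // a read with nothing pending passes through
        }
        // else: the request waits behind nextPending (queueing elided here)
    }

    // invoked when the leader's COMMIT for (sessionId, cxid) arrives
    void committed(long sessionId, int cxid) {
        if (nextPending != null && nextPending.sessionId == sessionId
                && nextPending.cxid == cxid) {
            toProcess.add(nextPending); // now safe to apply, in order
            nextPending = null;
        }
    }
}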
3.6.2 Then comes the follower's FinalRequestProcessor
synchronized (zks.outstandingChanges) {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < request.zxid) {
LOG.warn("Zxid outstanding "
+ cr.zxid
+ " is less than current " + request.zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
"进入ZkDatabase处理txn"
rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
It then builds the response object and returns it to the client.
The leader side works the same way.
4 If the request arrives at a follower or an observer, a REQUEST message is forwarded to the leader for processing
FollowerRequestProcessor#run
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
"发送REQUEST消息给leader"
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
"发送REQUEST消息给leader"
zks.getFollower().request(request);
break;
}
}
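zks.getFollower().request(request) lands in org.apache.zookeeper.server.quorum.Learner#request, which serializes the request and ships it to the leader as a QuorumPacket of type Leader.REQUEST. A hedged reconstruction from the 3.4.x source, in the same fragment style as the excerpts above (simplified):
void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    // serialize just enough of the request for the leader to rebuild it
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        byte[] b = new byte[request.request.remaining()];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    // zxid is -1: the leader has not assigned one yet
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(),
            request.authInfo);
    writePacket(qp, true);
}
On the leader, LearnerHandler's run loop receives this Leader.REQUEST packet and resubmits it into the leader's own chain, so from there the flow is exactly that of section 3.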