zk source code reading 51: FinalRequestProcessor source code


Author: 赤子心_d709 | Published 2017-08-31 16:14

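    Introduction

    FinalRequestProcessor performs the last steps before a response is returned to the client: it builds the response for the client request and, for transactional requests, also applies the transaction to the in-memory database. It is the last link in the processing chain.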
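
    To make "last link in the processing chain" concrete, here is a minimal sketch of a chain whose final stage has no successor. The interface and class names (SketchProcessor, ForwardingProcessor, FinalSketchProcessor, ChainSketch) are hypothetical and are not ZooKeeper's own; the "prep" and "sync" stage names assume the standalone-server ordering PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a request processor interface. */
interface SketchProcessor {
    void process(String request);
}

/** An intermediate link: records its own step, then forwards to the next link. */
final class ForwardingProcessor implements SketchProcessor {
    private final String name;
    private final SketchProcessor next;
    private final List<String> trace;

    ForwardingProcessor(String name, SketchProcessor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void process(String request) {
        trace.add(name + ":" + request);
        next.process(request);
    }
}

/** The last link: it has no next processor, so the request ends here. */
final class FinalSketchProcessor implements SketchProcessor {
    private final List<String> trace;

    FinalSketchProcessor(List<String> trace) {
        this.trace = trace;
    }

    @Override
    public void process(String request) {
        // In the real class this is where the txn is applied and the reply is built.
        trace.add("final:" + request);
    }
}

final class ChainSketch {
    /** Builds prep -> sync -> final and returns the order in which the stages ran. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        SketchProcessor chain = new ForwardingProcessor("prep",
                new ForwardingProcessor("sync", new FinalSketchProcessor(trace), trace), trace);
        chain.process(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a"));
    }
}
```

    Because the final stage holds no reference to a successor, its shutdown has nothing to propagate, which matches the shutdown method shown in this article.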

    Fields

        private static final Logger LOG = LoggerFactory.getLogger(FinalRequestProcessor.class);
    
        ZooKeeperServer zks;// the ZooKeeper server this processor belongs to
    

    Methods

    shutdown

    This class runs no thread of its own and has no nextProcessor, so shutdown only writes a log line.

        public void shutdown() {
            // we are the final link in the chain
            LOG.info("shutdown of request processor complete");
        }
    

    Constructor

        public FinalRequestProcessor(ZooKeeperServer zks) {
            this.zks = zks;
        }
    

    processRequest

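    For transactional requests, apply the transaction.
    For non-transactional requests, run the matching handler.
    Update serverStats (latency, counts, and so on).
    Send the reply through cnxn.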
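
    The statistics step can be pictured with a small latency tracker. This is a hypothetical, simplified class (LatencyStatsSketch), not ZooKeeper's ServerStats; it only assumes that latency is measured as "now minus the request's creation time", which is what passing request.createTime to updateLatency suggests.

```java
/** Hypothetical, simplified latency tracker; not ZooKeeper's ServerStats class. */
final class LatencyStatsSketch {
    private long minLatency = Long.MAX_VALUE;
    private long maxLatency = 0;
    private long totalLatency = 0;
    private long count = 0;

    /** Records one finished request given its creation time and the current time. */
    synchronized void update(long createTimeMillis, long nowMillis) {
        long latency = nowMillis - createTimeMillis;
        totalLatency += latency;
        count++;
        if (latency < minLatency) {
            minLatency = latency;
        }
        if (latency > maxLatency) {
            maxLatency = latency;
        }
    }

    /** Smallest latency seen so far, or 0 if nothing has been recorded. */
    synchronized long min() {
        return count == 0 ? 0 : minLatency;
    }

    /** Largest latency seen so far. */
    synchronized long max() {
        return maxLatency;
    }

    /** Integer average of all recorded latencies, or 0 if nothing has been recorded. */
    synchronized long avg() {
        return count == 0 ? 0 : totalLatency / count;
    }

    /** Number of requests recorded. */
    synchronized long count() {
        return count;
    }
}
```

    A caller would invoke update(request.createTime, System.currentTimeMillis()) once per answered request, in the same place where the real code calls zks.serverStats().updateLatency(...).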

    The source is as follows:

        public void processRequest(Request request) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processing request:: " + request);
            }
            // request.addRQRec(">final");
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
            }
            ProcessTxnResult rc = null;
            synchronized (zks.outstandingChanges) {// drain outstandingChanges from the head
                while (!zks.outstandingChanges.isEmpty()
                        && zks.outstandingChanges.get(0).zxid <= request.zxid) {// outstandingChanges is non-empty and its head zxid is <= the request zxid
                    ChangeRecord cr = zks.outstandingChanges.remove(0);// this list was filled earlier by PrepRequestProcessor.addChangeRecord()
                    if (cr.zxid < request.zxid) {// unexpected: for a transactional request processed in order the head zxid equals request.zxid; otherwise it is greater and the loop is not entered
                        LOG.warn("Zxid outstanding "
                                + cr.zxid
                                + " is less than current " + request.zxid);
                    }
                    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                        zks.outstandingChangesForPath.remove(cr.path);
                    }
                }
                if (request.hdr != null) {// the request carries a transaction
                   TxnHeader hdr = request.hdr;
                   Record txn = request.txn;
    
                   rc = zks.processTxn(hdr, txn);// apply the transaction and keep the result
                }
                // do not add non quorum packets to the queue.
                if (Request.isQuorum(request.type)) {// quorum (transactional) request type
                    zks.getZKDatabase().addCommittedProposal(request);
                }
            }
    
            if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {// handle closeSession
                ServerCnxnFactory scxn = zks.getServerCnxnFactory();
                // this might be possible since
                // we might just be playing diffs from the leader
                if (scxn != null && request.cnxn == null) {
                    // calling this if we have the cnxn results in the client's
                    // close session response being lost - we've already closed
                    // the session/socket here before we can send the closeSession
                    // in the switch block below
                    scxn.closeSession(request.sessionId);// close the ServerCnxn
                    return;
                }
            }
    
            if (request.cnxn == null) {
                return;
            }
            ServerCnxn cnxn = request.cnxn;
    
            String lastOp = "NA";
            zks.decInProcess();
            Code err = Code.OK;
            Record rsp = null;
            boolean closeSession = false;
            try {
                if (request.hdr != null && request.hdr.getType() == OpCode.error) {
                    throw KeeperException.create(KeeperException.Code.get((
                            (ErrorTxn) request.txn).getErr()));
                }
    
                KeeperException ke = request.getException();
                if (ke != null && request.type != OpCode.multi) {
                    throw ke;// rethrow the exception recorded on the request
                }
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}",request);
                }
                switch (request.type) {
                case OpCode.ping: {
                    zks.serverStats().updateLatency(request.createTime);// update the latency statistics
    
                    lastOp = "PING";
                    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                            request.createTime, System.currentTimeMillis());// update the connection's response statistics
    
                    cnxn.sendResponse(new ReplyHeader(-2,
                            zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");// send the reply
                    return;
                }
                case OpCode.createSession: {
                    zks.serverStats().updateLatency(request.createTime);
    
                    lastOp = "SESS";
                    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                            request.createTime, System.currentTimeMillis());
    
                    zks.finishSessionInit(request.cnxn, true);// finish session initialization
                    return;
                }
                case OpCode.multi: {
                    lastOp = "MULT";
                    rsp = new MultiResponse() ;
    
                    for (ProcessTxnResult subTxnResult : rc.multiResult) {
    
                        OpResult subResult ;
    
                        switch (subTxnResult.type) {
                            case OpCode.check:
                                subResult = new CheckResult();
                                break;
                            case OpCode.create:
                                subResult = new CreateResult(subTxnResult.path);
                                break;
                            case OpCode.delete:
                                subResult = new DeleteResult();
                                break;
                            case OpCode.setData:
                                subResult = new SetDataResult(subTxnResult.stat);
                                break;
                            case OpCode.error:
                                subResult = new ErrorResult(subTxnResult.err) ;
                                break;
                            default:
                                throw new IOException("Invalid type of op");
                        }
    
                        ((MultiResponse)rsp).add(subResult);
                    }
    
                    break;
                }
                case OpCode.create: {
                    lastOp = "CREA";
                    rsp = new CreateResponse(rc.path);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.delete: {
                    lastOp = "DELE";
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.setData: {
                    lastOp = "SETD";
                    rsp = new SetDataResponse(rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.setACL: {
                    lastOp = "SETA";
                    rsp = new SetACLResponse(rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.closeSession: {
                    lastOp = "CLOS";
                    closeSession = true;
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.sync: {
                    lastOp = "SYNC";
                    SyncRequest syncRequest = new SyncRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            syncRequest);
                    rsp = new SyncResponse(syncRequest.getPath());
                    break;
                }
                case OpCode.check: {
                    lastOp = "CHEC";
                    rsp = new SetDataResponse(rc.stat);
                    err = Code.get(rc.err);
                    break;
                }
                case OpCode.exists: {
                    lastOp = "EXIS";
                    // TODO we need to figure out the security requirement for this!
                    ExistsRequest existsRequest = new ExistsRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            existsRequest);
                    String path = existsRequest.getPath();
                    if (path.indexOf('\0') != -1) {
                        throw new KeeperException.BadArgumentsException();
                    }
                    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                            .getWatch() ? cnxn : null);
                    rsp = new ExistsResponse(stat);
                    break;
                }
                case OpCode.getData: {// getData request
                    lastOp = "GETD";
                    GetDataRequest getDataRequest = new GetDataRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getDataRequest);// deserialize the getDataRequest
                    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }// verify that the node at this path exists
                    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                            ZooDefs.Perms.READ,
                            request.authInfo);// check the ACL permission
                    Stat stat = new Stat();
                    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                            getDataRequest.getWatch() ? cnxn : null);// if the watch flag is set, pass cnxn as the Watcher
                    rsp = new GetDataResponse(b, stat);
                    break;
                }
                case OpCode.setWatches: {
                    lastOp = "SETW";
                    SetWatches setWatches = new SetWatches();
                    // XXX We really should NOT need this!!!!
                    request.request.rewind();
                    ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                    long relativeZxid = setWatches.getRelativeZxid();
                    zks.getZKDatabase().setWatches(relativeZxid, 
                            setWatches.getDataWatches(), 
                            setWatches.getExistWatches(),
                            setWatches.getChildWatches(), cnxn);
                    break;
                }
                case OpCode.getACL: {
                    lastOp = "GETA";
                    GetACLRequest getACLRequest = new GetACLRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getACLRequest);
                    Stat stat = new Stat();
                    List<ACL> acl = 
                        zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
                    rsp = new GetACLResponse(acl, stat);
                    break;
                }
                case OpCode.getChildren: {
                    lastOp = "GETC";
                    GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getChildrenRequest);
                    DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                            ZooDefs.Perms.READ,
                            request.authInfo);
                    List<String> children = zks.getZKDatabase().getChildren(
                            getChildrenRequest.getPath(), null, getChildrenRequest
                                    .getWatch() ? cnxn : null);
                    rsp = new GetChildrenResponse(children);
                    break;
                }
                case OpCode.getChildren2: {
                    lastOp = "GETC";
                    GetChildren2Request getChildren2Request = new GetChildren2Request();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getChildren2Request);
                    Stat stat = new Stat();
                    DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                            ZooDefs.Perms.READ,
                            request.authInfo);
                    List<String> children = zks.getZKDatabase().getChildren(
                            getChildren2Request.getPath(), stat, getChildren2Request
                                    .getWatch() ? cnxn : null);
                    rsp = new GetChildren2Response(children, stat);
                    break;
                }
                }
            } catch (SessionMovedException e) {
                // session moved is a connection level error, we need to tear
                // down the connection otw ZOOKEEPER-710 might happen
                // ie client on slow follower starts to renew session, fails
                // before this completes, then tries the fast follower (leader)
                // and is successful, however the initial renew is then 
                // successfully fwd/processed by the leader and as a result
                // the client and leader disagree on where the client is most
                // recently attached (and therefore invalid SESSION MOVED generated)
                cnxn.sendCloseSession();
                return;
            } catch (KeeperException e) {
                err = e.code();
            } catch (Exception e) {
                // log at error level as we are returning a marshalling
                // error to the user
                LOG.error("Failed to process " + request, e);
                StringBuilder sb = new StringBuilder();
                ByteBuffer bb = request.request;
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
                LOG.error("Dumping request buffer: 0x" + sb.toString());
                err = Code.MARSHALLINGERROR;
            }
    
            long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
            ReplyHeader hdr =
                new ReplyHeader(request.cxid, lastZxid, err.intValue());
    
            zks.serverStats().updateLatency(request.createTime);
            cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
                        request.createTime, System.currentTimeMillis());
    
            try {
                cnxn.sendResponse(hdr, rsp, "response");// send the reply
                if (closeSession) {
                    cnxn.sendCloseSession();
                }
            } catch (IOException e) {
                LOG.error("FIXMSG",e);
            }
        }
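
    The loop at the top of processRequest that drains zks.outstandingChanges can be isolated into a small sketch. Everything here (OutstandingChangesSketch, Change, add, drainUpTo) is a hypothetical stand-in that keeps only the fields the loop reads; it is single-threaded, whereas the real code holds a lock on outstandingChanges.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OutstandingChangesSketch {
    /** Hypothetical stand-in for ChangeRecord: only zxid and path are kept. */
    static final class Change {
        final long zxid;
        final String path;

        Change(long zxid, String path) {
            this.zxid = zxid;
            this.path = path;
        }
    }

    final List<Change> outstanding = new ArrayList<>();
    final Map<String, Change> outstandingForPath = new HashMap<>();

    /** Role of the prep stage: queue a pending change and index it by path. */
    void add(long zxid, String path) {
        Change c = new Change(zxid, path);
        outstanding.add(c);
        outstandingForPath.put(path, c);
    }

    /** Role of the final stage: drop every pending change with zxid <= requestZxid. */
    int drainUpTo(long requestZxid) {
        int removed = 0;
        while (!outstanding.isEmpty() && outstanding.get(0).zxid <= requestZxid) {
            Change c = outstanding.remove(0);
            // Remove the per-path entry only if it still points at this record;
            // a newer pending change to the same path must stay visible.
            if (outstandingForPath.get(c.path) == c) {
                outstandingForPath.remove(c.path);
            }
            removed++;
        }
        return removed;
    }
}
```

    The identity check (==) matters: if two pending changes touch the same path, draining the older one leaves the map entry for the newer one in place.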
    

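    Thoughts

    How processRequest handles transactional and non-transactional requests

    Transactional requests all go through zks.processTxn(hdr, txn), and the underlying DataTree applies them in one place.
    Non-transactional requests are handled by code written directly in this class, for example the getData case.

    In short, each non-transactional request type gets its own handling here.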
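
    The split described above can be sketched as follows. This is a toy model under stated assumptions: TxnDispatchSketch, Req, and the string result codes are hypothetical, a HashMap stands in for the DataTree, and the hasTxn flag plays the role of request.hdr != null.

```java
import java.util.HashMap;
import java.util.Map;

final class TxnDispatchSketch {
    /** Hypothetical request; hasTxn plays the role of request.hdr != null. */
    static final class Req {
        final String op;
        final String path;
        final String data;
        final boolean hasTxn;

        private Req(String op, String path, String data, boolean hasTxn) {
            this.op = op;
            this.path = path;
            this.data = data;
            this.hasTxn = hasTxn;
        }

        static Req write(String op, String path, String data) {
            return new Req(op, path, data, true);
        }

        static Req read(String op, String path) {
            return new Req(op, path, null, false);
        }
    }

    /** A HashMap standing in for the in-memory data tree. */
    private final Map<String, String> tree = new HashMap<>();

    /** Single entry point for every state change, like zks.processTxn(hdr, txn). */
    private String applyTxn(Req r) {
        switch (r.op) {
            case "create":
            case "setData":
                tree.put(r.path, r.data);
                return "OK";
            case "delete":
                tree.remove(r.path);
                return "OK";
            default:
                return "BAD_OP";
        }
    }

    String process(Req r) {
        if (r.hasTxn) {
            return applyTxn(r);
        }
        // Read-only operations are answered inline, one case per request type.
        switch (r.op) {
            case "getData": {
                String d = tree.get(r.path);
                return d == null ? "NONODE" : d;
            }
            case "exists":
                return tree.containsKey(r.path) ? "true" : "false";
            default:
                return "UNIMPLEMENTED";
        }
    }
}
```

    Keeping all writes behind one method means the same code path can replay a transaction whether it came from a live client or from recovery, while reads need no such uniformity.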

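    What the transaction log records

    Once a transactional request has been generated, it is written to the transaction log even before it is committed.
    When it is later committed and finally processed, the log does not change any further.
    The call zks.getZKDatabase().addCommittedProposal(request); in the code does not modify or create any log file; it only appends the request to an in-memory list of committed proposals.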
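
    The ordering described above can be illustrated with a toy model. LogOrderSketch and its members are hypothetical: an ArrayList stands in for the on-disk transaction log, and a bounded deque stands in for the in-memory list that addCommittedProposal appends to. The bound and its eviction policy are assumptions of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class LogOrderSketch {
    /** Stand-in for the on-disk transaction log (here just a list of zxids). */
    final List<Long> txnLog = new ArrayList<>();
    /** Stand-in for the in-memory list of recently committed proposals. */
    final Deque<Long> committed = new ArrayDeque<>();
    private final int maxCommitted;

    LogOrderSketch(int maxCommitted) {
        this.maxCommitted = maxCommitted;
    }

    /** Happens earlier in the chain, before the proposal is committed. */
    void logProposal(long zxid) {
        txnLog.add(zxid);
    }

    /** Happens in the final stage: only the in-memory list changes. */
    void finalStage(long zxid) {
        if (committed.size() >= maxCommitted) {
            committed.removeFirst(); // evict the oldest entry (assumed policy)
        }
        committed.addLast(zxid);
    }
}
```

    In this model the final stage never touches txnLog, which mirrors the observation that committing a request produces no further change to the log.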

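    Questions

    Why is no log entry needed when a transactional request is finally committed, processed, and takes effect?

    A transaction log entry is written earlier, before the commit, yet nothing is recorded here once the request has completed successfully?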

    References

    http://www.cnblogs.com/leesf456/p/6472496.html
