美文网首页
zk源码阅读45:PrepRequestProcessor源码解

zk源码阅读45:PrepRequestProcessor源码解

作者: 赤子心_d709 | 来源:发表于2017-08-28 18:47 被阅读228次

    摘要

    本节讲解PrepRequestProcessor,作为Leader的第一个请求处理器
    讲解

    简介
    主要属性
    主要函数
      构造函数
      processRequest:生产者,将请求放入队列
      run:线程方法,消费请求队列,调用pRequest
      pRequest:处理请求,根据request.type区分是否是事务请求,事务的话调用pRequest2Txn.最后让下一个处理器接着处理
      pRequest2Txn:处理事务请求,生成TxnHeader,完成acl,path验证,进行count,version修改等
    思考  
    

    简介

    请求预处理器。在Zookeeper中,那些会改变服务器状态的请求称为事务请求(创建节点、更新数据、删除节点、创建会话等),PrepRequestProcessor能够识别出当前客户端请求是否是事务请求。对于事务请求,PrepRequestProcessor处理器会对其进行一系列预处理,如创建请求事务头、事务体、会话检查、ACL检查和版本检查等。

    在Leader中,所处的位置如下


    PrepRequestProcessor在leader请求链中的位置

    主要属性

        LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();//提交请求的队列
    
        RequestProcessor nextProcessor;//下一个处理器
    
        ZooKeeperServer zks;//zk服务器
    

    主要函数

    构造函数

        public PrepRequestProcessor(ZooKeeperServer zks,
                RequestProcessor nextProcessor) {
            super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                    + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
            this.nextProcessor = nextProcessor;//下一个处理器
            this.zks = zks;//zk服务器
        }
    

    processRequest

    processRequest实现父接口,将请求生产到submittedRequests队列

        public void processRequest(Request request) {//生产到submittedRequests
            // request.addRQRec(">prep="+zks.outstandingChanges.size());
            submittedRequests.add(request);
        }
    

    run

    核心方法,不断从submittedRequests队列取出请求进行消费

        public void run() {
            try {
                while (true) {
                    Request request = submittedRequests.take();//消费线程,取出队列
                    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                    if (request.type == OpCode.ping) {
                        traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                    }
                    if (Request.requestOfDeath == request) {//接收到requestOfDeath退出线程
                        break;
                    }
                    pRequest(request);//处理请求
                }
            } catch (RequestProcessorException e) {
                if (e.getCause() instanceof XidRolloverException) {
                    LOG.info(e.getCause().getMessage());
                }
                handleException(this.getName(), e);
            } catch (Exception e) {
                handleException(this.getName(), e);
            }
            LOG.info("PrepRequestProcessor exited loop!");
        }
    

    里面调用了pRequest函数

    pRequest

    处理请求,根据request.type区分是否是事务请求,是的话生成对应请求,调用pRequest2Txn,最后调用nextProcessor.processRequest(request);让下一个处理器处理。 源码如下

        protected void pRequest(Request request) throws RequestProcessorException {//处理请求,根据request.type区分是否是事务请求,是的话生成对应请求,调用pRequest2Txn,最后调用nextProcessor.processRequest(request);让下一个处理器处理
            // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
            // request.type + " id = 0x" + Long.toHexString(request.sessionId));
            request.hdr = null;
            request.txn = null;
            
            try {
                switch (request.type) {//根据request.type区分是否是事务请求
                    case OpCode.create:
                    CreateRequest createRequest = new CreateRequest();
                    pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);//如果是事务请求,调用pRequest2Txn
                    break;
                case OpCode.delete:
                    DeleteRequest deleteRequest = new DeleteRequest();               
                    pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);//如果是事务请求,调用pRequest2Txn
                    break;
                case OpCode.setData:
                    SetDataRequest setDataRequest = new SetDataRequest();                
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);//如果是事务请求,调用pRequest2Txn
                    break;
                case OpCode.setACL:
                    SetACLRequest setAclRequest = new SetACLRequest();                
                    pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);//如果是事务请求,调用pRequest2Txn
                    break;
                case OpCode.check:
                    CheckVersionRequest checkRequest = new CheckVersionRequest();              
                    pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);//如果是事务请求,调用pRequest2Txn
                    break;
                case OpCode.multi:
                    MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                    try {
                        ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                    } catch(IOException e) {
                       request.hdr =  new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                                zks.getTime(), OpCode.multi);
                       throw e;
                    }
                    List<Txn> txns = new ArrayList<Txn>();
                    //Each op in a multi-op must have the same zxid!
                    long zxid = zks.getNextZxid();
                    KeeperException ke = null;
    
                    //Store off current pending change records in case we need to rollback
                    HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
    
                    int index = 0;
                    for(Op op: multiRequest) {
                        Record subrequest = op.toRequestRecord() ;
    
                        /* If we've already failed one of the ops, don't bother
                         * trying the rest as we know it's going to fail and it
                         * would be confusing in the logfiles.
                         */
                        if (ke != null) {
                            request.hdr.setType(OpCode.error);
                            request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                        } 
                        
                        /* Prep the request and convert to a Txn */
                        else {
                            try {
                                pRequest2Txn(op.getType(), zxid, request, subrequest, false);//如果是事务请求,调用pRequest2Txn
                            } catch (KeeperException e) {
                                if (ke == null) {
                                    ke = e;
                                }
                                request.hdr.setType(OpCode.error);
                                request.txn = new ErrorTxn(e.code().intValue());
                                LOG.info("Got user-level KeeperException when processing "
                                        + request.toString() + " aborting remaining multi ops."
                                        + " Error Path:" + e.getPath()
                                        + " Error:" + e.getMessage());
    
                                request.setException(e);
    
                                /* Rollback change records from failed multi-op */
                                rollbackPendingChanges(zxid, pendingChanges);
                            }
                        }
    
                        //FIXME: I don't want to have to serialize it here and then
                        //       immediately deserialize in next processor. But I'm 
                        //       not sure how else to get the txn stored into our list.
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                        request.txn.serialize(boa, "request") ;
                        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    
                        txns.add(new Txn(request.hdr.getType(), bb.array()));
                        index++;
                    }
    
                    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type);
                    request.txn = new MultiTxn(txns);
                    
                    break;
    
                //create/close session don't require request record
                case OpCode.createSession:
                case OpCode.closeSession:
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                    break;
     
                //All the rest don't need to create a Txn - just verify session
                //剩下的都不是事务请求,验证session即可
                case OpCode.sync:
                case OpCode.exists:
                case OpCode.getData:
                case OpCode.getACL:
                case OpCode.getChildren:
                case OpCode.getChildren2:
                case OpCode.ping:
                case OpCode.setWatches:
                    zks.sessionTracker.checkSession(request.sessionId,
                            request.getOwner());
                    break;
                }
            } catch (KeeperException e) {
                if (request.hdr != null) {
                    request.hdr.setType(OpCode.error);
                    request.txn = new ErrorTxn(e.code().intValue());
                }
                LOG.info("Got user-level KeeperException when processing "
                        + request.toString()
                        + " Error Path:" + e.getPath()
                        + " Error:" + e.getMessage());
                request.setException(e);
            } catch (Exception e) {
                // log at error level as we are returning a marshalling
                // error to the user
                LOG.error("Failed to process " + request, e);
    
                StringBuilder sb = new StringBuilder();
                ByteBuffer bb = request.request;
                if(bb != null){
                    bb.rewind();
                    while (bb.hasRemaining()) {
                        sb.append(Integer.toHexString(bb.get() & 0xff));
                    }
                } else {
                    sb.append("request buffer is null");
                }
    
                LOG.error("Dumping request buffer: 0x" + sb.toString());
                if (request.hdr != null) {
                    request.hdr.setType(OpCode.error);
                    request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
                }
            }
            request.zxid = zks.getZxid();
            nextProcessor.processRequest(request);//下一个处理器接着处理
        }
    

    比如识别request.type为create的话,是一个事务请求,创建CreateRequest,调用pRequest2Txn,最后调用nextProcessor.processRequest让下一个处理器处理

    pRequest2Txn

    生成事务头,完成acl,path验证,进行count,version对应修改,将改动记录在zks.outstandingChanges 等

    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException//生成事务头,完成acl,path验证,进行count,version对应修改,将改动记录在zks.outstandingChanges 等
        {
            request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                        zks.getTime(), type);//
    
            switch (type) {
                case OpCode.create:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    CreateRequest createRequest = (CreateRequest)record;
                    if(deserialize)
                        ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                    String path = createRequest.getPath();
                    int lastSlash = path.lastIndexOf('/');
                    if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
                        LOG.info("Invalid path " + path + " with session 0x" +
                                Long.toHexString(request.sessionId));
                        throw new KeeperException.BadArgumentsException(path);
                    }
                    List<ACL> listACL = removeDuplicates(createRequest.getAcl());
                    if (!fixupACL(request.authInfo, listACL)) {//验证acl的id格式是否valid???
                        throw new KeeperException.InvalidACLException(path);
                    }
                    String parentPath = path.substring(0, lastSlash);
                    ChangeRecord parentRecord = getRecordForPath(parentPath);
                    checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
                            request.authInfo);//验证是否有create权限
                    int parentCVersion = parentRecord.stat.getCversion();
                    CreateMode createMode =
                        CreateMode.fromFlag(createRequest.getFlags());//根据标志位判断是哪一种createMode
                    if (createMode.isSequential()) {
                        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);//如果是序列的,那么根据parent的cversion来命名path
                    }
                    validatePath(path, request.sessionId);
                    try {
                        if (getRecordForPath(path) != null) {
                            throw new KeeperException.NodeExistsException(path);
                        }
                    } catch (KeeperException.NoNodeException e) {
                        // ignore this one
                    }
                    boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
                    if (ephemeralParent) {
                        throw new KeeperException.NoChildrenForEphemeralsException(path);//ephemeral的节点不允许有child
                    }
                    int newCversion = parentRecord.stat.getCversion()+1;
                    request.txn = new CreateTxn(path, createRequest.getData(),
                            listACL,
                            createMode.isEphemeral(), newCversion);
                    StatPersisted s = new StatPersisted();
                    if (createMode.isEphemeral()) {
                        s.setEphemeralOwner(request.sessionId);//如果createMode是临时的,那么设置临时owner为当前sessionId
                    }
                    parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                    parentRecord.childCount++;
                    parentRecord.stat.setCversion(newCversion);
                    addChangeRecord(parentRecord);
                    addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
                            0, listACL));
                    break;
                case OpCode.delete:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    DeleteRequest deleteRequest = (DeleteRequest)record;
                    if(deserialize)
                        ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
                    path = deleteRequest.getPath();
                    lastSlash = path.lastIndexOf('/');
                    if (lastSlash == -1 || path.indexOf('\0') != -1
                            || zks.getZKDatabase().isSpecialPath(path)) {
                        throw new KeeperException.BadArgumentsException(path);
                    }
                    parentPath = path.substring(0, lastSlash);
                    parentRecord = getRecordForPath(parentPath);
                    ChangeRecord nodeRecord = getRecordForPath(path);
                    checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE,
                            request.authInfo);
                    int version = deleteRequest.getVersion();
                    if (version != -1 && nodeRecord.stat.getVersion() != version) {
                        throw new KeeperException.BadVersionException(path);
                    }
                    if (nodeRecord.childCount > 0) {
                        throw new KeeperException.NotEmptyException(path);
                    }
                    request.txn = new DeleteTxn(path);
                    parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                    parentRecord.childCount--;
                    addChangeRecord(parentRecord);
                    addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
                            null, -1, null));
                    break;
                case OpCode.setData:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    SetDataRequest setDataRequest = (SetDataRequest)record;
                    if(deserialize)
                        ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
                    path = setDataRequest.getPath();
                    validatePath(path, request.sessionId);
                    nodeRecord = getRecordForPath(path);
                    checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
                            request.authInfo);
                    version = setDataRequest.getVersion();
                    int currentVersion = nodeRecord.stat.getVersion();
                    if (version != -1 && version != currentVersion) {
                        throw new KeeperException.BadVersionException(path);
                    }
                    version = currentVersion + 1;
                    request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
                    nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                    nodeRecord.stat.setVersion(version);
                    addChangeRecord(nodeRecord);
                    break;
                case OpCode.setACL:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    SetACLRequest setAclRequest = (SetACLRequest)record;
                    if(deserialize)
                        ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
                    path = setAclRequest.getPath();
                    validatePath(path, request.sessionId);
                    listACL = removeDuplicates(setAclRequest.getAcl());
                    if (!fixupACL(request.authInfo, listACL)) {
                        throw new KeeperException.InvalidACLException(path);
                    }
                    nodeRecord = getRecordForPath(path);
                    checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN,
                            request.authInfo);
                    version = setAclRequest.getVersion();
                    currentVersion = nodeRecord.stat.getAversion();
                    if (version != -1 && version != currentVersion) {
                        throw new KeeperException.BadVersionException(path);
                    }
                    version = currentVersion + 1;
                    request.txn = new SetACLTxn(path, listACL, version);
                    nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                    nodeRecord.stat.setAversion(version);
                    addChangeRecord(nodeRecord);
                    break;
                case OpCode.createSession:
                    request.request.rewind();
                    int to = request.request.getInt();
                    request.txn = new CreateSessionTxn(to);
                    request.request.rewind();
                    zks.sessionTracker.addSession(request.sessionId, to);
                    zks.setOwner(request.sessionId, request.getOwner());
                    break;
                case OpCode.closeSession:
                    // We don't want to do this check since the session expiration thread
                    // queues up this operation without being the session owner.
                    // this request is the last of the session so it should be ok
                    //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    HashSet<String> es = zks.getZKDatabase()
                            .getEphemerals(request.sessionId);//获取sessionId对应的临时节点的路径列表
                    synchronized (zks.outstandingChanges) {
                        for (ChangeRecord c : zks.outstandingChanges) {//遍历 zk serve的事务变更队列,这些事务处理尚未完成,没有应用到内存数据库中
                            if (c.stat == null) {//如果当前变更记录没有状态信息(删除时才会出现,参照上面处理delete时的ChangeRecord构造参数)
                                // Doing a delete
                                es.remove(c.path);//避免多次删除
                            } else if (c.stat.getEphemeralOwner() == request.sessionId) {//如果变更节点是临时的,且源于当前sessionId(只有创建和修改时,stat不会为null)
                                es.add(c.path);//添加记录,最终要将添加或者修改的record再删除掉
                            }
                        }
                        for (String path2Delete : es) {//添加节点变更事务,将es中所有路径的临时节点都删掉
                            addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                    path2Delete, null, 0, null));
                        }
    
                        zks.sessionTracker.setSessionClosing(request.sessionId);
                    }
    
                    LOG.info("Processed session termination for sessionid: 0x"
                            + Long.toHexString(request.sessionId));
                    break;
                case OpCode.check:
                    zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                    CheckVersionRequest checkVersionRequest = (CheckVersionRequest)record;
                    if(deserialize)
                        ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
                    path = checkVersionRequest.getPath();
                    validatePath(path, request.sessionId);
                    nodeRecord = getRecordForPath(path);
                    checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ,
                            request.authInfo);
                    version = checkVersionRequest.getVersion();
                    currentVersion = nodeRecord.stat.getVersion();
                    if (version != -1 && version != currentVersion) {
                        throw new KeeperException.BadVersionException(path);
                    }
                    version = currentVersion + 1;
                    request.txn = new CheckVersionTxn(path, version);
                    break;
            }
        }
    

    思考

    PrepRequestProcessor中processRequest函数是被谁调用的

    ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request)
    RequestProcessor#processRequest

    哪些请求是事务请求

    按照PrepRequestProcessor#pRequest的写法

    事务请求是 create,delete,setData,setACL,check,multi,createSession,closeSession
    非事务请求是 sync,exists,getData,getACL,getChildren,getChildren2,ping,setWatches.
    

    pRequest2Txn函数调用addChangeRecord会添加记录到zks.outstandingChanges,何时删除

    PrepRequestProcessor#rollbackPendingChanges 或者
    FinalRequestProcessor#processRequest

    refer

    http://www.cnblogs.com/leesf456/p/6412843.html

    相关文章

      网友评论

          本文标题:zk源码阅读45:PrepRequestProcessor源码解

          本文链接:https://www.haomeiwen.com/subject/dgyolxtx.html