Zookeeper (5) - Server Standalone Mode - Transactional Request Processing

Author: 进击的蚂蚁zzzliu | Published: 2020-12-28 11:43

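    Overview

    This section walks through how the server processes a request (using the NIO transport as the example). The work is done by three processors linked in a chain of responsibility: PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.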
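
To make the chain-of-responsibility idea concrete, here is a minimal sketch. The `Stage` interface and the three lambdas are hypothetical stand-ins for the three processors, not ZooKeeper classes, and everything runs synchronously on one thread, whereas the real processors are decoupled by queues and threads as the steps below show.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a request processor in the chain. */
interface Stage {
    void process(String request);
}

class ChainSketch {
    /** Builds prep -> sync -> final and records the order in which stages see the request. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        // Build back to front: each stage holds a reference to the next one.
        Stage finalStage = r -> trace.add("final:" + r);
        Stage syncStage = r -> { trace.add("sync:" + r); finalStage.process(r); };
        Stage prepStage = r -> { trace.add("prep:" + r); syncStage.process(r); };
        prepStage.process(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("setData /a"));
    }
}
```

Each stage only knows its successor, which is why a stage can be swapped or a new one inserted without touching the others.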

    Flow analysis

    [Figure: server-side request processing (服务端-请求处理.png)]
    1. NIO server run loop
    public void run() {
        while (!ss.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                Collections.shuffle(selectedList);
                for (SelectionKey k : selectedList) {
                    ......
                    else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);
                    } 
                    ......
                }
                selected.clear();
            } 
            ......
        }
        ......
    }
    
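    • selector.select(1000): each select call blocks for at most 1 s;
    • c.doIO(k): called for every ready key while the loop iterates over the selected set; this article follows the OP_READ path;
    2. Handling the readable event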
    void doIO(SelectionKey k) throws InterruptedException {
        try {
            ......
            if (k.isReadable()) {
                int rc = sock.read(incomingBuffer);
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    if (incomingBuffer == lenBuffer) { // start of next request
                        incomingBuffer.flip();
                        isPayload = readLength(k);
                        incomingBuffer.clear();
                    } else {
                        isPayload = true;
                    }
                    if (isPayload) { // not the case for 4letterword
                        readPayload();
                    } else {
                        return;
                    }
                }
            }
            ......
    }
    
    • readLength(k): parses the first 4 bytes, which hold the Packet length;
    • readPayload(): reads the rest of the Packet, i.e. everything after the length prefix;
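
The two-step read above is ordinary length-prefixed framing. The sketch below illustrates that technique on an in-memory buffer; `FrameDecoderSketch`, `decode` and `encode` are made-up names, and the code does not reproduce NIOServerCnxn (which reads from a socket and reuses `lenBuffer` / `incomingBuffer`).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FrameDecoderSketch {
    /** Splits a buffer of [4-byte big-endian length][payload] frames into payloads. */
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int len = in.getInt();               // step 1: read the length prefix
            if (len < 0) {
                throw new IllegalArgumentException("negative frame length: " + len);
            }
            if (in.remaining() < len) {
                in.reset();                      // incomplete frame: wait for more bytes
                break;
            }
            byte[] payload = new byte[len];      // step 2: read exactly len payload bytes
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    /** Test helper: writes each message as a length-prefixed frame. */
    static ByteBuffer encode(String... messages) {
        int size = 0;
        for (String m : messages) size += 4 + m.getBytes(StandardCharsets.UTF_8).length;
        ByteBuffer out = ByteBuffer.allocate(size);
        for (String m : messages) {
            byte[] bytes = m.getBytes(StandardCharsets.UTF_8);
            out.putInt(bytes.length).put(bytes);
        }
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        for (byte[] f : decode(encode("ab", "cde"))) {
            System.out.println(new String(f, StandardCharsets.UTF_8));
        }
    }
}
```

When a frame is incomplete the decoder rewinds to the start of its length prefix, so the caller can retry once more bytes have arrived.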
    3. Processing the Packet
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");
        ......
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
                si.setOwner(ServerCnxn.me);
                submitRequest(si);
        ......
    }
    
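    • h.deserialize(bia, "header"): deserializes the request header;
    • new Request: builds a Request, the structure that represents a request on the RequestProcessor chain; it bundles everything the later stages need so it can be passed along;
    • submitRequest(si): submits the request to the RequestProcessor chain by adding it to PrepRequestProcessor's submittedRequests blocking queue;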
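
As a rough illustration of the header step, the sketch below reads an xid and an operation type from the front of a payload and leaves the buffer positioned at the request body. It assumes the header is two big-endian 32-bit integers (xid, then type); the `Header` class is hypothetical and the real code goes through the generated RequestHeader and BinaryInputArchive rather than reading the buffer directly.

```java
import java.nio.ByteBuffer;

class RequestHeaderSketch {
    /** Hypothetical value class standing in for the generated RequestHeader. */
    static final class Header {
        final int xid;
        final int type;
        Header(int xid, int type) { this.xid = xid; this.type = type; }
    }

    /** Consumes the header; the buffer is left positioned at the start of the body. */
    static Header readHeader(ByteBuffer payload) {
        int xid = payload.getInt();   // client-side sequence number of the request
        int type = payload.getInt();  // operation code, e.g. create / getData / setData
        return new Header(xid, type);
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.allocate(8);
        payload.putInt(42).putInt(5); // 5 is used here as an example type value
        payload.flip();
        Header h = readHeader(payload);
        System.out.println("xid=" + h.xid + " type=" + h.type);
    }
}
```

Leaving the body unread matches the excerpt, where the remaining incomingBuffer is handed to the Request and only deserialized later by the processor that needs it.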
    4. PrepRequestProcessor.run
    public void run() {
        try {
            while (true) {
                // submittedRequests: queue of submitted requests
                Request request = submittedRequests.take();
                ......
                if (Request.requestOfDeath == request) {
                    break;
                }
                // mainly generates the transaction record for write requests
                pRequest(request);
            }
        }
        ......
    }
    
    • submittedRequests.take(): blocks while submittedRequests is empty; Requests are enqueued during Packet processing (step 3);
    • pRequest(request): handles the request according to its OpCode;
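
The loop above is a blocking-queue consumer that stops on a sentinel ("poison pill"). A minimal sketch of that pattern follows; `Req`, `REQUEST_OF_DEATH` and `runWorker` are illustrative names, and the handler just records request names instead of calling pRequest.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillSketch {
    /** Hypothetical request type; only its name matters here. */
    static final class Req {
        final String name;
        Req(String name) { this.name = name; }
    }

    // Sentinel compared by identity, in the spirit of Request.requestOfDeath.
    static final Req REQUEST_OF_DEATH = new Req("death");

    /** Starts a worker, feeds it the given names, then stops it with the sentinel. */
    static List<String> runWorker(List<String> names) {
        BlockingQueue<Req> submitted = new LinkedBlockingQueue<>();
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Req request = submitted.take();     // blocks while the queue is empty
                    if (request == REQUEST_OF_DEATH) {
                        break;                          // sentinel: leave the loop
                    }
                    handled.add(request.name);          // stand-in for pRequest(request)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        for (String n : names) submitted.add(new Req(n));
        submitted.add(REQUEST_OF_DEATH);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(runWorker(List.of("create /a", "getData /a")));
    }
}
```

Because the sentinel travels through the same queue as ordinary requests, everything submitted before it is still processed in order before the worker exits.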
    5. PrepRequestProcessor.pRequest
    protected void pRequest(Request request) throws RequestProcessorException {
        ......
        try {
            switch (request.type) {
            case OpCode.create:
                CreateRequest createRequest = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
                break;
            ......
            case OpCode.getData:
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                break;
            default:
                LOG.warn("unknown type " + request.type);
                break;
            }
        }
        request.zxid = zks.getZxid();
        nextProcessor.processRequest(request);
    }
    
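    • case OpCode.create: for transactional operations such as create/setData, first constructs the matching request record (e.g. CreateRequest), then handles each type in pRequest2Txn;
    • case OpCode.getData: for non-transactional operations such as getData/exists, only validates the session;
    • nextProcessor.processRequest(request): hands the request to SyncRequestProcessor, which adds it to its queuedRequests blocking queue;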
    6. PrepRequestProcessor.pRequest2Txn

    Transactional requests are handled differently depending on their type; taking setData as an example, the flow is roughly:

    case OpCode.setData:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        SetDataRequest setDataRequest = (SetDataRequest)record;
        if(deserialize)
            ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
        path = setDataRequest.getPath();
        validatePath(path, request.sessionId);
        nodeRecord = getRecordForPath(path);
        checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
        version = setDataRequest.getVersion();
        int currentVersion = nodeRecord.stat.getVersion();
        if (version != -1 && version != currentVersion) {
            throw new KeeperException.BadVersionException(path);
        }
        version = currentVersion + 1;
        request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
        nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
        nodeRecord.stat.setVersion(version);
        addChangeRecord(nodeRecord);
        break;
    
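    • checkSession: validates the session;
    • ByteBufferInputStream.byteBuffer2Record: deserializes the concrete request (here SetDataRequest);
    • getRecordForPath(path): returns the pending ChangeRecord for the path from outstandingChangesForPath, or builds one when none is pending; ChangeRecord is how PrepRequestProcessor and FinalRequestProcessor share information;
    • checkACL: checks ACL permissions;
    • addChangeRecord(nodeRecord): appends nodeRecord to outstandingChanges and also puts it into outstandingChangesForPath (path -> ChangeRecord);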
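
The version check and the role of the pending-change map can be sketched as follows. This is a simplified model, not the ZooKeeper code: `committed` and `pending` are plain maps from path to version, `BadVersion` is a hypothetical stand-in for KeeperException.BadVersionException, and sessions, ACLs and zxids are left out.

```java
import java.util.HashMap;
import java.util.Map;

class SetDataPrepSketch {
    /** Hypothetical stand-in for KeeperException.BadVersionException. */
    static final class BadVersion extends RuntimeException {
        BadVersion(String path) { super("bad version for " + path); }
    }

    final Map<String, Integer> committed = new HashMap<>(); // versions already applied
    final Map<String, Integer> pending = new HashMap<>();   // role of outstandingChangesForPath

    /** Prefers a pending change over the committed state, like getRecordForPath. */
    int currentVersion(String path) {
        Integer v = pending.get(path);
        if (v == null) v = committed.get(path);
        if (v == null) throw new IllegalArgumentException("no node " + path);
        return v;
    }

    /** Validates the expected version (-1 means "any") and records the new pending version. */
    int prepSetData(String path, int expectedVersion) {
        int current = currentVersion(path);
        if (expectedVersion != -1 && expectedVersion != current) {
            throw new BadVersion(path);
        }
        int next = current + 1;
        pending.put(path, next);
        return next;
    }

    public static void main(String[] args) {
        SetDataPrepSketch s = new SetDataPrepSketch();
        s.committed.put("/a", 0);
        System.out.println(s.prepSetData("/a", 0));  // first write
        System.out.println(s.prepSetData("/a", 1));  // second write sees the pending version
    }
}
```

The second call succeeds with expected version 1 even though nothing has been applied yet, which is the reason the prep stage consults pending changes before falling back to the committed state.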
    7. SyncRequestProcessor.run
    public void run() {
        try {
            int logCount = 0;
            setRandRoll(r.nextInt(snapCount/2));
            while (true) {
                Request si = null;
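                // toFlush: requests already written to the log that are waiting to be flushed to disk
                if (toFlush.isEmpty()) {
                    // queuedRequests: requests waiting to be processed; take() blocks while it is empty
                    si = queuedRequests.take();
                } else {
                    // poll() returns null when queuedRequests is empty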
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
                    // incremental log write: append the request to the transaction log
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            setRandRoll(r.nextInt(snapCount/2));
                            // roll the log: once enough requests have been logged, a new log file is started
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                // generate a new snapshot file
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        // flush and pass the batch on once more than 1000 requests are pending
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }
    
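    • snapCount: threshold for starting a new txnlog file and taking a snapshot; defaults to 100000;

    • setRandRoll: sets randRoll to a random number between 0 and snapCount / 2;

    • LinkedList<Request> toFlush: the Requests already written to the buffer and waiting to be flushed to disk; they are collected so that flushes can be batched, which makes syncing to disk more efficient;

    • queuedRequests.take(): used when toFlush is empty, so the thread blocks until a request arrives;

    • queuedRequests.poll(): used when toFlush is not empty, so the thread never blocks while requests are waiting to be flushed; if the queue is empty, the pending batch is flushed right away;

    • zks.getZKDatabase().append(si): ends up in FileTxnLog.append (analyzed in Zookeeper (3) - Persistence), which checks whether TxnHeader hdr is null:
      null means a non-transactional request: append returns false and the request is passed to the next processor, FinalRequestProcessor;
      non-null means a transactional request that must be written to the txnlog; append then checks whether BufferedOutputStream logStream is null, and if so creates a new txnlog file;

    • logCount++: counts the requests written to the BufferedOutputStream; reset to 0 after the log is rolled;

    • logCount > (snapCount / 2 + randRoll): the log is rolled once logCount exceeds a random threshold between snapCount / 2 and snapCount; the random component keeps the nodes of an ensemble from all rolling logs and taking snapshots at the same time, which would hurt request processing;

    • zks.getZKDatabase().rollLog(): flushes the BufferedOutputStream and sets it to null, so the next append creates a new txnlog file;

    • zks.takeSnapshot(): runs in a separate thread and writes a new snapshot file;

    • else if (toFlush.isEmpty()): reached when append returned false (a non-transactional request) and nothing is waiting in toFlush; the request has nothing to sync, so it is handed straight to the next processor without batching;

    • toFlush.size() > 1000: under heavy write load, requests are committed in batches of about 1000, which reduces the number of disk syncs and improves throughput;

    • flush(toFlush): commits the log to disk, then passes each Request on to the next processor, FinalRequestProcessor;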
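
The take()/poll() switch and the size limit together form a group-commit loop. The sketch below shows that control flow in isolation, under several assumptions: the batch limit is 3 instead of 1000, `flush` only records the batch size where a real implementation would force the log to disk, every request is treated as transactional, and a final flush after the stop sentinel is added so the demo leaves nothing behind. `GroupCommitSketch`, `STOP` and `demo` are made-up names.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class GroupCommitSketch {
    static final Object STOP = new Object(); // sentinel, compared by identity
    static final int MAX_BATCH = 3;          // the excerpt uses 1000

    final List<Integer> flushSizes = new ArrayList<>();
    final LinkedList<Object> toFlush = new LinkedList<>();

    /** Stand-in for the disk sync: one call covers the whole pending batch. */
    void flush() {
        if (toFlush.isEmpty()) return;
        flushSizes.add(toFlush.size());
        toFlush.clear();
    }

    void run(BlockingQueue<Object> queued) {
        try {
            while (true) {
                Object si;
                if (toFlush.isEmpty()) {
                    si = queued.take();      // nothing pending: safe to block
                } else {
                    si = queued.poll();      // something pending: never block
                    if (si == null) {
                        flush();             // queue drained: commit what we have
                        continue;
                    }
                }
                if (si == STOP) break;
                toFlush.add(si);
                if (toFlush.size() > MAX_BATCH) flush(); // cap the batch under load
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush(); // assumption for the demo: commit the tail on shutdown
    }

    /** Preloads n requests plus the sentinel and returns the size of each flush. */
    static List<Integer> demo(int n) {
        BlockingQueue<Object> queued = new LinkedBlockingQueue<>();
        for (int i = 0; i < n; i++) queued.add("req-" + i);
        queued.add(STOP);
        GroupCommitSketch s = new GroupCommitSketch();
        s.run(queued);
        return s.flushSizes;
    }

    public static void main(String[] args) {
        System.out.println(demo(9));
    }
}
```

With the queue preloaded, nine requests cost three flushes instead of nine; on an idle server the poll() branch would instead flush after each request as soon as the queue runs dry.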

    8. FinalRequestProcessor.processRequest
    public void processRequest(Request request) {
        ......
        ProcessTxnResult rc = null;
        synchronized (zks.outstandingChanges) {
            while (!zks.outstandingChanges.isEmpty()
                    && zks.outstandingChanges.get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges.remove(0);
            }
            if (request.hdr != null) {
               TxnHeader hdr = request.hdr;
               Record txn = request.txn;
               rc = zks.processTxn(hdr, txn);
            }
            if (Request.isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
        ......
        ServerCnxn cnxn = request.cnxn;
        String lastOp = "NA";
        zks.decInProcess();
        Code err = Code.OK;
        Record rsp = null;
        try {
            ......
            switch (request.type) {
            case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,getDataRequest);
                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ, request.authInfo);
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
                rsp = new GetDataResponse(b, stat);
                break;
            }
            ......
    
        long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
        ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
        try {
            cnxn.sendResponse(hdr, rsp, "response");
            if (closeSession) {
                cnxn.sendCloseSession();
            }
        ......
    }
    
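    • zks.outstandingChanges.remove(0) / zks.outstandingChangesForPath.remove(cr.path): removes the head ChangeRecord from outstandingChanges and its entry from outstandingChangesForPath (the second call is elided in the excerpt above);
    • zks.processTxn(hdr, txn): applies the transaction to the DataTree (the logic was analyzed with createNode as the example in Zookeeper (2) - Data Model);
    • zks.getZKDatabase().addCommittedProposal(request): in cluster mode, commits the proposal (to be analyzed in detail later);
    • new GetDataResponse: builds the concrete response object;
    • new ReplyHeader: builds the response header;
    • cnxn.sendResponse(hdr, rsp, "response"): sends the response through NIOServerCnxn;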
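
The clean-up of pending changes can be sketched as below. `Change`, `outstanding` and `byPath` are simplified stand-ins for ChangeRecord, outstandingChanges and outstandingChangesForPath. The identity check before removing the map entry is an assumption about the part elided from the excerpt: it is what keeps a newer pending change for the same path visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OutstandingChangesSketch {
    /** Hypothetical, trimmed-down stand-in for ChangeRecord. */
    static final class Change {
        final long zxid;
        final String path;
        Change(long zxid, String path) { this.zxid = zxid; this.path = path; }
    }

    final List<Change> outstanding = new ArrayList<>();  // ordered by zxid
    final Map<String, Change> byPath = new HashMap<>();   // latest pending change per path

    void add(Change c) {
        outstanding.add(c);
        byPath.put(c.path, c);
    }

    /** Drops every pending change whose zxid is <= the zxid now being applied. */
    int retire(long zxid) {
        int removed = 0;
        while (!outstanding.isEmpty() && outstanding.get(0).zxid <= zxid) {
            Change c = outstanding.remove(0);
            // Remove the map entry only if it still points at this record.
            if (byPath.get(c.path) == c) {
                byPath.remove(c.path);
            }
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        OutstandingChangesSketch s = new OutstandingChangesSketch();
        s.add(new Change(1, "/a"));
        s.add(new Change(2, "/a"));
        s.add(new Change(3, "/b"));
        System.out.println(s.retire(1) + " retired, " + s.outstanding.size() + " still pending");
    }
}
```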
    9. Sending the response
    synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // Make space for length
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            try {
                baos.write(fourBytes);
                bos.writeRecord(h, "header");
                if (r != null) {
                    bos.writeRecord(r, tag);
                }
                baos.close();
            } catch (IOException e) {
                LOG.error("Error serializing response");
            }
            byte b[] = baos.toByteArray();
            ByteBuffer bb = ByteBuffer.wrap(b);
            bb.putInt(b.length - 4).rewind();
            sendBuffer(bb);
            if (h.getXid() > 0) {
                synchronized(this){
                    outstandingRequests--;
                }
                // check throttling
                synchronized (this.factory) {        
                    if (zkServer.getInProcess() < outstandingLimit || outstandingRequests < 1) {
                        sk.selector().wakeup();
                        enableRecv();
                    }
                }
            }
         } catch(Exception e) {
            LOG.warn("Unexpected exception. Destruction averted.", e);
         }
    }
    
    • bos.writeRecord(h, "header") / bos.writeRecord(r, tag): serializes the response header and the response body;
    • sendBuffer(bb): writes the response out via SocketChannel.write;
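
The "reserve four bytes, serialize, then patch the length" trick from sendResponse can be shown on its own. In this sketch the reply header is assumed to be xid (int), zxid (long), err (int) in big-endian order, and the body is passed as raw bytes rather than a serialized Record; `ResponseFramingSketch` and `frame` are illustrative names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

class ResponseFramingSketch {
    /** Builds [length][xid][zxid][err][body], where length excludes its own 4 bytes. */
    static ByteBuffer frame(int xid, long zxid, int err, byte[] body) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(new byte[4], 0, 4);              // placeholder for the length prefix
        ByteBuffer hdr = ByteBuffer.allocate(16);
        hdr.putInt(xid).putLong(zxid).putInt(err);  // assumed reply header layout
        baos.write(hdr.array(), 0, 16);
        baos.write(body, 0, body.length);
        byte[] b = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        bb.putInt(b.length - 4);                    // overwrite the placeholder
        bb.rewind();                                // ready to be written from position 0
        return bb;
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(1, 100L, 0, new byte[] {7, 8});
        System.out.println("frame bytes=" + bb.remaining() + " length prefix=" + bb.getInt(0));
    }
}
```

Writing the placeholder first means the total size never has to be computed ahead of serialization, and the client can use the same length-then-payload read that the server uses for requests.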
      --------over---------
