A source-code walkthrough of ZooKeeper's synchronization flow

Author: sadamu0912 | Published 2020-05-18 17:45

    ZooKeeper data synchronization happens in one of three ways: DIFF, SNAP, or TRUNC.

    Figure 1, Figure 2 (images not included)
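
    Before diving into the source, here is a minimal, self-contained sketch (illustrative only, not the actual ZooKeeper code; SyncMode stands in for the Leader.DIFF/SNAP/TRUNC constants) of the rule the leader uses to pick one of the three modes, based on the learner's last zxid and the leader's in-memory committedLog window [minCommittedLog, maxCommittedLog]:

    enum SyncMode { DIFF, SNAP, TRUNC }

    static SyncMode chooseSyncMode(long peerLastZxid, long leaderLastZxid,
                                   long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == leaderLastZxid) {
            return SyncMode.DIFF;   // already in sync: the leader sends an empty DIFF
        }
        if (peerLastZxid >= minCommittedLog && peerLastZxid <= maxCommittedLog) {
            return SyncMode.DIFF;   // replay the missing proposals (possibly preceded by a TRUNC back to a common point)
        }
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC;  // the learner is ahead of the leader: roll back to maxCommittedLog
        }
        return SyncMode.SNAP;       // too far behind the committedLog window: ship a full snapshot
    }

    For example, with minCommittedLog=0x100000005 and maxCommittedLog=0x10000000a, a learner reporting 0x100000007 gets a DIFF, one reporting 0x10000000c gets a TRUNC, and one still at an old epoch gets a full SNAP. The real decision is made in LearnerHandler#run, shown in the leader-side section below.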

    Code on the learner side

    The entry point for synchronization is the run method of QuorumPeer:

    case FOLLOWING:
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            follower.shutdown();
                            setFollower(null);
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
    

    It instantiates the Follower and then calls followLeader to start synchronizing:

    QuorumServer leaderServer = findLeader();            
                try {
                    connectToLeader(leaderServer.addr, leaderServer.hostname);

                    // register with the leader by sending FOLLOWERINFO (step 1 in Figure 1)
                    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

                    // check to see if the leader zxid is lower than ours
                    // this should never happen but is just a safety check
                    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                    if (newEpoch < self.getAcceptedEpoch()) {
                        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                                + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                        throw new IOException("Error: Epoch of leader is lower");
                    }
                    syncWithLeader(newEpochZxid);                
                    QuorumPacket qp = new QuorumPacket();
                    while (this.isRunning()) {
                        readPacket(qp);
                        processPacket(qp);
                    }
    

    The registerWithLeader method matters here: it is effectively the handshake of the sync protocol, establishing the follower's last logged zxid and its epoch.

    protected long registerWithLeader(int pktType) throws IOException{
            /*
             * Send follower info, including last zxid and sid
             */
            long lastLoggedZxid = self.getLastLoggedZxid();
            QuorumPacket qp = new QuorumPacket();                
            qp.setType(pktType);
            qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
            
            /*
             * Add sid to payload
             */
            LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
            ByteArrayOutputStream bsid = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
            boa.writeRecord(li, "LearnerInfo");
            qp.setData(bsid.toByteArray());
            
            writePacket(qp, true);
            readPacket(qp);        
            final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
            // here we receive the leader's reply to the handshake: LEADERINFO
            if (qp.getType() == Leader.LEADERINFO) {
                // we are connected to a 1.0 server so accept the new epoch and read the next packet
                leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
                byte epochBytes[] = new byte[4];
                final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
                if (newEpoch > self.getAcceptedEpoch()) {
                    wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
                    self.setAcceptedEpoch(newEpoch);
                } else if (newEpoch == self.getAcceptedEpoch()) {
                    // since we have already acked an epoch equal to the leaders, we cannot ack
                    // again, but we still need to send our lastZxid to the leader so that we can
                    // sync with it if it does assume leadership of the epoch.
                    // the -1 indicates that this reply should not count as an ack for the new epoch
                    wrappedEpochBytes.putInt(-1);
                } else {
                    throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
                }
                // send ACKEPOCH carrying our lastLoggedZxid, a bit like the initial sequence number (ISN) in a TCP handshake
                QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
                writePacket(ackNewEpoch, true);
                return ZxidUtils.makeZxid(newEpoch, 0);
            } else {
                if (newEpoch > self.getAcceptedEpoch()) {
                    self.setAcceptedEpoch(newEpoch);
                }
                if (qp.getType() != Leader.NEWLEADER) {
                    LOG.error("First packet should have been NEWLEADER");
                    throw new IOException("First packet should have been NEWLEADER");
                }
                return qp.getZxid();
            }
        } 
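
    The epoch arithmetic above works because a zxid packs the epoch into its high 32 bits and a per-epoch counter into the low 32 bits, so ZxidUtils.makeZxid(newEpoch, 0) means "the very start of the new epoch". A small illustrative sketch of the helpers used here (modelled on ZxidUtils; treat the exact code as an assumption):

    // zxid layout: [ 32-bit epoch | 32-bit counter ]
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }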
    

    Right after that, syncWithLeader is called:

    readPacket(qp);
            LinkedList<Long> packetsCommitted = new LinkedList<Long>();
            LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
            synchronized (zk) {
                if (qp.getType() == Leader.DIFF) {
                    LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                    snapshotNeeded = false;
                }
     "如果是snapshot的话,先清空自己的dataTree,然后从snapshot日志文件中反序列化,
    //然后设定lastProcessedZxid "
                else if (qp.getType() == Leader.SNAP) {
                    LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                     " The leader is going to dump the database
                    // clear our own database and read  "
                    zk.getZKDatabase().clear();
                    zk.getZKDatabase().deserializeSnapshot(leaderIs);
                    String signature = leaderIs.readString("signature");
                    if (!signature.equals("BenWasHere")) {
                        LOG.error("Missing signature. Got " + signature);
                        throw new IOException("Missing signature");                   
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                } else if (qp.getType() == Leader.TRUNC) {
                    " we need to truncate the log to the lastzxid of the leader  "
                    LOG.warn("Truncating log to get in sync with the leader 0x"
                            + Long.toHexString(qp.getZxid()));
                    boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                    if (!truncated) {
                       " not able to truncate the log  "
                        LOG.error("Not able to truncate the log "
                                + Long.toHexString(qp.getZxid()));
                        System.exit(13);
                    }
                    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                }
                else {
                    LOG.error("Got unexpected packet from leader "
                            + qp.getType() + " exiting ... " );
                    System.exit(13);
    
                }
                zk.createSessionTracker();
                
                long lastQueued = 0;
    
                // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
                // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
                // we need to make sure that we don't take the snapshot twice.
                boolean isPreZAB1_0 = true;
                //If we are not going to take the snapshot be sure the transactions are not applied in memory
                // but written out to the transaction log
                boolean writeToTxnLog = !snapshotNeeded;
                // we are now going to start getting transactions to apply followed by an UPTODATE
                outerLoop:
                while (self.isRunning()) {
                    readPacket(qp);
     "如果是DIff的方式同步,那么leader会不断发送PROPOSAL,COMMIT数据包。这里涉及到二阶段提交。packetsNotCommitted加一条  "
                    switch(qp.getType()) {
                    case Leader.PROPOSAL:
                        PacketInFlight pif = new PacketInFlight();
                        pif.hdr = new TxnHeader();
                        pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                        if (pif.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn("Got zxid 0x"
                                + Long.toHexString(pif.hdr.getZxid())
                                + " expected 0x"
                                + Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = pif.hdr.getZxid();
                        packetsNotCommitted.add(pif);
                        break;
                    case Leader.COMMIT:
                        if (!writeToTxnLog) {
                            pif = packetsNotCommitted.peekFirst();
                            if (pif.hdr.getZxid() != qp.getZxid()) {
                                LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                            } else {
                                zk.processTxn(pif.hdr, pif.rec);
                                packetsNotCommitted.remove();
                            }
                        } else {
                            packetsCommitted.add(qp.getZxid());
                        }
                        break;
                    case Leader.INFORM:
                        /*
                         * Only observer get this type of packet. We treat this
                         * as receiving PROPOSAL and COMMMIT.
                         */
                        PacketInFlight packet = new PacketInFlight();
                        packet.hdr = new TxnHeader();
                        packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                        // Log warning message if txn comes out-of-order
                        if (packet.hdr.getZxid() != lastQueued + 1) {
                            LOG.warn("Got zxid 0x"
                                    + Long.toHexString(packet.hdr.getZxid())
                                    + " expected 0x"
                                    + Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = packet.hdr.getZxid();
                        if (!writeToTxnLog) {
                            // Apply to db directly if we haven't taken the snapshot
                            zk.processTxn(packet.hdr, packet.rec);
                        } else {
                            packetsNotCommitted.add(packet);
                            packetsCommitted.add(qp.getZxid());
                        }
                        break;
                    "退出循环,说明已经同步完成,break之后,zk.startup(); "
                    case Leader.UPTODATE:
                        if (isPreZAB1_0) {
                            zk.takeSnapshot();
                            self.setCurrentEpoch(newEpoch);
                        }
                        self.cnxnFactory.setZooKeeperServer(zk);                
                        break outerLoop;
                    case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                        // means this is Zab 1.0
                        // Create updatingEpoch file and remove it after current
                        // epoch is set. QuorumPeer.loadDataBase() uses this file to
                        // detect the case where the server was terminated after
                        // taking a snapshot but before setting the current epoch.
                        File updating = new File(self.getTxnFactory().getSnapDir(),
                                            QuorumPeer.UPDATING_EPOCH_FILENAME);
                        if (!updating.exists() && !updating.createNewFile()) {
                            throw new IOException("Failed to create " +
                                                  updating.toString());
                        }
                        if (snapshotNeeded) {
                            zk.takeSnapshot();
                        }
                        self.setCurrentEpoch(newEpoch);
                        if (!updating.delete()) {
                            throw new IOException("Failed to delete " +
                                                  updating.toString());
                        }
                        writeToTxnLog = true;  // Anything after this needs to go to the transaction log, not applied directly in memory
                        isPreZAB1_0 = false;
                        writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                        break;
                    }
                }
            }
            ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
            writePacket(ack, true);
            sock.setSoTimeout(self.tickTime * self.syncLimit);
            zk.startup();
    

    Code on the leader side

    The entry point is the leader.lead() call in org.apache.zookeeper.server.quorum.QuorumPeer#run.

    1 First, load the database (loadDataBase)

            if(zkDb.isInitialized()){
                setZxid(zkDb.getDataTreeLastProcessedZxid());
            }
            else {
            setZxid(zkDb.loadDataBase()); // load the data from the snapshot and txn logs
            }
    

    org.apache.zookeeper.server.ZKDatabase#loadDataBase

    public long loadDataBase() throws IOException {
            long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
            initialized = true;
            return zxid;
        }
    

    Note the commitProposalPlaybackListener: it is what establishes minCommittedLog and maxCommittedLog while the database is loaded, i.e. the bounded queue drawn inside the leader box in Figure 1 (a small sketch of this bookkeeping follows the excerpt below).

    private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
            public void onTxnLoaded(TxnHeader hdr, Record txn){
                addCommittedProposal(hdr, txn);
            }
        };
    "这个方法是在快速恢复最新事物的时候调用FileTxnSnapLog#fastForwardFromEdits"
    
     public void addCommittedProposal(Request request) {
            WriteLock wl = logLock.writeLock();
            try {
                wl.lock(); 
              "LinkedList<Proposal> committedLog 是个链表,最大长度是500"
                
                if (committedLog.size() > commitLogCount=500) {
                    committedLog.removeFirst();
                    minCommittedLog = committedLog.getFirst().packet.getZxid();
                }
                if (committedLog.size() == 0) {
                    minCommittedLog = request.zxid;
                    maxCommittedLog = request.zxid;
                }
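
    The excerpt stops early, but the idea is visible: committedLog is a sliding window of the most recent 500 committed proposals, and minCommittedLog/maxCommittedLog track its two ends. A self-contained toy sketch of that bookkeeping (zxids only, no Proposal objects; illustrative, not ZooKeeper source):

    import java.util.ArrayDeque;
    import java.util.Deque;

    class CommittedLogSketch {
        static final int COMMIT_LOG_COUNT = 500;        // same cap as commitLogCount above
        private final Deque<Long> committedLog = new ArrayDeque<>();
        private long minCommittedLog;
        private long maxCommittedLog;

        void addCommittedProposal(long zxid) {
            if (committedLog.size() > COMMIT_LOG_COUNT) {
                committedLog.removeFirst();              // drop the oldest entry
                minCommittedLog = committedLog.getFirst();
            }
            if (committedLog.isEmpty()) {
                minCommittedLog = zxid;
                maxCommittedLog = zxid;
            }
            committedLog.add(zxid);
            maxCommittedLog = zxid;                      // the newest entry is always the max
        }
    }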
    

    2 Start the LearnerHandler threads

    Around line 358 of org.apache.zookeeper.server.quorum.LearnerHandler#run: the leader sends LEADERINFO only after it has received registration (LEARNERINFO) from a quorum of servers.

    private HashSet<Long> connectingFollowers = new HashSet<Long>();
        public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
            synchronized(connectingFollowers) {
               // ... lines omitted ...
                // only when a quorum of connectingFollowers has checked in does notifyAll wake the waiting threads
                if (connectingFollowers.contains(self.getId()) && 
                                                verifier.containsQuorum(connectingFollowers)) {
                    waitingForNewEpoch = false;
                    self.setAcceptedEpoch(epoch);
                    connectingFollowers.notifyAll();
                } else {
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + self.getInitLimit()*self.getTickTime();
                    while(waitingForNewEpoch && cur < end) {
                        connectingFollowers.wait(end - cur);
                        cur = Time.currentElapsedTime();
                    }
                    if (waitingForNewEpoch) {
                        throw new InterruptedException("Timeout while waiting for epoch from quorum");        
                    }
                }
                return epoch;
            }
        }
    
                    byte ver[] = new byte[4];
                    ByteBuffer.wrap(ver).putInt(0x10000);
                    QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
                    oa.writeRecord(newEpochPacket, "packet");
                    bufferedOutput.flush();
                    QuorumPacket ackEpochPacket = new QuorumPacket();
                    ia.readRecord(ackEpochPacket, "packet");
                     // wait for the learner's ACKEPOCH packet
                    if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                        LOG.error(ackEpochPacket.toString()
                                + " is not ACKEPOCH");
                        return;
                    }
                    ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                    ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                    leader.waitForEpochAck(this.getSid(), ss);
    

    Step 1 in Figure 1 is the Learner's registerWithLeader method sending LearnerInfo (FOLLOWERINFO).
    Step 2 in Figure 1 is the leader sending LEADERINFO.
    In step 3 the leader receives the ACKEPOCH packets and, again, only proceeds once a quorum of them has arrived; a sketch of that quorum check follows below.
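
    Both getEpochToPropose and waitForEpochAck gate on verifier.containsQuorum(...). For the default majority quorum this boils down to "strictly more than half of the voting members have checked in"; a minimal sketch in the spirit of QuorumMaj (simplified, not the actual class):

    import java.util.Set;

    final class MajorityQuorumSketch {
        private final int votingMemberCount;   // size of the configured ensemble

        MajorityQuorumSketch(int votingMemberCount) {
            this.votingMemberCount = votingMemberCount;
        }

        boolean containsQuorum(Set<Long> ackedServerIds) {
            // strictly more than half of the ensemble must be present
            return ackedServerIds.size() > votingMemberCount / 2;
        }
    }

    With a five-node ensemble this returns true once three distinct sids are in the set, which is why the leader adds its own sid to connectingFollowers and electingFollowers before testing for a quorum.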

    private HashSet<Long> electingFollowers = new HashSet<Long>();
        private boolean electionFinished = false;
        public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
            synchronized(electingFollowers) {
               // ... lines omitted ...
                QuorumVerifier verifier = self.getQuorumVerifier();
                // once a quorum of ACKEPOCHs has arrived, wake the waiting threads
                if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
                    electionFinished = true;
                    electingFollowers.notifyAll();
                } else {                
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + self.getInitLimit()*self.getTickTime();
                    while(!electionFinished && cur < end) {
                        electingFollowers.wait(end - cur);
                        cur = Time.currentElapsedTime();
                    }
    

    3 Synchronization

      // org.apache.zookeeper.server.quorum.LearnerHandler#run
     /* the default to send to the follower is SNAP */
                int packetToSend = Leader.SNAP;
                long zxidToSend = 0;
                long leaderLastZxid = 0;
                /** the packets that the follower needs to get updates from **/
                long updates = peerLastZxid;
                
                /* we are sending the diff check if we have proposals in memory to be able to 
                 * send a diff to the 
                 */ 
                ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
                ReadLock rl = lock.readLock();
                try {
                    rl.lock();  
                     // read the leader's minCommittedLog and maxCommittedLog
                     
                    final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                    final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
                    LOG.info("Synchronizing with Follower sid: " + sid
                            +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                            +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                            +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
                    
                    "这里的proposals 的packetType都是PROPOSAL"
                    LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
                    "这里的peerLastZxid 就是前面ACKEPOCH包,发送过来的follower或者observer的lastZxid"
                    if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                        // Follower is already in sync with us, send an empty DIFF
                        LOG.info("leader and follower are in sync, zxid=0x{}",
                                Long.toHexString(peerLastZxid));
                        packetToSend = Leader.DIFF;
                        zxidToSend = peerLastZxid;
                    } else if (proposals.size() != 0) {
                        LOG.debug("proposal size is {}", proposals.size());
                        "如果peerLastZxid介于leader的【minCommittedLog ,maxCommittedLog 】之间"
                        if ((maxCommittedLog >= peerLastZxid)
                                && (minCommittedLog <= peerLastZxid)) {
                            LOG.debug("Sending proposals to follower");
    
                            // as we look through proposals, this variable keeps track of the previous
                            // proposal's zxid (prevProposalZxid)
                            long prevProposalZxid = minCommittedLog;
    
                            // Keep track of whether we are about to send the first packet.
                            // Before sending the first packet, we have to tell the learner
                            // whether to expect a trunc or a diff
                            "先告诉对端是TRUNC还是DIFF同步"
                            boolean firstPacket=true;
    
                            // If we are here, we can use committedLog to sync with
                            // follower. Then we only need to decide whether to
                            // send trunc or not
                            packetToSend = Leader.DIFF;
                            zxidToSend = maxCommittedLog;
    
                            for (Proposal propose: proposals) {
                                // skip the proposals the peer already has
                                  "小于对端的peerLastZxid,直接略过"
                                if (propose.packet.getZxid() <= peerLastZxid) {
                                    prevProposalZxid = propose.packet.getZxid();
                                    continue;
                                } else {
                                    // If we are sending the first packet, figure out whether to trunc
                                    // in case the follower has some proposals that the leader doesn't
                                    if (firstPacket) {
                                        firstPacket = false;
                                        // Does the peer have some proposals that the leader hasn't seen yet
                                        if (prevProposalZxid < peerLastZxid) {
                                            // send a trunc message before sending the diff
                                            packetToSend = Leader.TRUNC;                                        
                                            zxidToSend = prevProposalZxid;
                                            updates = zxidToSend;
                                        }
                                    }
                                      "一个PROPOSAL,一个COMMIT包,间隔着发过去,同步"
                                    queuePacket(propose.packet);
                                    QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                            null, null);
                                    queuePacket(qcommit);
                                }
                            }
                        "如果对端的zxid大于leader的maxCommittedLog,发送TRUNC指令"
                        } else if (peerLastZxid > maxCommittedLog) {
                            LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                                    Long.toHexString(maxCommittedLog),
                                    Long.toHexString(updates));
    
                            packetToSend = Leader.TRUNC;
                            zxidToSend = maxCommittedLog;
                            updates = zxidToSend;
                        } else {
                            LOG.warn("Unhandled proposal scenario");
                        }
                    } else {
                        // just let the state transfer happen
                        LOG.debug("proposals is empty");
                    }               
    
                    LOG.info("Sending " + Leader.getPacketType(packetToSend));
                    leaderLastZxid = leader.startForwarding(this, updates);
    
                } finally {
                    rl.unlock();
                }
    

    4 Send the NEWLEADER packet; if the mode is SNAP, serialize the snapshot and ship it to the learner

    QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        ZxidUtils.makeZxid(newEpoch, 0), null, null);
                 if (getVersion() < 0x10000) {
                    oa.writeRecord(newLeaderQP, "packet");
                } else {
                    queuedPackets.add(newLeaderQP);
                }
                bufferedOutput.flush();
                //Need to set the zxidToSend to the latest zxid
                if (packetToSend == Leader.SNAP) {
                    zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                }
                oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
                bufferedOutput.flush();
                
                /* if we are not truncating or sending a diff just send a snapshot */
                if (packetToSend == Leader.SNAP) {
                    LOG.info("Sending snapshot last zxid of peer is 0x"
                            + Long.toHexString(peerLastZxid) + " " 
                            + " zxid of leader is 0x"
                            + Long.toHexString(leaderLastZxid)
                            + "sent zxid of db as 0x" 
                            + Long.toHexString(zxidToSend));
                    // Dump data to peer
                    leader.zk.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
                }
                bufferedOutput.flush();
    

    Finally, a dedicated sender thread pushes the queued packets over the network (a sketch of the sender loop follows the snippet):

     // Start sending packets
                new Thread() {
                    public void run() {
                        Thread.currentThread().setName(
                                "Sender-" + sock.getRemoteSocketAddress());
                        try {
                            sendPackets();
                        } catch (InterruptedException e) {
                            LOG.warn("Unexpected interruption",e);
                        }
                    }
                }.start();
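
    sendPackets itself is not reproduced in the article. Roughly speaking (a simplified, self-contained sketch of the idea rather than the exact LearnerHandler code, where QuorumPacket, BinaryOutputArchive, and an end-of-stream sentinel packet play these roles), it drains queuedPackets and writes each packet to the learner's connection until it is told to stop:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    final class PacketSenderSketch<P> implements Runnable {
        interface PacketWriter<P> { void write(P packet) throws Exception; }

        private final BlockingQueue<P> queuedPackets = new LinkedBlockingQueue<>();
        private final PacketWriter<P> writer;   // stands in for writeRecord(...) plus flush()
        private final P endMarker;              // analogous to the sentinel that shuts the sender down

        PacketSenderSketch(PacketWriter<P> writer, P endMarker) {
            this.writer = writer;
            this.endMarker = endMarker;
        }

        void queuePacket(P packet) { queuedPackets.add(packet); }

        @Override
        public void run() {
            try {
                while (true) {
                    P p = queuedPackets.take();   // blocks until a packet is queued
                    if (p == endMarker) {
                        break;                    // stop sending
                    }
                    writer.write(p);              // serialize and flush to the learner
                }
            } catch (Exception e) {
                // the real code logs the error and lets the handler close the connection
            }
        }
    }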
    

    5 Wait for the learner's ACK

     qp = new QuorumPacket();
                ia.readRecord(qp, "packet");
                if(qp.getType() != Leader.ACK){
                    LOG.error("Next packet was supposed to be an ACK");
                    return;
                }
                LOG.info("Received NEWLEADER-ACK message from " + getSid());
                leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
    

    6 Start the ZooKeeper server

    "Leader.lead()"
     startZkServer();
    
    "LearnerHandler#run()"
    /*
                 * Wait until leader starts up
                 */
                synchronized(leader.zk){
                    while(!leader.zk.isRunning() && !this.isInterrupted()){
                        leader.zk.wait(20);
                    }
                }
                // Mutation packets will be queued during the serialize,
                // so we need to mark when the peer can actually start
                // using the data
                //
                queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    

    7 After UPTODATE, the learner can respond to client requests and serve normally

    queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    

    Class diagram (image not included).

    Summary:

    1 When the cluster starts and leader election completes, the QuorumPeer thread moves into the data synchronization phase.
    2 On the learner side, it first connects to the leader, then registers by sending FOLLOWERINFO or OBSERVERINFO, carrying its last processed zxid and its epoch.
    3 On the leader side, it waits for LEARNERINFO from a quorum before sending LEADERINFO.
    4 In registerWithLeader the learner replies with ACKEPOCH.
    5 The leader's waitForEpochAck waits for a quorum of ACKEPOCHs, and then synchronization proceeds.
    6 The leader reads its minCommittedLog and maxCommittedLog, compares them with the learner's last zxid, and chooses DIFF, SNAP, or TRUNC accordingly.
    7 The leader waits for the learner's ACK.
    8 Both sides start their server instances and serve client requests normally.
