美文网首页
PySyncObj 源码解读 | Raft

PySyncObj 源码解读 | Raft

作者: NightCat | 来源:发表于2018-03-21 10:54 被阅读199次

    创建日志对象,默认是新建一个MemoryJournal对象

    self.__raftLog = createJournal(self.__conf.journalFile)

    #日志对象含有的属性:
    self.__journal = []  #[]存放的是 'command'   'idx'  'term'
    self.__bytesSize = 0
    

    创建序列化对象

    self.__serializer = Serializer(self.__conf.fullDumpFile,        
                                   self.__conf.logCompactionBatchSiz
                                   self.__conf.useFork,             
                                   self.__conf.serializer,          
                                   self.__conf.deserializer,        
                                   self.__conf.serializeChecker)    
    

    创建Poller

    self._poller = createPoller(self.__conf.pollerType)
    首先判断select模块有没有poll属性:

    1. 有:返回PollPoller对象
    2. 没有:返回SelectPoller()对象

    通过poller来实现IO多路复用

    创建TCP的服务端 | tcp_server.py 中的TcpServer的对象

    self.__server = TcpServer(self._poller, host, port, onNewConnection=self.__o
                              sendBufferSize=self.__conf.sendBufferSize,        
                              recvBufferSize=self.__conf.recvBufferSize,        
                              connectionTimeout=self.__conf.connectionTimeout)  
    

    传入onNewConnection=self.__onNewConnection作为新连接的处理函数
    此时只是创建了一个服务端的对象,初始化了一些属性,没怎么处理,甚至还没有创建套接字

    获得本SyncObj对象的需要复制的方法

    methods = [m for m in dir(self) if callable(getattr(self, m)) and \                
               getattr(getattr(self, m), 'replicated', False) and \                    
               m != getattr(getattr(self, m), 'origName')]                             
    

    把需要复制的所有“方法”和“版本号”存放到字典(funcVersions)

    funcVersions[origFuncName].add(ver)
    添加{"方法1":"版本号1","方法2":"版本号2",...}
    此时获取的是所有带有“复制”功能的方法,而存储的是set

    可能同一个方法名有多个版本

     #存放了方法集合,元素是("版本号",0,方法,对象)                                  
     for method in methods:                                       
         ver = getattr(getattr(self, method), 'ver')   #返回方法的'ver'
         methodsToEnumerate.append((ver, 0, method, self))        
    

    从复制方法集中找到最高版本的方法的版本号——self.__selfCodeVersion

     for ver, _, method, obj in sorted(methodsToEnumerate):             
         self.__selfCodeVersion = max(self.__selfCodeVersion, ver)      #存放最新版本的方法?保证从各个复制方法总得到最大的版本号码放入                 
         if obj is self:   
             self._methodToID[method] = currMethodID           #存放方法ID              
         else:                 
             self._methodToID[(id(obj), method)] = currMethodID    
         self._idToMethod[currMethodID] = getattr(obj, method)       #存放{'currMethodId':方法}  
         currMethodID += 1                         
    

    存放两个字典:
    _methodToID:里面存着方法和对应的方法号 #存放{方法:currMethodID}
    _idToMethod:里面存着方法号和对应的方法 #存放{currMethodId :方法}

    __onSetCodeVersion(0)

    #此处和之前一样,取出需要复制的方法
    methods = [m for m in dir(self) if callable(getattr(self, m)) and \
               getattr(getattr(self, m), 'replicated', False) and \
               m != getattr(getattr(self, m), 'origName')]
               
    # 当前版本的方法名
    self.__currentVersionFuncNames = {}
    
    funcVersions = collections.defaultdict(set)     #funcVersion字典,如果key值不存在时,返回一个set集合
    for method in methods:
        ver = getattr(getattr(self, method), 'ver')
        origFuncName = getattr(getattr(self, method), 'origName')
        funcVersions[origFuncName].add(ver)         #添加{"方法名":{ver1,ver2...}} 可能存在多个版本的方法
    
    #暂且不看,因为consumer默认为none
    for consumer in self.__consumers:
        consumerID = id(consumer)
        consumerMethods = [m for m in dir(consumer) if callable(getattr(consumer, m)) and \
                           getattr(getattr(consumer, m), 'replicated', False)]
        for method in consumerMethods:
            ver = getattr(getattr(consumer, method), 'ver')
            origFuncName = getattr(getattr(consumer, method), 'origName')
            funcVersions[(consumerID, origFuncName)].add(ver)
    
    for funcName, versions in iteritems(funcVersions):
        versions = sorted(list(versions))
        for v in versions:
            if v > newVersion:
                break
            realFuncName = funcName[1] if isinstance(funcName, tuple) else funcName
            #得到一个方法和版本的集合,都是版本比setcodeVersion版本低的复制方法
            self.__currentVersionFuncNames[funcName] = realFuncName + '_v' + str(v)
    

    相当于只有版本不高于setCodeVersion所指定版本的方法是可以用的,指定了可用方法!

    __bindedEvent = threading.Event()

    Python threading模块提供了Event对象用于线程间通信,它提供了设置、清除、等待等方法用于实现线程间的通信。event是最简单的线程间通信方式之一,一个线程产生一个信号,另一个线程则等待该信号。Python 通过threading.Event()产生一个event对象,event对象维护一个内部标志(标志初始值为False),通过set()将其置为True,wait(timeout)则用于阻塞线程直至标志被set(或者超时,timeout可选),isSet()用于查询标志位是否为True,clear()则用于清除标志位(使之为False)。

    PipeNotifier(self._poller)

    #appendEntriesUseBatch默认为TRUE                          
    if not self.__conf.appendEntriesUseBatch and PIPE_NOTIF
        self.__pipeNotifier = PipeNotifier(self._poller)   
    

    主要的函数处理

    self.__mainThread = threading.current_thread()       
    self.__initialised = threading.Event()               
    #创建一个线程,传入的参数是本实例对象的"弱引用"                            
    self.__thread = threading.Thread(target=SyncObj._autoTickThread, args=(weakref.proxy(self),)) 
    self.__thread.start()           #启动线程活动              
    self.__initialised.wait()     #阻塞线程,直到Event.set(),由此可以看到线程SyncObj._autoTickThread会把线程不阻塞,    
    

    创建的线程执行的函数 _autoTickThread(self)

     def _autoTickThread(self):                                  
         try:                                                    
             self.__initInTickThread()                          
         except SyncObjException as e:                           
             if e.errorCode == 'BindError':                      
                 return                                          
             raise                                               
         finally:                                                
             self.__initialised.set()      #set(), 可以继续执行线程了                      
         time.sleep(0.1)                                         
         try:                                                    
             while True:                                         
                 if not self.__mainThread.is_alive():            
                     break                                       
                 if self.__destroying:                           
                     self._doDestroy()                           
                     break                                       
                 self._onTick(self.__conf.autoTickPeriod)        
         except ReferenceError:                                  
             pass                                                
    

    self.__initialised.set()目的:保证先执行__initInTickThread,再调用self._onTick

    __initInTickThread

    def __initInTickThread(self):   
        try:       
            self.__lastInitTryTime = time.time() #上次初始化的时间
            if self.__selfNodeAddr is not None:        
                self.__server.bind()        #开始对TCP服务端端口绑定     
                shouldConnect = None             
            else:             
                shouldConnect = True        
            self.__nodes = []            
            for nodeAddr in self.__otherNodesAddrs:   
                #刚开始shouldConnect是None
                self.__nodes.append(Node(self, nodeAddr, shouldConnect)) 
                self.__raftNextIndex[nodeAddr] = self.__getCurrentLogIndex() + 1  
                self.__raftMatchIndex[nodeAddr] = 0   #刚开始的时候已经复制日志的最高索引值为0
            self.__needLoadDumpFile = True     
            self.__isInitialized = True     
            self.__bindedEvent.set()      
        except:                   
            self.__bindRetries += 1     
            if self.__conf.maxBindRetries and self.__bindRetries >= self.__conf.maxBindRetries:           
                self.__bindedEvent.set()      
                raise SyncObjException('BindError')      
            logging.exception('failed to perform initialization')                                         
    

    开始对TCP服务端端口绑定

    bind | tcp_server.py

        def bind(self):
            self.__socket = socket.socket(self.__hostAddrType, socket.SOCK_STREAM)
            self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.__sendBufferSize)
            self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.__recvBufferSize)
            self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)   #不适用Nagle算法
            self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #允许重用本地地址和端口
            self.__socket.setblocking(0)  #设置为非阻塞模式
            self.__socket.bind((self.__host, self.__port))
            self.__socket.listen(5)
            self.__fileno = self.__socket.fileno()   #返回监听Socket的文件描述符
            logging.warning(self.__fileno)
            self.__poller.subscribe(self.__fileno,
                                    self.__onNewConnection,
                                    POLL_EVENT_TYPE.READ | POLL_EVENT_TYPE.ERROR)
            self.__state = SERVER_STATE.BINDED
    

    主要:

    1. 创建TCP Socket套接字
    2. 修改套接字的选项
    3. 设置setblocking为0非阻塞模式,此时accept()不会阻塞等待,可以实现多任务
    4. 向poll注册事件,用于生成为客户端服务的套接字

    def __onNewConnection | tcp_server.py

    用于接收客户端的connect或者ERROR的时候解绑

    conn = TcpConnection(poller=self.__poller,
                                         socket=sock,
                                         timeout=self.__connectionTimeout,
                                         sendBufferSize=self.__sendBufferSize,
                                         recvBufferSize=self.__recvBufferSize)
    self.__onNewConnectionCallback(conn)
    

    TcpConnection属于tcp_connection.py
    self.__onNewConnectionCallback的函数是SyncObj里的__onNewConnection函数,用于处理新的连接。
    此时并没有给出__onMessageReceived函数和__onConnected函数

    __onNewConnection做了什么?

    为客户端的这个服务,将其放到self.__unknownConnections[descr] = conn
    同时绑定方法:SyncObj的__onMessageReceived和__onDisconnected

    创建TcpConnection对象中

    设置:self.__state = CONNECTION_STATE.CONNECTED
    向poll注册事件,一旦有接收消息或者发送消息事件的时候,调用__processConnection

    __processConnection

     if time.time() - self.__lastReadTime > self.__timeout:
                self.disconnect()
                return
    

    判断:如果收到消息时,当前时间距离上次读取的时间已经超过了10秒(超时),则不处理数据,直接断开连接。

    #初次接收消息的时候,self.__state == CONNECTION_STATE.CONNECTING
    if self.__state == CONNECTION_STATE.CONNECTING:
        #初始的时候,还是等于None的
        if self.__onConnected is not None:
            self.__onConnected()
        self.__state = CONNECTION_STATE.CONNECTED
        self.__lastReadTime = time.time()
        return
    

    第一次收到消息,表示连接已经建立好

    接收数据:

    if eventType & POLL_EVENT_TYPE.READ:
        self.__tryReadBuffer()
        if self.__state == CONNECTION_STATE.DISCONNECTED:
            return
    
        while True:
            message = self.__processParseMessage()
            if message is None:
                break
            if self.__onMessageReceived is not None:
                #把消息传到syncobj的消息接受函数中处理
                self.__onMessageReceived(message)
            if self.__state == CONNECTION_STATE.DISCONNECTED:
                return
    

    __tryReadBuffer():主要是等待__processRead返回TRUE时

    def __processRead(self):
        try:
            incoming = self.__socket.recv(self.__recvBufferSize)
        except socket.error as e:
            if e.errno not in (socket.errno.EAGAIN, socket.errno.EWOULDBLOCK):
                self.disconnect()
            return False
        if self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
            self.disconnect()
            return False
        if not incoming:
            self.disconnect()
            return False
        self.__readBuffer += incoming
        return True
    

    message = self.__processParseMessage()开始解析数据:

    def __processParseMessage(self):
        if len(self.__readBuffer) < 4:
            return None
    
        #l是指定的数据长度?
        l = struct.unpack('i', self.__readBuffer[:4])[0]
        if len(self.__readBuffer) - 4 < l:
            return None
        data = self.__readBuffer[4:4 + l]
        try:
            if self.encryptor:
                data = self.encryptor.decrypt(data)
            message = pickle.loads(zlib.decompress(data))
            if self.recvRandKey:
                randKey, message = message
                assert randKey == self.recvRandKey
        except:
            self.disconnect()
            return None
        self.__readBuffer = self.__readBuffer[4 + l:]
        return message
    

    提取数据部分并返回!
    调用SyncObj传来的接收消息处理函数__onMessageReceived处理message

    __onMessageReceived

    # message不是应该是字符串吗?怎么是list?
    if isinstance(message, list) and self.__onUtilityMessage(conn, message):
        self.__unknownConnections.pop(descr, None)
        return
    
    # 消息只包含节点地址
    partnerNode = None
    for node in self.__nodes:
        if node.getAddress() == message:
            partnerNode = node
            break
    
    if partnerNode is None and message != 'readonly':
        conn.disconnect()
        self.__unknownConnections.pop(descr, None)
        return
    # 处理传来集群其他节点IP地址的消息
    if partnerNode is not None:
        partnerNode.onPartnerConnected(conn)
    else:
        nodeAddr = str(self.__readonlyNodesCounter)
        node = Node(self, nodeAddr, shouldConnect=False)
        node.onPartnerConnected(conn)
        self.__readonlyNodes.append(node)
        self.__raftNextIndex[nodeAddr] = self.__getCurrentLogIndex() + 1
        self.__raftMatchIndex[nodeAddr] = 0
        self.__readonlyNodesCounter += 1
    
    self.__unknownConnections.pop(descr, None)
    

    __onUtilityMessage

    判断消息是否有效,无效时返回FALSE

    #消息类型
    if message[0] == 'status':
        conn.send(self.getStatus())
        return True
    elif message[0] == 'add':
        # 默认没开启动态添加节点到集群的功能
        self.addNodeToCluster(message[1],
                              callback=functools.partial(self.__utilityCallback, conn=conn, cmd='ADD',
                                                         node=message[1]))
        return True
    elif message[0] == 'remove':
        if message[1] == self.__selfNodeAddr:
            conn.send('FAIL REMOVE ' + message[1])
        else:
            # 默认没开启动态添加节点到集群的功能
            self.removeNodeFromCluster(message[1], callback=functools.partial(self.__utilityCallback, conn=conn,
                                                                              cmd='REMOVE', node=message[1]))
        return True
    elif message[0] == 'set_version':
        # 切换到所有集群节点上的新代码版本
        self.setCodeVersion(message[1],
                            callback=functools.partial(self.__utilityCallback, conn=conn, cmd='SET_VERSION',
                                                       node=str(message[1])))
        return True
    

    也就是说,如果消息是请求本节点的状态或者切换新代码的版本,此时就返回True

    切换到集群节点的新版本

    def setCodeVersion(self, newVersion, callback=None):
    """切换到所有群集节点上的新代码版本。
    您应该确保群集节点已更新,否则将无法应用命令。
    
     :param newVersion:新的代码版本
     :type int
     :param callback:将在成功(success)或失败时调用
     :type callback:function(`FAIL_REASON <#pysyncobj.FAIL_REASON > `_,None)
    """
    assert isinstance(newVersion, int)
    if newVersion > self.__selfCodeVersion:
        raise Exception(
            'wrong version, current version is %d, requested version is %d' % (self.__selfCodeVersion, newVersion))
    if newVersion < self.__enabledCodeVersion:
        raise Exception('wrong version, enabled version is %d, requested version is %d' % (
        self.__enabledCodeVersion, newVersion))
    self._applyCommand(pickle.dumps(newVersion), callback, _COMMAND_TYPE.VERSION)
    

    _applyCommand

        def _applyCommand(self, command, callback, commandType=None):
            try:
                if commandType is None:
                    self.__commandsQueue.put_nowait((command, callback))
                else:
                    # 放到队列中
                    self.__commandsQueue.put_nowait((_bchr(commandType) + command, callback))
                    
                # 默认appendEntriesUseBatch是TRUE,Send multiple entries in a single command. 提高整体性能
                # 所以第一个条件就不满足了
                if not self.__conf.appendEntriesUseBatch and PIPE_NOTIFIER_ENABLED:
                    self.__pipeNotifier.notify()
            except Queue.Full:
                self.__callErrCallback(FAIL_REASON.QUEUE_FULL, callback)
    

    node 的 onPartnerConnected方法

    添加方法:

    conn.setOnMessageReceivedCallback(self.__onMessageReceived)
            conn.setOnDisconnectedCallback(self.__onDisconnected)
            self.__status = NODE_STATUS.CONNECTED
    

    回到初始化__initInTickThread方法中

    以上已经把TCP服务器建设好,以及新的连接过来怎么处理的函数都分析完了
    接下来是节点对象的创建

    for nodeAddr in self.__otherNodesAddrs:
        self.__nodes.append(Node(self, nodeAddr, shouldConnect))
        self.__raftNextIndex[nodeAddr] = self.__getCurrentLogIndex() + 1
        self.__raftMatchIndex[nodeAddr] = 0  
    

    Node初始化里的判断shouldConnect判断有点奇怪!!
    IP地址大的向小的发送TCP请求

    if self.__shouldConnect:
      self.__ip = globalDnsResolver().resolve(nodeAddr.rsplit(':', 1)[0])
      self.__port = int(nodeAddr.rsplit(':', 1)[1])
      
      # 新建TCP连接的对象,注意此处传入的是self的__onMessageReceived,onDisconnected
      self.__conn = TcpConnection(poller=syncObj._poller,
                                  onConnected=self.__onConnected,
                                  onMessageReceived=self.__onMessageReceived,
                                  onDisconnected=self.__onDisconnected,
                                  timeout=syncObj._getConf().connectionTimeout,
                                  sendBufferSize=syncObj._getConf().sendBufferSize,
                                  recvBufferSize=syncObj._getConf().recvBufferSize)
      self.__conn.encryptor = self.__encryptor
    

    此时创建的TCPConnection的对象,没有socket,

    此时跳转到onTick函数里

    for node in self.__nodes:
        node.connectIfRequired()            #与其他节点均建立连接
    

    再次看到node.py中的connectIfRequired:

    def connectIfRequired(self):
        if not self.__shouldConnect:
            return
        
        # 由于第一次创建node对象的时候,还未Connect,此时不进入这个if语句内
        if self.__status != NODE_STATUS.DISCONNECTED:
            return
        if time.time() - self.__lastConnectAttemptTime < self.__syncObj()._getConf().connectionRetryTime:
            return
        # 尝试开始建立连接
        self.__status = NODE_STATUS.CONNECTING
        self.__lastConnectAttemptTime = time.time()
        
        #此时调用tcp_connection.py中的connect方法
        if not self.__conn.connect(self.__ip, self.__port):
            self.__status = NODE_STATUS.DISCONNECTED
            return
    

    connect方法中,连接到比本节点IP地址更小的IP的节点

          try:
              self.__socket.connect((host, port))
              logging.debug('__socket.connect')
    

    连接完后,更改self.__fileno,以及将状态改为CONNECTING,
    注册收发处理函数__processConnection

    __processConnection和之前的那个是差不多的,不过此处的__onMessageReceived已经不一样了,此处用的是Syncobj里的_onMessageReceived

    _onMessageReceived | Syncobj.py

    消息类型是request_vote

    如果消息的任期大于本节点的任期:

    1. 更改任期为接收到的任期
    2. 设置__votedFor=None
    3. 设置本节点状态为FOLLOWER
    4. 收到这个消息,说明是没有Leader,设置self.__raftLeader = None

    本节点如果是FOLLOWER或者是候选者:
    获取发送消息的候选人的最后日志的任期和最后日志的索引
    遇到以下情况直接返回:

    1. 候选人比本节点的任期小
    2. 候选人与本节点的任期相同,但是索引比本节点小
    3. 任期如果和本节点相同,本节点已经为其他节点投票,

    否则,为该节点投票,发送回复消息:response_vote

    消息类型是append_entries,同时消息的任期是大于等于本节点的任期的

    因为只有Leader节点会发这个消息,首先判断发送这个消息的节点是不是Leader,如果不是,设置它为Leader节点
    再对消息的任期判断:

    if message['term'] > self.__raftCurrentTerm:
        # 更新最新的任期
        self.__raftCurrentTerm = message['term']
        # 收到附加消息,表示已经有Leader了,不需要投票了,清空
        self.__votedFor = None
    # 设置本节点为FOLLOWER,主要是在候选者身份竞争Leader失败时,变回FOLLOWER
    self.__setState(_RAFT_STATE.FOLLOWER)
    

    提取message里的条目,

    # 提取消息内容
    newEntries = message.get('entries', [])
    serialized = message.get('serialized', None)
    # Leader已经提交的日志的索引值,更新本节点记录的信息
    self.__leaderCommitIndex = leaderCommitIndex = message['commit_index']
    

    常规的附加消息的处理:

    if transmission is not None:
        if transmission == 'start':
            self.__recvTransmission = message['data']
            self.__sendNextNodeIdx(nodeAddr, success=False, reset=False)
            return
        elif transmission == 'process':
            self.__recvTransmission += message['data']
            self.__sendNextNodeIdx(nodeAddr, success=False, reset=False)
            return
        elif transmission == 'finish':
            self.__recvTransmission += message['data']
            #本次数据接收完了,字节转换回对象
            newEntries = [pickle.loads(self.__recvTransmission)]
            self.__recvTransmission = ''
        else:
            raise Exception('Wrong transmission type')
    
    prevLogIdx = message['prevLogIdx']
    prevLogTerm = message['prevLogTerm']
    
    # 拿出新的日志条目
    prevEntries = self.__getEntries(prevLogIdx)
    # 如果本节点没有Leader发送的附加消息的日志条目,请求对方发送
    if not prevEntries:
        self.__sendNextNodeIdx(nodeAddr, success=False, reset=True)
        return
    # 日志的条目对应的任期不一致,请求再发?
    if prevEntries[0][2] != prevLogTerm:
        self.__sendNextNodeIdx(nodeAddr, nextNodeIdx=prevLogIdx, success=False, reset=True)
        return
    # 有多个日志条目
    if len(prevEntries) > 1:
        # rollback cluster changes
        # 先不看,默认没开启
        if self.__conf.dynamicMembershipChange:
            for entry in reversed(prevEntries[1:]):
                clusterChangeRequest = self.__parseChangeClusterRequest(entry[0])
                if clusterChangeRequest is not None:
                    self.__doChangeCluster(clusterChangeRequest, reverse=True)
        # 有多个说明与当前Leader节点有冲突,这些日志条目要删除
        self.__deleteEntriesFrom(prevLogIdx + 1)
    # 添加新的条目到节点日志里,newEntries是从Leader传来的message中提取的
    for entry in newEntries:
        self.__raftLog.add(*entry)
    
    # apply cluster changes
    # 暂时不管
    if self.__conf.dynamicMembershipChange:
        for entry in newEntries:
            clusterChangeRequest = self.__parseChangeClusterRequest(entry[0])
            if clusterChangeRequest is not None:
                self.__doChangeCluster(clusterChangeRequest)
    
    nextNodeIdx = prevLogIdx + 1
    if newEntries:
        # 主要是针对多个条目的情况,取最新的索引,为什么不加1呢?
        nextNodeIdx = newEntries[-1][1]
    # 附加消息添加成功,返回下次节点接收的索引
    self.__sendNextNodeIdx(nodeAddr, nextNodeIdx=nextNodeIdx, success=True)
    

    快照部分先不管
    self.__raftCommitIndex = min(leaderCommitIndex, self.__getCurrentLogIndex())
    通过最新的日志条目的索引和Leader的索引来比较,得到最近已经提交的条目索引

    消息类型为:apply_command

    if message['type'] == 'apply_command':
        if 'request_id' in message:
            self._applyCommand(message['command'], (nodeAddr, message['request_id']))
        else:
            self._applyCommand(message['command'], None)
    

    调用了_applyCommand

     def _applyCommand(self, command, callback, commandType=None):
            try:
                if commandType is None:
                    self.__commandsQueue.put_nowait((command, callback))
                else:
                    self.__commandsQueue.put_nowait((_bchr(commandType) + command, callback))
                if not self.__conf.appendEntriesUseBatch and PIPE_NOTIFIER_ENABLED:
                    self.__pipeNotifier.notify()
            except Queue.Full:
                self.__callErrCallback(FAIL_REASON.QUEUE_FULL, callback)
    

    把命令放到队列之中

    消息类型:apply_command_response

    if message['type'] == 'apply_command_response':
        requestID = message['request_id']
        error = message.get('error', None)
        callback = self.__commandsWaitingReply.pop(requestID, None)
        if callback is not None:
            if error is not None:
                callback(None, error)
            else:
                idx = message['log_idx']
                term = message['log_term']
                assert idx > self.__raftLastApplied
                self.__commandsWaitingCommit[idx].append((term, callback))
    

    消息类型:next_node_idx

     if self.__raftState == _RAFT_STATE.LEADER:
        if message['type'] == 'next_node_idx':
            reset = message['reset']
            nextNodeIdx = message['next_node_idx']
            success = message['success']
    
            currentNodeIdx = nextNodeIdx - 1
            if reset:
                # 对于每一个服务器,需要发送给他的下一个日志条目的索引值
                self.__raftNextIndex[nodeAddr] = nextNodeIdx
            if success:
                # 对于每一个服务器,已经复制给他的日志的最高索引值
                self.__raftMatchIndex[nodeAddr] = currentNodeIdx
            self.__lastResponseTime[nodeAddr] = time.time()
    

    _onTick 最为复杂的函数

    1. 首先先检查是否初始化过
    2. 查看是否需要加载文件,默认是不需要的

    判断是否需要重新选举

    #初始的时候状态是FOLLOWER
    if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE) 
            and self.__selfNodeAddr is not None: 
            
        #第一个条件表示:超时没收到Leader的心跳包,超时——需要重新选举;
        #第二个条件判断是否和其他节点有连接,第一次运行的时候是没有的        
        if self.__raftElectionDeadline < time.time() and self.__connectedToAnyone(): 
            #设置新的选举死亡线,准备重新选举 ,__generateRaftTimeout()                      
            self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout()   
            #表明现在集群中没有Leader              
            self.__raftLeader = None           
            #设置本节点的新的身份(候选者)
            #此时self.__raftState=_RAFT_STATE.CANDIDATE                            
            self.__setState(_RAFT_STATE.CANDIDATE)          
            #进入到下一个term   ,CurrentTerm 表示的是本节点的任期                    
            self.__raftCurrentTerm += 1                   
            #先为自己投票                                  
            self.__votedFor = self._getSelfNodeAddr()                
            self.__votesCount = 1                 
            #依次给其他的节点发送投票请求,刚开始和其他节点还没有连接的时候发不出去
            for node in self.__nodes:   
                node.send({             
                    'type': 'request_vote',              
                    'term': self.__raftCurrentTerm,          
                    'last_log_index': self.__getCurrentLogIndex(),        
                    'last_log_term': self.__getCurrentLogTerm(),              
                })                   
            self.__onLeaderChanged()      
            if self.__votesCount > (len(self.__nodes) + 1) / 2:  
            #票数为所有节点的一半以上,成为Leader                   
                self.__onBecomeLeader()    
    

    符合服务器的设计遵守的原则:如果在超过选举超时时间的情况下都没有接收到领导人的心跳,或者候选人的投票请求,自己就变为候选人

    self.__onLeaderChanged()

     def __onLeaderChanged(self):                                                 
         for id in sorted(self.__commandsWaitingReply):                           
             self.__commandsWaitingReply[id](None, FAIL_REASON.LEADER_CHANGED)    
         self.__commandsWaitingReply = {}                                         
    

    如果本节点是Leader的处理

    if self.__raftState == _RAFT_STATE.LEADER:
        # 表明现在还有部分的日志条目未提交
        while self.__raftCommitIndex < self.__getCurrentLogIndex():
            nextCommitIndex = self.__raftCommitIndex + 1
            count = 1
            for node in self.__nodes:
                if self.__raftMatchIndex[node.getAddress()] >= nextCommitIndex:
                    count += 1
            # 表示超过半数的节点已经复制了该条目,更新raftCommitIndex的值
            if count > (len(self.__nodes) + 1) / 2:
                self.__raftCommitIndex = nextCommitIndex
            else:
                break
        # 表示所有的日志条目已经被提交
        self.__leaderCommitIndex = self.__raftCommitIndex
        deadline = time.time() - self.__conf.leaderFallbackTimeout
        count = 1
        for node in self.__nodes:
            if self.__lastResponseTime[node.getAddress()] > deadline:
                count += 1
        #判断Leader是否与超过一半以上的节点均超时,如果是则变为Follower,重新选举
        if count <= (len(self.__nodes) + 1) / 2:
            self.__setState(_RAFT_STATE.FOLLOWER)
            self.__raftLeader = None
    
    

    已经复制的条目中没有应用到状态机的条目的处理

    if self.__raftCommitIndex > self.__raftLastApplied:
        count = self.__raftCommitIndex - self.__raftLastApplied
        
        #返回已经复制的条目中没有应用到状态机的条目
        entries = self.__getEntries(self.__raftLastApplied + 1, count)
        #entry中含有
        for entry in entries:
            try:
                # 取出条目的任期
                currentTermID = entry[2]
                
                # __commandsWaitingCommit取出对应索引的元素
                subscribers = self.__commandsWaitingCommit.pop(entry[1], [])
                # 应用命令,跳转到下一个命令可以看__doApplyCommand分析
                # 取出条目中对应的需要复制的方法
                res = self.__doApplyCommand(entry[0])
                for subscribeTermID, callback in subscribers:
                    if subscribeTermID == currentTermID:
                        # 调用对应的方法
                        callback(res, FAIL_REASON.SUCCESS)
                    else:
                        callback(None, FAIL_REASON.DISCARDED)
                 #每次应用到状态机的日志都会+1
                self.__raftLastApplied += 1
            except SyncObjExceptionWrongVer as e:
                logging.error(
                    'request to switch to unsupported code version (self version: %d, requested version: %d)' %
                    (self.__selfCodeVersion, e.ver))
         #appendEntriesUseBatch默认为TRUE
        if not self.__conf.appendEntriesUseBatch:
            needSendAppendEntries = True
    

    从__commandsWaitingCommit中取出事件,
    将命令应用状态机

    __doApplyCommand()

        def __doApplyCommand(self, command):
            commandType = ord(command[:1])
            # Skip no-op and membership change commands
            if commandType == _COMMAND_TYPE.VERSION:
                #把字节转换为数据,版本的情况
                ver = pickle.loads(command[1:])
                #此时的命令版本过高,本节点没有能处理该command的方法
                if self.__selfCodeVersion < ver:
                    raise SyncObjExceptionWrongVer(ver)
    
                oldVer = self.__enabledCodeVersion
                # 获得新的版本号
                self.__enabledCodeVersion = ver
                # onCodeVersionChanged默认为None
                # This callback will be called 
                # when cluster is switched to new version.
                callback = self.__conf.onCodeVersionChanged
                # 设置新的可用方法版本的集合!
                # 相当于给了一个上限,节点中高于这个版本的方法统统不可用
                self.__onSetCodeVersion(ver)
                # 默认为None,暂时不看
                if callback is not None:
                    callback(oldVer, ver)
                # 关于可用版本的设置命令“消息”处理完毕
                return
            if commandType != _COMMAND_TYPE.REGULAR:
                return
            # 处理REGULAR 的Command,字节转换为对象
            command = pickle.loads(command[1:])
            args = []
            kwargs = {
                '_doApply': True,
            }
            # 非元组的Command-即不带参数的Command?
            if not isinstance(command, tuple):
                funcID = command
            # 带参数的Command
            elif len(command) == 2:
                funcID, args = command
            else:
                # 不仅是元组,且带了关键字参数
                funcID, args, newKwArgs = command
                kwargs.update(newKwArgs)
            #返回含有复制装饰器的方法,同时传入参数
            return self._idToMethod[funcID](*args, **kwargs)
    

    主要是通过entry[0]取出对应的需要复制的方法,

    定期发送心跳包

    if self.__raftState == _RAFT_STATE.LEADER:
        #needSendAppendEntries表示需要发送心跳包了,
        if time.time() > self.__newAppendEntriesTime 
              or needSendAppendEntries:
            self.__sendAppendEntries()
    

    选举成功或者周期到了,需要发送心跳包

    onReady处理

    # callback when hock SyncObj
    # 初始化Syncobj时,__onReadyCalled为False 
    # self.__raftLastApplied == self.__leaderCommitIndex此时表示所有日志条目已经被复制到集群节点
    if not self.__onReadyCalled and self.__raftLastApplied == self.__leaderCommitIndex:
        #只要SyncObj同步来自leader的所有数据,就会调用此回调。
        # onReady,默认为None
        if self.__conf.onReady:
            self.__conf.onReady()
        # 此时变化为True
        self.__onReadyCalled = True
    

    暂时不管,因为没有onReady()

    self._checkCommandsToApply()

    与其他节点建立好连接

            for node in self.__nodes:
                node.connectIfRequired() 
    

    此部分,上面内容已经分析过

    每秒执行的任务

    if time.time() > self.__lastReadonlyCheck + 1.0:
        self.__lastReadonlyCheck = time.time()
        newReadonlyNodes = []
        for node in self.__readonlyNodes:
            if node.isConnected():
                newReadonlyNodes.append(node)
            else:
                self.__raftNextIndex.pop(node, None)
                self.__raftMatchIndex.pop(node, None)
                node._destroy()
    

    先不管了,此部分是对只读节点的处理

    相关文章

      网友评论

          本文标题:PySyncObj 源码解读 | Raft

          本文链接:https://www.haomeiwen.com/subject/bwmwqftx.html