创建日志对象,默认是新建一个MemoryJournal对象
self.__raftLog = createJournal(self.__conf.journalFile)
#日志对象含有的属性:
self.__journal = [] #[]存放的是 'command' 'idx' 'term'
self.__bytesSize = 0
创建序列化对象
self.__serializer = Serializer(self.__conf.fullDumpFile,
self.__conf.logCompactionBatchSiz
self.__conf.useFork,
self.__conf.serializer,
self.__conf.deserializer,
self.__conf.serializeChecker)
创建Poller
self._poller = createPoller(self.__conf.pollerType)
首先判断有没有select有没有poll属性:
- 有:返回PollPoller对象
- 没有:返回SelectPoller()对象
通过poller来实现IO多路复用
创建TCP的服务端 | tcp_server.py 中的TcpServer的对象
self.__server = TcpServer(self._poller, host, port, onNewConnection=self.__o
sendBufferSize=self.__conf.sendBufferSize,
recvBufferSize=self.__conf.recvBufferSize,
connectionTimeout=self.__conf.connectionTimeout)
传入onNewConnection=self.__onNewConnection作为新连接的处理函数
此时只是创建了一个服务端的对象,初始化了一些属性,没怎么处理,甚至还没有创建套接字
获得本SyncObj对象的需要复制的方法
methods = [m for m in dir(self) if callable(getattr(self, m)) and \
getattr(getattr(self, m), 'replicated', False) and \
m != getattr(getattr(self, m), 'origName')]
把需要复制的所有“方法”和“版本号“的存放到字典(funcVersions)
funcVersions[origFuncName].add(ver)
添加{"方法1":"版本号1","方法2":"版本号2",...}
此时获取的是所有带有“复制”功能的方法,每个方法名对应存储的是一个set
可能同一个方法名有多个版本
#存放了方法集合,元素是("版本号",0,方法,对象)
for method in methods:
ver = getattr(getattr(self, method), 'ver') #返回方法的'ver'
methodsToEnumerate.append((ver, 0, method, self))
从复制方法集中找到最高版本的方法的版本号——self.__selfCodeVersion
for ver, _, method, obj in sorted(methodsToEnumerate):
self.__selfCodeVersion = max(self.__selfCodeVersion, ver) #存放最新版本的方法?保证从各个复制方法总得到最大的版本号码放入
if obj is self:
self._methodToID[method] = currMethodID #存放方法ID
else:
self._methodToID[(id(obj), method)] = currMethodID
self._idToMethod[currMethodID] = getattr(obj, method) #存放{'currMethodId':方法}
currMethodID += 1
存放两个列表:
_methodToID:里面存着方法和对应的方法号 #存放{方法:currMethodID}
_idToMethod:里面存着方法号和对应的方法 #存放{currMethodId :方法}
__onSetCodeVersion(0)
#此处和之前一样,取出需要复制的方法
methods = [m for m in dir(self) if callable(getattr(self, m)) and \
getattr(getattr(self, m), 'replicated', False) and \
m != getattr(getattr(self, m), 'origName')]
# 当前版本的方法名
self.__currentVersionFuncNames = {}
funcVersions = collections.defaultdict(set) #funcVersion字典,如果key值不存在时,返回一个set集合
for method in methods:
ver = getattr(getattr(self, method), 'ver')
origFuncName = getattr(getattr(self, method), 'origName')
funcVersions[origFuncName].add(ver) #添加{"方法名":{ver1,ver2...}} 可能存在多个版本的方法
#暂且不看,因为consumer默认为none
for consumer in self.__consumers:
consumerID = id(consumer)
consumerMethods = [m for m in dir(consumer) if callable(getattr(consumer, m)) and \
getattr(getattr(consumer, m), 'replicated', False)]
for method in consumerMethods:
ver = getattr(getattr(consumer, method), 'ver')
origFuncName = getattr(getattr(consumer, method), 'origName')
funcVersions[(consumerID, origFuncName)].add(ver)
for funcName, versions in iteritems(funcVersions):
versions = sorted(list(versions))
for v in versions:
if v > newVersion:
break
realFuncName = funcName[1] if isinstance(funcName, tuple) else funcName
#得到一个方法和版本的集合,都是版本比setcodeVersion版本低的复制方法
self.__currentVersionFuncNames[funcName] = realFuncName + '_v' + str(v)
相当于只有比setCodeVersion版本更低的方法是可以用的,指定了可用方法!
__bindedEvent = threading.Event()
Python threading模块提供了Event对象用于线程间通信,它提供了设置、清除、等待等方法用于实现线程间的通信。event是最简单的线程间通信方式之一,一个线程产生一个信号,另一个线程则等待该信号。Python 通过threading.Event()产生一个event对象,event对象维护一个内部标志(标志初始值为False),通过set()将其置为True,wait(timeout)则用于阻塞线程直至标志被set(或者超时,timeout可选),is_set()(旧名isSet())用于查询标志位是否为True,clear()则用于清除标志位(使之为False)。
PipeNotifier(self._poller)
#appendEntriesUseBatch默认为TRUE
if not self.__conf.appendEntriesUseBatch and PIPE_NOTIF
self.__pipeNotifier = PipeNotifier(self._poller)
主要的函数处理
self.__mainThread = threading.current_thread()
self.__initialised = threading.Event()
#创建一个线程,传入的参数是本实例对象的"弱引用"
self.__thread = threading.Thread(target=SyncObj._autoTickThread, args=(weakref.proxy(self),))
self.__thread.start() #启动线程活动
self.__initialised.wait() #阻塞线程,直到Event.set(),由此可以看到线程SyncObj._autoTickThread会把线程不阻塞,
创建的线程执行的函数 _autoTickThread(self)
def _autoTickThread(self):
    """Run initialisation, then tick the object in a loop until shutdown.

    The method first calls ``__initInTickThread``.  A ``SyncObjException``
    whose ``errorCode`` is ``'BindError'`` ends the thread quietly; any other
    ``SyncObjException`` is re-raised.  In every case ``__initialised`` is
    set before leaving the first ``try`` block.

    After a short pause the loop calls ``_onTick`` with the configured
    ``autoTickPeriod`` until the main thread is no longer alive or
    ``__destroying`` is set (in which case ``_doDestroy`` is called first).
    A ``ReferenceError`` raised inside the loop ends it silently.
    """
    try:
        self.__initInTickThread()
    except SyncObjException as e:
        if e.errorCode == 'BindError':
            return
        raise
    finally:
        # Signal the event regardless of the outcome so that anything
        # waiting on it is released.
        self.__initialised.set()
    time.sleep(0.1)
    try:
        while True:
            if not self.__mainThread.is_alive():
                break
            if self.__destroying:
                self._doDestroy()
                break
            self._onTick(self.__conf.autoTickPeriod)
    except ReferenceError:
        # NOTE(review): presumably raised when ``self`` is a weak proxy whose
        # referent has been collected -- confirm against the thread creation.
        pass
self.__initialised.set()目的:保证先执行__initInTickThread,再调用self._onTick
__initInTickThread
def __initInTickThread(self):
    """Bind the server (if this node has an address) and create the peer nodes.

    On success the node list and the per-peer ``__raftNextIndex`` /
    ``__raftMatchIndex`` entries are (re)built, ``__needLoadDumpFile`` and
    ``__isInitialized`` are set to True and ``__bindedEvent`` is signalled.

    On failure the retry counter is incremented.  When ``maxBindRetries`` is
    non-zero and has been reached, ``__bindedEvent`` is signalled and
    ``SyncObjException('BindError')`` is raised; otherwise the failure is
    logged and the method returns normally.

    :raises SyncObjException: with ``'BindError'`` once the retry limit is hit
    """
    try:
        self.__lastInitTryTime = time.time()  # time of the latest attempt
        if self.__selfNodeAddr is not None:
            # Start listening on the TCP server port.
            self.__server.bind()
            shouldConnect = None
        else:
            shouldConnect = True
        self.__nodes = []
        for nodeAddr in self.__otherNodesAddrs:
            self.__nodes.append(Node(self, nodeAddr, shouldConnect))
            self.__raftNextIndex[nodeAddr] = self.__getCurrentLogIndex() + 1
            # Nothing is known to be replicated to this peer yet.
            self.__raftMatchIndex[nodeAddr] = 0
        self.__needLoadDumpFile = True
        self.__isInitialized = True
        self.__bindedEvent.set()
    except Exception:
        self.__bindRetries += 1
        if self.__conf.maxBindRetries and self.__bindRetries >= self.__conf.maxBindRetries:
            self.__bindedEvent.set()
            raise SyncObjException('BindError')
        logging.exception('failed to perform initialization')
开始对TCP服务端端口绑定
bind | tcp_server.py
def bind(self):
    """Create the listening socket, bind it and register it with the poller.

    The socket is configured with the stored send/receive buffer sizes,
    ``TCP_NODELAY`` and ``SO_REUSEADDR``, switched to non-blocking mode,
    bound to ``(host, port)`` and put into listening state.  Its file
    descriptor is then subscribed to the poller for READ and ERROR events
    with ``__onNewConnection`` as the handler, and the server state becomes
    ``SERVER_STATE.BINDED``.

    If any step fails the socket is closed before the exception propagates,
    so a failed attempt does not leave an open descriptor behind.
    """
    self.__socket = socket.socket(self.__hostAddrType, socket.SOCK_STREAM)
    try:
        self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.__sendBufferSize)
        self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.__recvBufferSize)
        # Disable Nagle's algorithm.
        self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Allow the local address/port to be reused.
        self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Non-blocking mode.
        self.__socket.setblocking(0)
        self.__socket.bind((self.__host, self.__port))
        self.__socket.listen(5)
        # File descriptor of the listening socket, used as the poller key.
        self.__fileno = self.__socket.fileno()
        logging.debug('listening socket fileno: %s', self.__fileno)
        self.__poller.subscribe(self.__fileno,
                                self.__onNewConnection,
                                POLL_EVENT_TYPE.READ | POLL_EVENT_TYPE.ERROR)
    except Exception:
        self.__socket.close()
        raise
    self.__state = SERVER_STATE.BINDED
主要:
- 创建TCP Socket套接字
- 修改套接字的选项
- 设置setblocking为0非阻塞模式,此时accept()不会阻塞等待,可以实现多任务
- 向poll注册事件,用于生成为客户端服务的套接字
def __onNewConnection | tcp_server.py
用于接收客户端的connect或者ERROR的时候解绑
conn = TcpConnection(poller=self.__poller,
socket=sock,
timeout=self.__connectionTimeout,
sendBufferSize=self.__sendBufferSize,
recvBufferSize=self.__recvBufferSize)
self.__onNewConnectionCallback(conn)
TcpConnection属于tcp_connection.py
self.__onNewConnectionCallback对应的函数是SyncObj里的__onNewConnection函数,用于处理新的连接。
此时并没有给出__onMessageReceived函数和__onConnected函数
__onNewConnection做了什么?
为客户端的这个服务,将其放到self.__unknownConnections[descr] = conn
同时绑定方法:SyncObj的__onMessageReceived和__onDisconnected
创建TcpConection对象中
设置:self.__state = CONNECTION_STATE.CONNECTED
向poll注册事件,一旦有接收消息或者发送消息事件的时候,调用__processConnection
__processConnection
if time.time() - self.__lastReadTime > self.__timeout:
self.disconnect()
return
判断:如果当前时间距离上次读取数据的时间已经超过了超时时间self.__timeout(此处为10秒),则不再处理数据,直接断开连接。
#初次接收消息的时候,self__state == CONNECTION_STATE.CONNECTING
if self.__state == CONNECTION_STATE.CONNECTING:
#初始的时候,还是等于None的
if self.__onConnected is not None:
self.__onConnected()
self.__state = CONNECTION_STATE.CONNECTED
self.__lastReadTime = time.time()
return
第一次收到消息,表示连接已经建立好
接收数据:
if eventType & POLL_EVENT_TYPE.READ:
self.__tryReadBuffer()
if self.__state == CONNECTION_STATE.DISCONNECTED:
return
while True:
message = self.__processParseMessage()
if message is None:
break
if self.__onMessageReceived is not None:
#把消息传到syncobj的消息接受函数中处理
self.__onMessageReceived(message)
if self.__state == CONNECTION_STATE.DISCONNECTED:
return
__tryReadBuffer():主要是等待__processRead返回TRUE时
def __processRead(self):
    """Read one chunk from the socket and append it to the receive buffer.

    :return: True when data was appended to ``__readBuffer``; False when
        nothing could be read.  On a socket error other than
        EAGAIN/EWOULDBLOCK, on a pending ``SO_ERROR`` or on an empty read the
        connection is disconnected before False is returned.
    """
    try:
        incoming = self.__socket.recv(self.__recvBufferSize)
    except socket.error as e:
        # EAGAIN / EWOULDBLOCK only mean "no data right now"; every other
        # error drops the connection.  Either way nothing was read.
        if e.errno not in (socket.errno.EAGAIN, socket.errno.EWOULDBLOCK):
            self.disconnect()
        return False
    if self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR):
        self.disconnect()
        return False
    if not incoming:
        # recv() returned no bytes.
        self.disconnect()
        return False
    self.__readBuffer += incoming
    return True
message = self.__processParseMessage()开始解析数据:
def __processParseMessage(self):
    """Extract and decode the next length-prefixed message from the buffer.

    The frame layout read here is a 4-byte native ``int`` length followed by
    that many payload bytes.  The payload is optionally decrypted with
    ``self.encryptor``, zlib-decompressed and unpickled.  When
    ``self.recvRandKey`` is set, the decoded object must be a
    ``(randKey, message)`` pair whose key matches.

    :return: the decoded message, or None when the buffer does not yet hold
        a complete frame or when the frame was rejected (in which case the
        connection is disconnected).
    """
    if len(self.__readBuffer) < 4:
        return None
    # Declared payload length.
    length = struct.unpack('i', self.__readBuffer[:4])[0]
    if length < 0:
        # A negative length can never describe a valid frame.
        self.disconnect()
        return None
    if len(self.__readBuffer) - 4 < length:
        return None
    data = self.__readBuffer[4:4 + length]
    try:
        if self.encryptor:
            data = self.encryptor.decrypt(data)
        # SECURITY: pickle.loads on data received from the network can run
        # arbitrary code; only safe when every peer is trusted.
        message = pickle.loads(zlib.decompress(data))
        if self.recvRandKey:
            randKey, message = message
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if randKey != self.recvRandKey:
                raise ValueError('unexpected random key')
    except Exception:
        self.disconnect()
        return None
    self.__readBuffer = self.__readBuffer[4 + length:]
    return message
提取数据部分并返回!
调用SyncObj传来的接收消息处理函数__onMessageReceived处理message
__onMessageReceived
# message不是应该是字符串吗?怎么是list?
if isinstance(message, list) and self.__onUtilityMessage(conn, message):
self.__unknownConnections.pop(descr, None)
return
# 消息只包含节点地址
partnerNode = None
for node in self.__nodes:
if node.getAddress() == message:
partnerNode = node
break
if partnerNode is None and message != 'readonly':
conn.disconnect()
self.__unknownConnections.pop(descr, None)
return
# 处理传来集群其他节点IP地址的消息
if partnerNode is not None:
partnerNode.onPartnerConnected(conn)
else:
nodeAddr = str(self.__readonlyNodesCounter)
node = Node(self, nodeAddr, shouldConnect=False)
node.onPartnerConnected(conn)
self.__readonlyNodes.append(node)
self.__raftNextIndex[nodeAddr] = self.__getCurrentLogIndex() + 1
self.__raftMatchIndex[nodeAddr] = 0
self.__readonlyNodesCounter += 1
self.__unknownConnections.pop(descr, None)
__onUtilityMessage
判断消息是否有效,无效时返回FALSE
#消息类型
if message[0] == 'status':
conn.send(self.getStatus())
return True
elif message[0] == 'add':
# 默认没开启动态添加节点到集群的功能
self.addNodeToCluster(message[1],
callback=functools.partial(self.__utilityCallback, conn=conn, cmd='ADD',
node=message[1]))
return True
elif message[0] == 'remove':
if message[1] == self.__selfNodeAddr:
conn.send('FAIL REMOVE ' + message[1])
else:
# 默认没开启动态添加节点到集群的功能
self.removeNodeFromCluster(message[1], callback=functools.partial(self.__utilityCallback, conn=conn,
cmd='REMOVE', node=message[1]))
return True
elif message[0] == 'set_version':
# 切换到所有集群节点上的新代码版本
self.setCodeVersion(message[1],
callback=functools.partial(self.__utilityCallback, conn=conn, cmd='SET_VERSION',
node=str(message[1])))
return True
也就是说,如果消息是请求本节点的状态或者切换新代码的版本,此时就返回True
切换到集群节点的新版本
def setCodeVersion(self, newVersion, callback=None):
    """Switch to a new code version on all cluster nodes.

    You should make sure the cluster nodes have been updated, otherwise
    they will not be able to apply commands.

    :param newVersion: new code version
    :type int
    :param callback: will be called on success or failure
    :type callback: function(`FAIL_REASON <#pysyncobj.FAIL_REASON>`_, None)
    :raises Exception: if ``newVersion`` is higher than this node's own code
        version or lower than the currently enabled version
    """
    assert isinstance(newVersion, int)
    if newVersion > self.__selfCodeVersion:
        raise Exception(
            'wrong version, current version is %d, requested version is %d' % (self.__selfCodeVersion, newVersion))
    if newVersion < self.__enabledCodeVersion:
        raise Exception('wrong version, enabled version is %d, requested version is %d' % (
            self.__enabledCodeVersion, newVersion))
    # The version switch is submitted like any other command, tagged as VERSION.
    self._applyCommand(pickle.dumps(newVersion), callback, _COMMAND_TYPE.VERSION)
_applyCommand
def _applyCommand(self, command, callback, commandType=None):
    """Put a command on the commands queue without blocking.

    :param command: command payload
    :param callback: queued together with the command; handed to the error
        callback with ``FAIL_REASON.QUEUE_FULL`` if the queue is full
    :param commandType: optional type marker; when given,
        ``_bchr(commandType)`` is prepended to ``command``
    """
    try:
        if commandType is None:
            self.__commandsQueue.put_nowait((command, callback))
        else:
            # Enqueue with the type marker in front of the payload.
            self.__commandsQueue.put_nowait((_bchr(commandType) + command, callback))
        # The pipe notifier is only triggered when batching of append
        # entries is turned off and pipe notification is enabled.
        if not self.__conf.appendEntriesUseBatch and PIPE_NOTIFIER_ENABLED:
            self.__pipeNotifier.notify()
    except Queue.Full:
        self.__callErrCallback(FAIL_REASON.QUEUE_FULL, callback)
node 的 onPartnerConnected方法
添加方法:
conn.setOnMessageReceivedCallback(self.__onMessageReceived)
conn.setOnDisconnectedCallback(self.__onDisconnected)
self.__status = NODE_STATUS.CONNECTED
回到初始化__initInTickThread方法中
以上已经把TCP服务器建设好,以及新的连接过来怎么处理的函数都分析完了
接下来是节点对象的创建
for nodeAddr in self.__otherNodesAddrs:
self.__nodes.append(Node(self, nodeAddr, shouldConnect))
self.__raftNextIndex[nodeAddr] = self.__getCurrentLogIndex() + 1
self.__raftMatchIndex[nodeAddr] = 0
Node初始化里的判断shouldConnect判断有点奇怪!!
IP地址大的向小的发送TCP请求
if self.__shouldConnect:
self.__ip = globalDnsResolver().resolve(nodeAddr.rsplit(':', 1)[0])
self.__port = int(nodeAddr.rsplit(':', 1)[1])
# 新建TCP连接的对象,主次此处传入的是self的__onMessageReceived,onDisconnected
self.__conn = TcpConnection(poller=syncObj._poller,
onConnected=self.__onConnected,
onMessageReceived=self.__onMessageReceived,
onDisconnected=self.__onDisconnected,
timeout=syncObj._getConf().connectionTimeout,
sendBufferSize=syncObj._getConf().sendBufferSize,
recvBufferSize=syncObj._getConf().recvBufferSize)
self.__conn.encryptor = self.__encryptor
此时创建的TCPConnection的对象,没有socket,
此时跳转到onTick函数里
for node in self.__nodes:
node.connectIfRequired() #与其他节点均建立连接
再次看到node.py中的connectIfRequired:
def connectIfRequired(self):
    """Start an outgoing connection attempt if one is due.

    Nothing happens when this node is not supposed to connect, when it is
    not currently in the DISCONNECTED state, or when the previous attempt
    was made less than ``connectionRetryTime`` ago.  Otherwise the status
    becomes CONNECTING, the attempt time is recorded and ``connect`` is
    called on the connection; if that call reports failure the status goes
    back to DISCONNECTED.
    """
    if not self.__shouldConnect:
        return
    # Only a disconnected node starts a new attempt.
    if self.__status != NODE_STATUS.DISCONNECTED:
        return
    if time.time() - self.__lastConnectAttemptTime < self.__syncObj()._getConf().connectionRetryTime:
        return
    # Begin the connection attempt.
    self.__status = NODE_STATUS.CONNECTING
    self.__lastConnectAttemptTime = time.time()
    if not self.__conn.connect(self.__ip, self.__port):
        self.__status = NODE_STATUS.DISCONNECTED
connect方法中,连接到比本节点IP地址更小的IP的节点
try:
self.__socket.connect((host, port))
logging.debug('__socket.connect')
连接完后,更新self.__fileno,并将状态改为CONNECTING,
注册收发处理函数__processConnection
__processConnection和之前的那个是差不多的,不过此处的__onMessageReceived已经不一样了,此处用的是Syncobj里的_onMessageReceived
_onMessageReceived | Syncobj.py
消息类型是request_vote
如果消息的任期大于本节点的任期:
- 更改任期为接收到的任期
- 设置__votedFor=None
- 设置本节点状态为FOLLOWER
- 收到这个消息,说明是没有Leader,设置self.__raftLeader = None
本节点如果是FOLLOWER或者是候选者:
获取发送消息的候选人的最后日志的任期和最后日志的索引
遇到以下情况直接返回:
- 候选人比本节点的任期小
- 候选人与本节点的任期相同,但是索引比本节点小
- 任期如果和本节点相同,本节点已经为其他节点投票,
否则,为该节点投票,发送回复消息:response_vote
消息类型是append_entries,同时消息的任期大于等于本节点的任期
因为只有Leader节点会发这个消息,首先判断发送这个消息的节点是不是Leader,如果不是,设置它为Leader节点
再对消息的任期判断:
if message['term'] > self.__raftCurrentTerm:
# 更新最新的任期
self.__raftCurrentTerm = message['term']
# 收到附加消息,表示已经有Leader了,不需要投票了,清空
self.__votedFor = None
# 设置本节点为FOLLOWER,主要是在候选者身份竞争Leader失败时,变回FOLLOWER
self.__setState(_RAFT_STATE.FOLLOWER)
提取message里的条目,
# 提取消息内容
newEntries = message.get('entries', [])
serialized = message.get('serialized', None)
# Leader已经提交的日志的索引值,更新本节点记录的信息
self.__leaderCommitIndex = leaderCommitIndex = message['commit_index']
常规的附加消息的处理:
if transmission is not None:
if transmission == 'start':
self.__recvTransmission = message['data']
self.__sendNextNodeIdx(nodeAddr, success=False, reset=False)
return
elif transmission == 'process':
self.__recvTransmission += message['data']
self.__sendNextNodeIdx(nodeAddr, success=False, reset=False)
return
elif transmission == 'finish':
self.__recvTransmission += message['data']
#本此数据接收完了,字节转换回对象
newEntries = [pickle.loads(self.__recvTransmission)]
self.__recvTransmission = ''
else:
raise Exception('Wrong transmission type')
prevLogIdx = message['prevLogIdx']
prevLogTerm = message['prevLogTerm']
# 拿出新的日志条目
prevEntries = self.__getEntries(prevLogIdx)
# 如果本节点没有Leader发送的附加消息的日志条目,请求对方发送
if not prevEntries:
self.__sendNextNodeIdx(nodeAddr, success=False, reset=True)
return
# 日志的条目对应的任期不一致,请求再发?
if prevEntries[0][2] != prevLogTerm:
self.__sendNextNodeIdx(nodeAddr, nextNodeIdx=prevLogIdx, success=False, reset=True)
return
# 有多个日志条目
if len(prevEntries) > 1:
# rollback cluster changes
# 先不看,默认没开启
if self.__conf.dynamicMembershipChange:
for entry in reversed(prevEntries[1:]):
clusterChangeRequest = self.__parseChangeClusterRequest(entry[0])
if clusterChangeRequest is not None:
self.__doChangeCluster(clusterChangeRequest, reverse=True)
# 有多个说明与当前Leader节点有冲突,这些日志条目要删除
self.__deleteEntriesFrom(prevLogIdx + 1)
# 添加新的条目到节点日志里,newEntries是从Leader传来的message中提取的
for entry in newEntries:
self.__raftLog.add(*entry)
# apply cluster changes
# 暂时不管
if self.__conf.dynamicMembershipChange:
for entry in newEntries:
clusterChangeRequest = self.__parseChangeClusterRequest(entry[0])
if clusterChangeRequest is not None:
self.__doChangeCluster(clusterChangeRequest)
nextNodeIdx = prevLogIdx + 1
if newEntries:
# 主要是针对多个条目的情况,取最新的索引,为什么不加1呢?
nextNodeIdx = newEntries[-1][1]
# 附加消息添加成功,返回下次节点接收的索引
self.__sendNextNodeIdx(nodeAddr, nextNodeIdx=nextNodeIdx, success=True)
快照部分先不管
self.__raftCommitIndex = min(leaderCommitIndex, self.__getCurrentLogIndex())
通过最新的日志条目的索引和Leader的索引来比较,得到最近已经提交的条目索引
消息类型为:apply_command
if message['type'] == 'apply_command':
if 'request_id' in message:
self._applyCommand(message['command'], (nodeAddr, message['request_id']))
else:
self._applyCommand(message['command'], None)
调用了_applyCommand
def _applyCommand(self, command, callback, commandType=None):
    """Enqueue a command on the commands queue; never blocks.

    :param command: command payload
    :param callback: stored next to the command in the queue; passed to the
        error callback with ``FAIL_REASON.QUEUE_FULL`` when the queue is full
    :param commandType: optional type marker; if present,
        ``_bchr(commandType)`` is put in front of ``command``
    """
    try:
        if commandType is None:
            self.__commandsQueue.put_nowait((command, callback))
        else:
            self.__commandsQueue.put_nowait((_bchr(commandType) + command, callback))
        # Notify through the pipe only when batching is off and pipe
        # notification is enabled.
        if not self.__conf.appendEntriesUseBatch and PIPE_NOTIFIER_ENABLED:
            self.__pipeNotifier.notify()
    except Queue.Full:
        self.__callErrCallback(FAIL_REASON.QUEUE_FULL, callback)
把命令放到队列之中
消息类型:apply_command_response
if message['type'] == 'apply_command_response':
requestID = message['request_id']
error = message.get('error', None)
callback = self.__commandsWaitingReply.pop(requestID, None)
if callback is not None:
if error is not None:
callback(None, error)
else:
idx = message['log_idx']
term = message['log_term']
assert idx > self.__raftLastApplied
self.__commandsWaitingCommit[idx].append((term, callback))
消息类型:nextnode_idx
if self.__raftState == _RAFT_STATE.LEADER:
if message['type'] == 'next_node_idx':
reset = message['reset']
nextNodeIdx = message['next_node_idx']
success = message['success']
currentNodeIdx = nextNodeIdx - 1
if reset:
# 对于每一个服务器,需要发送给他的下一个日志条目的索引值
self.__raftNextIndex[nodeAddr] = nextNodeIdx
if success:
# 对于每一个服务器,已经复制给他的日志的最高索引值
self.__raftMatchIndex[nodeAddr] = currentNodeIdx
self.__lastResponseTime[nodeAddr] = time.time()
_onTick 最为复杂的函数
- 首先先检查是否初始化过
- 查看是否需要加载文件,默认是不需要的
判断是否需要重新选举
#初始的时候状态时FOLLOWER
if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE)
and self.__selfNodeAddr is not None:
#第一个条件表示:超时没收到Leader的心跳包,超时——需要重新选举;
#第二个条件判断是否和其他节点有连接,第一次运行的时候是没有的
if self.__raftElectionDeadline < time.time() and self.__connectedToAnyone():
#设置新的选举死亡线,准备重新选举 ,__generateRaftTimeout()
self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout()
#表明现在集群中没有Leader
self.__raftLeader = None
#设置本节点的新的身份(候选者
#此时self.__raftState=_RAFT_STATE.CANDIDATE
self.__setState(_RAFT_STATE.CANDIDATE)
#进入到下一个term ,CurrentTerm 表示的是本节点的任期
self.__raftCurrentTerm += 1
#先为自己投票
self.__votedFor = self._getSelfNodeAddr()
self.__votesCount = 1
#依次给其他的节点发送投票请求 ,刚开始和其他节点还没有连接的时候发布出去
for node in self.__nodes:
node.send({
'type': 'request_vote',
'term': self.__raftCurrentTerm,
'last_log_index': self.__getCurrentLogIndex(),
'last_log_term': self.__getCurrentLogTerm(),
})
self.__onLeaderChanged()
if self.__votesCount > (len(self.__nodes) + 1) / 2:
#票数为所有节点的一半以上,成为Leader
self.__onBecomeLeader()
符合服务器设计所遵守的原则:如果超过选举超时时间都没有接收到领导人的心跳,也没有收到候选人的投票请求,自己就变为候选人
self.__onLeaderChanged()
def __onLeaderChanged(self):
    """Fail every command still waiting for a reply, then clear the table.

    Each pending callback is invoked, in ascending key order, with
    ``(None, FAIL_REASON.LEADER_CHANGED)``.
    """
    for requestID in sorted(self.__commandsWaitingReply):
        self.__commandsWaitingReply[requestID](None, FAIL_REASON.LEADER_CHANGED)
    self.__commandsWaitingReply = {}
如果本节点是Leader的处理
if self.__raftState == _RAFT_STATE.LEADER:
# 表明现在还有部分的日志条目未提交
while self.__raftCommitIndex < self.__getCurrentLogIndex():
nextCommitIndex = self.__raftCommitIndex + 1
count = 1
for node in self.__nodes:
if self.__raftMatchIndex[node.getAddress()] >= nextCommitIndex:
count += 1
# 表示超过半数的节点以及复制了该条目,更新raftCommitIndex的值
if count > (len(self.__nodes) + 1) / 2:
self.__raftCommitIndex = nextCommitIndex
else:
break
# 表示所有的日志条目已经被提交
self.__leaderCommitIndex = self.__raftCommitIndex
deadline = time.time() - self.__conf.leaderFallbackTimeout
count = 1
for node in self.__nodes:
if self.__lastResponseTime[node.getAddress()] > deadline:
count += 1
#判断是否Leader与超过一般以上的节点均超时,如果是则变为Follower,重新选举
if count <= (len(self.__nodes) + 1) / 2:
self.__setState(_RAFT_STATE.FOLLOWER)
self.__raftLeader = None
已经复制的条目中没有应用到状态机的条目的处理
if self.__raftCommitIndex > self.__raftLastApplied:
count = self.__raftCommitIndex - self.__raftLastApplied
#返已经复制的条目中没有应用到状态机的条目
entries = self.__getEntries(self.__raftLastApplied + 1, count)
#entry中含有
for entry in entries:
try:
# 取出条目的任期
currentTermID = entry[2]
# __commandsWaitingCommit取出对应索引的元素
subscribers = self.__commandsWaitingCommit.pop(entry[1], [])
# 应用命令,跳转到下一个命令可以看__doApplyCommand分析
# 取出条目中对应的需要复制的方法
res = self.__doApplyCommand(entry[0])
for subscribeTermID, callback in subscribers:
if subscribeTermID == currentTermID:
# 调用对应的方法
callback(res, FAIL_REASON.SUCCESS)
else:
callback(None, FAIL_REASON.DISCARDED)
#每次应用到状态机的日志都会+1
self.__raftLastApplied += 1
except SyncObjExceptionWrongVer as e:
logging.error(
'request to switch to unsupported code version (self version: %d, requested version: %d)' %
(self.__selfCodeVersion, e.ver))
#appendEntriesUseBatch默认为TRUE
if not self.__conf.appendEntriesUseBatch:
needSendAppendEntries = True
从__commandsWaitingCommit中取出事件,
将命令应用状态机
__doApplyCommand()
def __doApplyCommand(self, command):
    """Apply one committed log command.

    The first byte of ``command`` is the command type; the remainder is a
    pickled payload.

    * VERSION: the payload is the requested code version.  It becomes the
      enabled version, ``__onSetCodeVersion`` is called with it and the
      optional ``onCodeVersionChanged`` callback receives
      ``(oldVersion, newVersion)``.  Returns None.
    * REGULAR: the payload is a method id, an ``(id, args)`` pair or an
      ``(id, args, kwargs)`` triple.  The method registered under that id
      is called with ``_doApply=True`` added to the keyword arguments and
      its result is returned.
    * any other type: ignored, returns None.

    :param command: raw command bytes taken from a log entry
    :raises SyncObjExceptionWrongVer: when a VERSION command asks for a
        version higher than this node's own code version
    """
    commandType = ord(command[:1])
    # Skip no-op and membership change commands
    if commandType == _COMMAND_TYPE.VERSION:
        # Decode the requested version from the payload.
        ver = pickle.loads(command[1:])
        # The requested version is higher than anything this node supports.
        if self.__selfCodeVersion < ver:
            raise SyncObjExceptionWrongVer(ver)
        oldVer = self.__enabledCodeVersion
        self.__enabledCodeVersion = ver
        # Configured callback for a code version switch; may be None.
        callback = self.__conf.onCodeVersionChanged
        # Recompute which versioned method names are in use for ``ver``.
        self.__onSetCodeVersion(ver)
        if callback is not None:
            callback(oldVer, ver)
        return
    if commandType != _COMMAND_TYPE.REGULAR:
        return
    # REGULAR command: decode the payload into an object.
    command = pickle.loads(command[1:])
    args = []
    kwargs = {
        '_doApply': True,
    }
    if not isinstance(command, tuple):
        # Bare method id, no arguments.
        funcID = command
    elif len(command) == 2:
        # Method id plus positional arguments.
        funcID, args = command
    else:
        # Method id, positional arguments and keyword arguments.
        funcID, args, newKwArgs = command
        kwargs.update(newKwArgs)
    # Call the registered method with the decoded arguments.
    return self._idToMethod[funcID](*args, **kwargs)
主要是通过entry[0]取出对应的需要复制的方法,
定期发送心跳包
if self.__raftState == _RAFT_STATE.LEADER:
#needSendAppendEntries表示需要发送心跳包了,
if time.time() > self.__newAppendEntriesTime
or needSendAppendEntries:
self.__sendAppendEntries()
选举成功或者周期到了,需要发送心跳包
onReady处理
# callback when hock SyncObj
# 初始化Syncobj时,__onReadyCalled为False
# self.__raftLastApplied == self.__leaderCommitIndex此时表示所有日志条目已经被复制到集群节点
if not self.__onReadyCalled and self.__raftLastApplied == self.__leaderCommitIndex:
#只要SyncObj同步来自leader的所有数据,就会调用此回调。
# onReady,默认为None
if self.__conf.onReady:
self.__conf.onReady()
# 此时变化为True
self.__onReadyCalled = True
暂时不管,因为没有onReady()
self._checkCommandsToApply()
与其他节点建立好连接
for node in self.__nodes:
node.connectIfRequired()
此部分,上面内容已经分析过
每秒执行的任务
if time.time() > self.__lastReadonlyCheck + 1.0:
self.__lastReadonlyCheck = time.time()
newReadonlyNodes = []
for node in self.__readonlyNodes:
if node.isConnected():
newReadonlyNodes.append(node)
else:
self.__raftNextIndex.pop(node, None)
self.__raftMatchIndex.pop(node, None)
node._destroy()
先不管了,此部分是对只读节点的处理
网友评论