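Summary
This section walks through the QuorumPeer source code. There is little material on it online, and I'm not sure how best to translate the name.
QuorumPeer mainly records all kinds of information about the current server. It is also a thread that keeps running leader election, alternating between the two phases of crash recovery and atomic broadcast.
This section is long. It mainly covers:
Inner classes
ServerState: the state the current server is in
LearnerType: how the server takes part in voting and election
ResponderThread: the class used by the old UDP-based election
QuorumServer: for each sid, the machine's address, the server-to-server communication port, and the server-to-server leader election port
Fields
Methods
Constructor
Getters and setters
Startup-related methods
Election-state related
Epoch persistence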
Inner classes
The inner classes of QuorumPeer are briefly introduced below.
ServerState
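Covered in the previous section and repeated here: the four states a server can be in.
LOOKING: looking for a leader. In this state the server believes the cluster currently has no Leader, so it must enter leader election.
FOLLOWING: follower state. The server's current role is Follower.
LEADING: leader state. The server's current role is Leader.
OBSERVING: observer state. The server's current role is Observer.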
LearnerType
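Marks whether the server can take part in voting and leader election.
PARTICIPANT: can vote in the consensus protocol and take part in leader election
OBSERVER: can only observe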
ResponderThread
Not covered here. It belongs to the UDP-based leader election used before 3.4.0, which is now deprecated.
QuorumServer
This class records the main information about each server in the configuration.
Constructor
public QuorumServer(long id, String hostname,
Integer port, Integer electionPort,
LearnerType type) {
this.id = id;// serverId (sid) of the machine
this.hostname=hostname;// host
if (port!=null){
this.port=port;// port the leader and learners use to communicate after the election
}
if (electionPort!=null){
this.electionPort=electionPort;// port each server uses for leader election, before a leader exists
}
if (type!=null){
this.type = type;// server type: PARTICIPANT or OBSERVER
}
this.recreateSocketAddresses();
}
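The main method is recreateSocketAddresses, which builds the InetSocketAddress objects (it resolves addresses; it does not open sockets).
public void recreateSocketAddresses() {// resolve the hostname and create the InetSocketAddress(es)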
InetAddress address = null;
try {
address = InetAddress.getByName(this.hostname);
LOG.info("Resolved hostname: {} to address: {}", this.hostname, address);
this.addr = new InetSocketAddress(address, this.port);
if (this.electionPort > 0){
this.electionAddr = new InetSocketAddress(address, this.electionPort);
}
} catch (UnknownHostException ex) {
LOG.warn("Failed to resolve address: {}", this.hostname, ex);
// Have we succeeded in the past?
if (this.addr != null) {
// Yes, previously the lookup succeeded. Leave things as they are
return;
}
// The hostname has never resolved. Create our InetSocketAddress(es) as unresolved
this.addr = InetSocketAddress.createUnresolved(this.hostname, this.port);
if (this.electionPort > 0){
this.electionAddr = InetSocketAddress.createUnresolved(this.hostname,
this.electionPort);
}
}
}
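The resolve-or-fall-back logic above can be exercised in isolation. The sketch below is not ZooKeeper code: `AddressHolder` and the `HostResolver` interface are hypothetical, and `HostResolver` stands in for `InetAddress.getByName` so the fallback paths can be tested without real DNS. It keeps the same three outcomes: resolve successfully, keep an earlier successful result, or create unresolved addresses.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical hook replacing InetAddress.getByName, so tests need no DNS.
interface HostResolver {
    InetAddress resolve(String host) throws UnknownHostException;
}

// Hypothetical stand-in for the address part of QuorumServer.
class AddressHolder {
    final String hostname;
    final int port;
    final int electionPort; // <= 0 means no election port configured
    InetSocketAddress addr;
    InetSocketAddress electionAddr;

    AddressHolder(String hostname, int port, int electionPort) {
        this.hostname = hostname;
        this.port = port;
        this.electionPort = electionPort;
    }

    void recreate(HostResolver resolver) {
        try {
            InetAddress address = resolver.resolve(hostname);
            addr = new InetSocketAddress(address, port);
            if (electionPort > 0) {
                electionAddr = new InetSocketAddress(address, electionPort);
            }
        } catch (UnknownHostException ex) {
            if (addr != null) {
                return; // an earlier call already set addr: keep what we have
            }
            // never resolved: fall back to unresolved addresses
            addr = InetSocketAddress.createUnresolved(hostname, port);
            if (electionPort > 0) {
                electionAddr = InetSocketAddress.createUnresolved(hostname, electionPort);
            }
        }
    }
}
```

Creating unresolved addresses instead of failing lets the peer be constructed even when a hostname is not yet resolvable; a later successful call replaces them.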
Fields
There are a great many fields. The main ones (excluding logging, JMX, the legacy ResponderThread, etc.) are listed below:
Field | Meaning | Default |
---|---|---|
QuorumCnxManager qcm; | Network I/O manager used during leader election | |
ZKDatabase zkDb; | Database object | |
long OBSERVER_ID; | Identifies an observer | Long.MAX_VALUE; |
long start_fle, end_fle; | Start and end time of fast leader election | |
LearnerType learnerType | How this server participates | LearnerType.PARTICIPANT; (participant by default) |
Map<Long, QuorumServer> quorumPeers; | The server for each sid | |
QuorumVerifier quorumConfig; | Quorum verifier (usually majority-based) | |
long myid; | This machine's myid (sid) | |
Vote currentVote; | The leader settled on by a round of voting | |
Vote bcVote; | Vote kept for backward compatibility; also represents the chosen leader | |
boolean running | Whether to keep looping through leader election and atomic broadcast | true; |
int tickTime | Same as defined in QuorumPeerConfig | |
int minSessionTimeout | Same as defined in QuorumPeerConfig | |
int maxSessionTimeout | Same as defined in QuorumPeerConfig | |
int initLimit,syncLimit | Same as defined in QuorumPeerConfig | |
boolean syncEnabled | Same as defined in QuorumPeerConfig | true; |
boolean quorumListenOnAllIPs | Same as defined in QuorumPeerConfig | false; |
int tick; | Number of tickTime periods elapsed so far | |
ServerState state | Current server state; starts as LOOKING | ServerState.LOOKING; |
InetSocketAddress myQuorumAddr; | This machine's quorum address | |
int electionType; | Election algorithm number; selects the electionAlg | |
Election electionAlg; | Election algorithm | |
ServerCnxnFactory cnxnFactory; | Handles I/O with clients | |
FileTxnSnapLog logFactory | Transaction log and snapshot storage | |
QuorumStats quorumStats | Statistics object | |
Follower follower; | The Follower object while in FOLLOWING state | |
Leader leader; | The Leader object while in LEADING state | |
Observer observer; | The Observer object while in OBSERVING state | |
String SYNC_ENABLED | Name of the property that controls whether observers sync | "zookeeper.observer.syncEnabled"; |
long acceptedEpoch | The epoch accepted or about to be adopted; may still change, and becomes currentEpoch once settled | -1; |
long currentEpoch | The current epoch; stable | -1; |
String CURRENT_EPOCH_FILENAME | File name for currentEpoch | "currentEpoch"; |
String ACCEPTED_EPOCH_FILENAME | File name for acceptedEpoch | "acceptedEpoch"; |
String UPDATING_EPOCH_FILENAME | File name for updatingEpoch | "updatingEpoch"; |
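Note: the meanings given above for acceptedEpoch and currentEpoch are my own interpretation; the code has no comments explaining them.
Methods
There are also a great many methods. Rather than screenshotting them all, I list the important ones.
Constructor
There are several, but only one is actually called.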
public QuorumPeer() {
super("QuorumPeer");// thread name
quorumStats = new QuorumStats(this);// quorumStats is not actually used anywhere
}
Getters and setters
Only the important ones are listed.
getView
// Returns the current view of the whole cluster: every server's id and its configuration
public Map<Long,QuorumPeer.QuorumServer> getView() {
return Collections.unmodifiableMap(this.quorumPeers);
}
getVotingView
// Returns the view of all servers in the cluster that are participants (voters)
public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
Map<Long,QuorumPeer.QuorumServer> ret =
new HashMap<Long, QuorumPeer.QuorumServer>();
Map<Long,QuorumPeer.QuorumServer> view = getView();
for (QuorumServer server : view.values()) {
if (server.type == LearnerType.PARTICIPANT) {
ret.put(server.id, server);
}
}
return ret;
}
countParticipants
// Counts the PARTICIPANT servers in the map; used to build the quorum verifier
protected static int countParticipants(Map<Long,QuorumServer> peers) {
int count = 0;
for (QuorumServer q : peers.values()) {
if (q.type == LearnerType.PARTICIPANT) {
count++;
}
}
return count;
}
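`getVotingView` and `countParticipants` both boil down to filtering by `LearnerType`. The sketch below, with hypothetical names, repeats that filter and adds a simple "more than half" check as an assumed illustration of the majority verifier mentioned in the field table. Observers are excluded from the count, so they never change the quorum size.

```java
import java.util.HashMap;
import java.util.Map;

class VotingViewSketch {
    enum LearnerType { PARTICIPANT, OBSERVER }

    // Minimal stand-in for QuorumPeer.QuorumServer: only id and type.
    static final class Server {
        final long id;
        final LearnerType type;
        Server(long id, LearnerType type) { this.id = id; this.type = type; }
    }

    // Same filtering as getVotingView: keep only PARTICIPANT servers.
    static Map<Long, Server> votingView(Map<Long, Server> view) {
        Map<Long, Server> ret = new HashMap<>();
        for (Server s : view.values()) {
            if (s.type == LearnerType.PARTICIPANT) {
                ret.put(s.id, s);
            }
        }
        return ret;
    }

    // Assumed majority rule: more than half of the participants must agree.
    static boolean isQuorum(int ackCount, int participants) {
        return ackCount > participants / 2;
    }
}
```

With 3 participants and 2 observers, 2 acknowledgements already form a quorum; with 4 participants, 3 are needed.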
getLastLoggedZxid
Returns the last processed zxid, loading the database first if it has not been initialized yet.
public long getLastLoggedZxid() {
if (!zkDb.isInitialized()) {
loadDataBase();
}
return zkDb.getDataTreeLastProcessedZxid();
}
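A zxid packs the epoch into its high 32 bits and a per-epoch counter into its low 32 bits; `loadDataBase` below relies on this when it calls `ZxidUtils.getEpochFromZxid`. The helper names here are hypothetical; to my understanding `ZxidUtils` performs the same shifts.

```java
class ZxidSketch {
    // high 32 bits: epoch; low 32 bits: counter within that epoch
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }
}
```

For example, `makeZxid(5, 42)` is `0x50000002a`: epoch 5, counter 42.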
Startup-related methods
start
This method is the startup entry point.
public synchronized void start() {
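loadDataBase();// recover the DataTree from the transaction log dir (dataLogDir) and the snapshot dir (dataDir)
cnxnFactory.start();// open the client port and start the ServerCnxnFactory main thread
startLeaderElection();// create the election algorithm
super.start();// start the QuorumPeer thread, whose loop checks the server state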
}
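loadDataBase
I haven't fully understood this method yet, mainly because I'm not sure how currentEpoch, acceptedEpoch and updatingEpoch differ.
The code is pasted below; I may come back to it later.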
private void loadDataBase() {
File updating = new File(getTxnFactory().getSnapDir(),
UPDATING_EPOCH_FILENAME);
try {
zkDb.loadDataBase();
// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
if (epochOfZxid > currentEpoch && updating.exists()) {
LOG.info("{} found. The server was terminated after " +
"taking a snapshot but before updating current " +
"epoch. Setting current epoch to {}.",
UPDATING_EPOCH_FILENAME, epochOfZxid);
setCurrentEpoch(epochOfZxid);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
}
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(CURRENT_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(ACCEPTED_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
}
} catch(IOException ie) {
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
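Reading the code, the checks reduce to the invariant epochOfZxid <= currentEpoch <= acceptedEpoch. updatingEpoch appears to mark a crash between taking a snapshot and writing currentEpoch: only when that file exists is currentEpoch allowed to be raised to the zxid's epoch. This is my interpretation, not official documentation. The pure function below is a hypothetical restatement of that decision logic; null stands for a missing file, and `IllegalStateException` replaces the original `IOException`.

```java
class EpochCheckSketch {
    static final class Epochs {
        final long current, accepted;
        Epochs(long current, long accepted) { this.current = current; this.accepted = accepted; }
    }

    // currentOnDisk / acceptedOnDisk: null means "file not found"
    static Epochs reconcile(long lastProcessedZxid, Long currentOnDisk,
                            Long acceptedOnDisk, boolean updatingExists) {
        long epochOfZxid = lastProcessedZxid >> 32L;
        long current;
        if (currentOnDisk == null) {
            current = epochOfZxid; // upgrade path: default to the zxid's epoch
        } else {
            current = currentOnDisk;
            if (epochOfZxid > current && updatingExists) {
                // crashed after the snapshot but before currentEpoch was written
                current = epochOfZxid;
            }
        }
        if (epochOfZxid > current) {
            throw new IllegalStateException("current epoch is older than the last zxid");
        }
        long accepted = (acceptedOnDisk == null) ? epochOfZxid : acceptedOnDisk;
        if (accepted < current) {
            throw new IllegalStateException("accepted epoch is less than current epoch");
        }
        return new Epochs(current, accepted);
    }
}
```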
startLeaderElection
Creates the election algorithm.
synchronized public void startLeaderElection() {// initialize our own vote and the election algorithm
try {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());// initial vote: propose ourselves as leader
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
if (electionType == 0) {// not entered with the default electionAlg (3); 0 is the deprecated UDP election
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);// create the election algorithm for this electionType
}
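The two checks at the start of startLeaderElection can be restated compactly: the initial vote proposes this server itself, and the server's own sid must appear in the view. In this hypothetical sketch `Vote` is a minimal stand-in with the same three values passed in the original, and the view is a map keyed by sid, so a direct lookup replaces the loop over `getView().values()` (equivalent as long as each key equals the server's id).

```java
import java.net.InetSocketAddress;
import java.util.Map;

class InitialVoteSketch {
    // Minimal stand-in for Vote: proposed leader id, zxid, peer epoch
    static final class Vote {
        final long id, zxid, peerEpoch;
        Vote(long id, long zxid, long peerEpoch) {
            this.id = id; this.zxid = zxid; this.peerEpoch = peerEpoch;
        }
    }

    // Every server starts by voting for itself.
    static Vote initialVote(long myid, long lastLoggedZxid, long currentEpoch) {
        return new Vote(myid, lastLoggedZxid, currentEpoch);
    }

    // Same failure as startLeaderElection when our sid is missing from the view.
    static InetSocketAddress myQuorumAddr(long myid, Map<Long, InetSocketAddress> view) {
        InetSocketAddress addr = view.get(myid);
        if (addr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        return addr;
    }
}
```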
createElectionAlgorithm
// Returns the Election for the given electionAlgorithm number; in practice, with the current version, this is FastLeaderElection
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = new QuorumCnxManager(this);
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
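The `TODO` in the source suggests replacing the switch with a factory. One possible shape is a registry from the `electionAlg` number to a `Supplier`; the sketch below is hypothetical and not ZooKeeper's API. Unlike the original, an unknown number throws instead of hitting `assert false` and returning null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class ElectionFactorySketch {
    // Minimal stand-in for the Election interface
    interface Election { String name(); }

    // electionAlg number -> factory, replacing the switch statement
    private final Map<Integer, Supplier<Election>> factories = new HashMap<>();

    void register(int alg, Supplier<Election> factory) {
        factories.put(alg, factory);
    }

    Election create(int alg) {
        Supplier<Election> f = factories.get(alg);
        if (f == null) {
            throw new IllegalArgumentException("Unknown electionAlg: " + alg);
        }
        return f.get();
    }
}
```

Any setup a given algorithm needs (such as starting the QuorumCnxManager listener for case 3) would live inside its registered factory.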
run
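Initializes JMX, then keeps alternating between recovery mode (electing a leader) and broadcast mode (serving data) until shutdown.
public void run() {// init JMX, then alternate between recovery (leader election) and broadcast (serving data) until shutdown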
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
try {// first, initialize JMX
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
p = new RemotePeerBean(s);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());// run the election and record the resulting vote
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());// run the election and record the resulting vote
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));// become an observer
observer.observeLeader();// observe the leader
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);// clear the observer when done
setPeerState(ServerState.LOOKING);// go back to LOOKING
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));// become a follower
follower.followLeader();//follow leader
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);// clear the follower
setPeerState(ServerState.LOOKING);// go back to LOOKING
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));// become the leader
leader.lead();
setLeader(null);// clear the leader when lead() returns
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);// go back to LOOKING
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}
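Stripped of JMX and read-only mode, the main loop is a small state machine: LOOKING runs an election that picks a role, and every role's finally block drops back to LOOKING. The sketch below is a simplified, hypothetical model: `lookForLeader` returns the new state directly (in the real code it returns a `Vote` and calls `setPeerState` itself), a `Runnable` stands in for `followLeader()`/`lead()`/`observeLeader()`, and a round limit replaces `while (running)`.

```java
import java.util.ArrayList;
import java.util.List;

class PeerLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // Hypothetical election hook: returns the role chosen by the election.
    interface Election { ServerState lookForLeader() throws Exception; }

    ServerState state = ServerState.LOOKING;
    final List<ServerState> history = new ArrayList<>();

    void run(Election election, Runnable roleWork, int maxRounds) {
        for (int i = 0; i < maxRounds; i++) {
            history.add(state);
            switch (state) {
                case LOOKING:
                    try {
                        state = election.lookForLeader(); // recovery: elect a leader
                    } catch (Exception e) {
                        state = ServerState.LOOKING;      // retry the election
                    }
                    break;
                default: // FOLLOWING, LEADING, OBSERVING
                    try {
                        roleWork.run(); // broadcast: returns when the role ends
                    } catch (RuntimeException e) {
                        // the real code logs the exception here
                    } finally {
                        state = ServerState.LOOKING; // every role falls back to LOOKING
                    }
                    break;
            }
        }
    }
}
```

Driving it with an election that first returns FOLLOWING and then LEADING records LOOKING, FOLLOWING, LOOKING, LEADING: a server never moves straight from one role to another.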
Election-state related
When an election ends, each server becomes one of Follower, Leader, or Observer.
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {// create the Follower
return new Follower(this, new FollowerZooKeeperServer(logFactory,
this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {// create the Leader
return new Leader(this, new LeaderZooKeeperServer(logFactory,
this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {// create the Observer
return new Observer(this, new ObserverZooKeeperServer(logFactory,
this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}
synchronized protected void setLeader(Leader newLeader){
leader=newLeader;
}
synchronized protected void setFollower(Follower newFollower){
follower=newFollower;
}
synchronized protected void setObserver(Observer newObserver){
observer=newObserver;
}
public String getServerState() {// current state as a status string
switch (getPeerState()) {
case LOOKING:
return QuorumStats.Provider.LOOKING_STATE;
case LEADING:
return QuorumStats.Provider.LEADING_STATE;
case FOLLOWING:
return QuorumStats.Provider.FOLLOWING_STATE;
case OBSERVING:
return QuorumStats.Provider.OBSERVING_STATE;
}
return QuorumStats.Provider.UNKNOWN_STATE;
}
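Epoch persistence
This is mainly about persisting acceptedEpoch and currentEpoch, covering both reading and writing.
private long readLongFromFile(String name) throws IOException {// read the long stored in the file called name under snapDir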
File file = new File(logFactory.getSnapDir(), name);
BufferedReader br = new BufferedReader(new FileReader(file));
String line = "";
try {
line = br.readLine();
return Long.parseLong(line);
} catch(NumberFormatException e) {
throw new IOException("Found " + line + " in " + file);
} finally {
br.close();
}
}
private void writeLongToFile(String name, long value) throws IOException {// write a long to the file called name under snapDir, via AtomicFileOutputStream
File file = new File(logFactory.getSnapDir(), name);
AtomicFileOutputStream out = new AtomicFileOutputStream(file);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
boolean aborted = false;
try {
bw.write(Long.toString(value));
bw.flush();
out.flush();
} catch (IOException e) {
LOG.error("Failed to write new file " + file, e);
// worst case here the tmp file/resources(fd) are not cleaned up
// and the caller will be notified (IOException)
aborted = true;
out.abort();
throw e;
} finally {
if (!aborted) {
// if the close operation (rename) fails we'll get notified.
// worst case the tmp file may still exist
out.close();
}
}
}
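The point of `AtomicFileOutputStream` is that a crash mid-write must not leave a half-written epoch file. As far as I understand, it writes to a temporary file and renames it over the target on close. The sketch below applies the same idea with `java.nio.file` instead; `EpochFileSketch` and the `.tmp` suffix are assumptions, not ZooKeeper code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class EpochFileSketch {
    // Write to a temp file in the same directory, then rename it over the target,
    // so readers see either the old value or the new one, never a partial write.
    static void writeLong(Path dir, String name, long value) throws IOException {
        Path target = dir.resolve(name);
        Path tmp = dir.resolve(name + ".tmp"); // assumed temp-file naming
        try {
            Files.write(tmp, Long.toString(value).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING,
                       StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            // best-effort cleanup, similar in spirit to out.abort()
            try { Files.deleteIfExists(tmp); } catch (IOException s) { e.addSuppressed(s); }
            throw e;
        }
    }

    // Same contract as readLongFromFile: a non-number becomes an IOException.
    static long readLong(Path dir, String name) throws IOException {
        Path file = dir.resolve(name);
        String line = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        try {
            return Long.parseLong(line);
        } catch (NumberFormatException e) {
            throw new IOException("Found " + line + " in " + file);
        }
    }
}
```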
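Thoughts
Difference between bcVote and currentVote
bcVote is probably used by the older protocol, while newer versions use currentVote. I didn't dig into it; it is presumably there for backward compatibility.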
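Where QuorumPeer sits in the server startup steps
(figure: where QuorumPeer starts within the server)
How the old state is cleaned up when the server leaves a non-LOOKING state
For example, when going from FOLLOWING to LEADING.
Answer: when a state ends, its finally block does the cleanup: it shuts down the role object, sets it to null, and sets the state back to LOOKING, so the server always passes through LOOKING before taking a new role.
(figure: clearing the existing state on a state change)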
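Where are the state changes in QuorumPeer#run made?
FastLeaderElection#lookForLeader, covered in the previous section, calls QuorumPeer#setPeerState; the finally blocks in run set the state back to LOOKING.
QuorumPeer#run reflects the two phases of a ZooKeeper cluster:
crash recovery and atomic broadcast.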
What are acceptedEpoch and currentEpoch for?
http://blog.csdn.net/damacheng/article/details/42393793
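acceptedEpoch is the epoch the server is about to adopt; it may still change after comparisons.
currentEpoch is the current, stable epoch. Once acceptedEpoch has been compared and finally settled, it is assigned to currentEpoch.
This is analogous to the question raised in the Thoughts of part 30 of this source-code series: what is the difference between getVote() and self.getCurrentVote()?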
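What is tick, how does it differ from tickTime, and what is it for?
tickTime is the length of one period, e.g. 3 s per period.
tick is the index of the current period; it starts at 0 and increases as time passes.
It comes up later in the Leader and LearnerHandler source, where it is used for timeout detection in the interaction between the Leader and Learners.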
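The benefit of counting ticks is that a timeout can be stored as a tick number and checked by comparing counters. The class below is a hypothetical illustration only; how Leader and LearnerHandler actually use `tick` is left for those sections. A deadline of `syncLimit` ticks from now expires once the counter passes it, which corresponds to roughly `syncLimit * tickTime` milliseconds.

```java
class TickTimeoutSketch {
    private int tick = 0; // number of elapsed tickTime periods, starts at 0

    // Hypothetical: called once per tickTime by some timer.
    void onTick() {
        tick++;
    }

    // Hypothetical helper: a deadline expressed in ticks, e.g. now + syncLimit.
    int deadlineAfter(int limitTicks) {
        return tick + limitTicks;
    }

    boolean expired(int deadlineTick) {
        return tick > deadlineTick;
    }

    static long ticksToMillis(int ticks, int tickTimeMs) {
        return (long) ticks * tickTimeMs;
    }
}
```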
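Open questions
What is QuorumPeer#OBSERVER_ID for?
I only see it checked in QuorumCnxManager#receiveConnection when accepting a connection; I haven't found where this value is written when sending a request.
Gripes
Why do the three fields follower, leader and observer all exist at the same time?
They represent different states and only one can exist at a time. I don't know why an enum-based state machine wasn't used instead.
Is the QuorumPeer#quorumStats field necessary?
It isn't used anywhere.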