本周主要工作,在Storm集群上测试基于Wordcount程序的CheckPoint机制
- storm的checkPoint机制必须实现BaseStatefulBolt或继承IStatefulBolt
- 状态包括:Prepare、Committing、Committed
- Action包括:RollBack、INITSTATE、COMMIT
CheckPointState中的nextState()
/**
* Get the next state based on this checkpoint state.
*
* @param recovering if in recovering phase
* @return the next checkpoint state based on this state.
*/
public CheckPointState nextState(boolean recovering) {
CheckPointState nextState;
switch (state) {
case PREPARING:
nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
break;
case COMMITTING:
nextState = new CheckPointState(txid, COMMITTED);
break;
case COMMITTED:
nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
break;
default:
throw new IllegalStateException("Unknown state " + state);
}
return nextState;
}
CheckPointState中的nextAction()
/**
* Get the next action to perform based on this checkpoint state.
*
* @param recovering if in recovering phase
* @return the next action to perform based on this state
*/
public Action nextAction(boolean recovering) {
Action action;
switch (state) {
case PREPARING:
action = recovering ? Action.ROLLBACK : Action.PREPARE;
break;
case COMMITTING:
action = Action.COMMIT;
break;
case COMMITTED:
action = recovering ? Action.INITSTATE : Action.PREPARE;
break;
default:
throw new IllegalStateException("Unknown state " + state);
}
return action;
}
在storm的CheckPoint机制中,系统会检查是否收到来自该Comment的所以输入流中所有任务的检查点,通过类BaseStatefulBoltExecutor中的processCheckPoint()
processCheckPoint()如果达到了,则ack,或者直接忽略
网友评论