注册中心连接状态监听器io.elasticjob.lite.internal.listener.RegistryCenterConnectionStateListener
实现了org.apache.curator.framework.state.ConnectionStateListener
接口。
ConnectionStateListener只有一个方法stateChanged
,它会在客户端与zk服务器连接状态改变时调用。
/**
* Called when there is a state change in the connection
*
* @param client the client
* @param newState the new state
*/
public void stateChanged(CuratorFramework client, ConnectionState newState);
elastic job通过该监听器,可以管理实例与zk连接状态不同时的行为。包括:SUSPENDED(暂停挂起),LOST(确认丢失)和RECONNECTED(重新连接)。
SUSPENDED
There has been a loss of connection. Leaders, locks, etc. should suspend until the connection is re-established. If the connection times-out you willreceive a LOST notice.LOST
The connection is confirmed to be lost. Close any locks, leaders, etc. and attempt to re-create them. NOTE: it is possible to get a RECONNECTED state after this but you should still consider any locks, etc. as dirty/unstable.RECONNECTED
A suspended, lost, or read-only connection has been re-established.
暂停挂起和确认丢失
当elastic job实例接收到与zk处于暂停挂起和确认丢失状态时,当前实例会暂停调度任务。
if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
jobScheduleController.pauseJob();
}
重新连接
当实例接收与zk服务端重新连接时,实例除了继续发起任务调度外,还会重新初始化在zk的状态。
- 持久化作业服务器上线信息;
创建zk节点
jobName/servers/实例IP
.
- 持久化作业运行实例上线相关信息;
创建zk临时节点
jobName/instances/ip@-@pid
.
- 清除分配分片项的运行状态。
删除zk节点
jobName/sharding/本地分片项/running
(如果存在).
if (ConnectionState.RECONNECTED == newState) {
serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
instanceService.persistOnline();
executionService.clearRunningInfo(shardingService.getLocalShardingItems());
jobScheduleController.resumeJob();
}
网友评论