上一节使用zookeeper实现分布式锁,这节使用ZKClient实现master选举
ZKClient使用比zookeeper要方便些,API更加简单。
master选举实现原理
- 每一台服务器都去竞争创建/master节点
- 如果创建成功,那么master就是当前创建的服务器
- 创建失败,则监听/master节点的删除事件
- 一旦/master删除,所有客户端再次去竞争创建/master节点。谁创建成功,谁就是主节点
master选择类
package com.app.master;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
/**
* @Author: chao.zhu
* @Email: chao.zhu@rograndec.com
* @CreateDate: 2018/08/06
* @Version: 1.0
*/
public class MasterSelector {
private ZkClient zkClient;
//当前服务器信息
private ServiceNodeInfo serviceNodeInfo;
//所有进程争抢的节点
private String MASTER_PATH = "/master";
//超时时间
private Integer SESSION_OUT=5000;
//master节点
private ServiceNodeInfo masterNode;
//服务器是否启动
private volatile boolean running = false;
//master监听事件
IZkDataListener dataListener;
//任务调度类。模拟心跳动作,每隔一小段时间当前serviceNodeInfo去和zookeeper交互一次,看当前master是否挂掉
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public MasterSelector(ZkClient zkClient,ServiceNodeInfo serviceNodeInfo){
this.zkClient = zkClient;
this.serviceNodeInfo = serviceNodeInfo;
dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
tryGetMaster();
}
};
}
public void star(){
if(running){
System.out.println("服务已经启动");
return;
}
running = true;
//监听MASTER_PATH节点,如果节点发送变化,执行dataListener
zkClient.subscribeDataChanges(MASTER_PATH,dataListener);
tryGetMaster();
}
public void stop(){
if(!running){
System.out.println("服务器停止");
return;
}
running = false;
scheduledExecutorService.shutdown();
//关闭当前zkclien的监听事件
zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener);
//如果当前服务器是主服务器,那么需要释放掉MASTER_PATH
releaseMaster();
}
public void releaseMaster(){
if(checkMaster()){
System.out.println(DateToString(System.currentTimeMillis())+"释放master"+serviceNodeInfo.toString());
zkClient.delete(MASTER_PATH);
}
}
public void tryGetMaster(){
//如果服务器未启动,则直接返回。表示需要先执行star方法
if(!running){
return;
}
//基本原理就是当前服务器1去创建MASTER_PATH,如果创建成功,那么服务器1就是master节点。创建失败,捕捉ZkNodeExistsException异常
try{
zkClient.createEphemeral(MASTER_PATH,serviceNodeInfo);
//没有抛出异常,则当前服务器1就是master服务器
masterNode = serviceNodeInfo;
System.out.println(DateToString(System.currentTimeMillis())+":master节点是:"+masterNode.toString());
//定时5秒钟释放节点。为了模拟服务器故障
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
//检查一下当前服务器1是否还是主服务器,如果是,那么删除/master节点。即:释放主服务
boolean flag = checkMaster();
if(flag){
zkClient.delete(MASTER_PATH);
}
}
},5,TimeUnit.SECONDS);
}
catch (ZkNodeExistsException e){
//如果节点已经存在,获得master节点
ServiceNodeInfo serviceNodeInfo1 = zkClient.readData(MASTER_PATH);
//如果在读取的时候masterNode为空,则重新去抢master节点
if(serviceNodeInfo1 == null){
tryGetMaster();
}else{
masterNode = serviceNodeInfo1;
}
}
}
public boolean checkMaster(){
try{
ServiceNodeInfo m = zkClient.readData(MASTER_PATH);
if(m.getServiceId().intValue() == serviceNodeInfo.getServiceId().intValue()){
return true;
}
}catch (ZkNoNodeException e) {
return false;
} catch (ZkInterruptedException e) {
return checkMaster();
} catch (ZkException e) {
return false;
}
return false;
}
public static String DateToString(long time){
return DateFormatUtils.format(time,"yyyy-MM-dd hh:mm:ss");
}
}
服务器信息类
package com.app.master;
import java.io.Serializable;
/**
* @Author: chao.zhu
* @discription: 服务器信息
* @CreateDate: 2018/08/06
* @Version: 1.0
*/
public class ServiceNodeInfo implements Serializable {
private Integer serviceId;
private String serviceName;
public ServiceNodeInfo(Integer serviceId, String serviceName) {
this.serviceId = serviceId;
this.serviceName = serviceName;
}
public Integer getServiceId() {
return serviceId;
}
public void setServiceId(Integer serviceId) {
this.serviceId = serviceId;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
@Override
public String toString() {
return "ServiceNodeInfo{" +
"serviceId=" + serviceId +
", serviceName='" + serviceName + '\'' +
'}';
}
}
测试客户端
package com.app.master;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
/**
* @Author: chao.zhu
* @description:
* @CreateDate: 2018/08/06
* @Version: 1.0
*/
public class Service1 {
//zk连接串
private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception{
List<ZkClient> zkClientList = new ArrayList<ZkClient>(10);
List<MasterSelector> masterSelectorList = new ArrayList<MasterSelector>(10);
try{
for(int i = 0 ; i < 10 ; i++){
ZkClient zkClient = new ZkClient(ZK_CONNECTION,5000);
zkClientList.add(zkClient);
ServiceNodeInfo serviceNodeInfo = new ServiceNodeInfo(i,"service"+i);
MasterSelector masterSelector = new MasterSelector(zkClient,serviceNodeInfo);
masterSelector.star();
}
System.out.println("敲回车键退出!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
}finally {
System.out.println("Shutting down...");
for (MasterSelector workServer : masterSelectorList) {
try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
for (ZkClient client : zkClientList) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
测试客户端代码说明:
- 启动10个客户端
- 然后每个客户端开始争抢/master节点
- 抢到/master节点后,5秒钟释放master节点。因为着10个客户端都注册了/master删除事件,所以当释放master之后,这10个客户端再次去争抢/master节点
网友评论