背景
作为创业公司,基础组件全部自研,并且开发团队集体跑路了,留下的坑没有最大,只有更大,近日使用前人造的轮子,分布式任务调度,折腾了近两天,没跑起来,各种问题,又没文档,最后只能放弃使用,而新部署一套也不现实,然后就选择了自己造轮子
对于目前遇到的场景,有个任务,一天跑一次,一次大概2小时能跑完,想过两种方案实现分布式锁解决当前遇到的问题:
- 使用数据库,建一张任务表,利用行锁来锁定任务,锁定成功后,将标识位设置成已占用,占用者设置成自身机器ip,锁定成功的机器通过定时任务每5秒更新一次心跳时间,任务执行完成后将标识设置成未占用,并停止心跳,其它机器通过定时任务做检查,如果15s没有更新心跳时间则认为当前任务的执行机器挂了,并立即争用
- 使用zookeeper,为任务创建一个临时节点,如果创建成功,则表示当前机器获得了锁,定时任务在执行前先判断当前机器是否能获取到锁,获取不到则不执行
理论上两种方式都是可行的,并且各有问题,由于不想建表,所以选择了使用zk。
问题分析
初始化及正常流程
对于前面的问题,我在实现的过程中尽量保证实现的更通用一些,正常情况下,在应用创建锁时,向zk创建一个临时节点,定时任务在执行前先判断当前机器是否能获取到锁,获取不到则不执行,正常流程如下:
image如果不出现其它异常问题,这样简单的处理将非常有效,但是出现网络异常,宕机等情况时,则没这么容易了,如果机器A挂了,机器B不获取锁,则很有可能任务不跑了
异常情况
异常情况主要表现为两种:
- 机器A关闭或宕机,这个时间,机器B会收到dataDelete的回调,此时机器B应该要重新创建path,作为后续任务执行的机器
- 机器A出现了短暂的连接中断或者机器A的会话过期,此时机器B如果重新创建path,那么机器A就失去了执行任务的权限,而此时任务可能正在执行
对于异常情况,我们保持一个原则:尽量让机器A成功创建path,获得任务的执行权,那么我们可以在机器B的handleDataDeleted回调里延迟10s后再获取锁,如果10s后机器A还没恢复,则认为机器A短时间内恢复了不了。机器A出现连接/会话过期问题时,执行流程类似于下图:
image而获取任务与连接断开或者会话过期之间可能存在并发的情况,这个时候有两种策略:
- 调用方重试,如果10s内获取不到锁,则不执行,等待其它机器获取锁后执行
- 阻塞,直到连通zk
不管使用哪种方式都会存在一个问题,机器B上的定时任务先启动,但是获取锁失败,不执行任务,而后机器A在判断自己能否执行任务前,机器A与zk失联(可能是挂了或者会话过期),那么在机器A与zk恢复连接前,机器B可能已经等待超过10s,成功创建了锁对应的path,这个时候,任务将得不到执行,需要使用回调的方式做补尝,在回调方法里判断是否需要启动任务。
锁的状态
基于上述分析,将锁分为以下4个状态:
- CREATED:表示刚创建,是初始状态,这个状态下,直接参与竞争,去zk上创建节点
- OWN:表示zk的节点被自己创建成功了,当有调用过来时,如果是此状态,则再调用一次zkClient.create(path),如果成功,则表示可执行任务,避免链接断了而状态没变
- RELEASED:表示自己没有争用成功,返回false
- WAITING:表示已经收到了handleDataDeleted回调,在10s等待时间中,此时如果获取锁,则同样等待10s后再尝试重新获取锁的判断
增加两个回调方法:
- handleDataDeleted中在延迟10s后争用成功时调用,需要在此回调方法中判断是否需要启动任务,可以通过记录最近一次获取锁的时间,如果离当前时间很近(比如任务执行耗时等),则启动任务(任务没执行完,机器就出了问题,导致当前机器获得了锁,需要继续将任务跑完);或者check一下任务的输出,判断是否需要继续执行任务
- 在重新建立会话后,立即争用,如果失败,则触发回调接口,使用方可在此接口的实现中判断是否需要取消任务
实现方式
基于zk的分布式锁的实现,用于支持任务的执行,整理后的代码如下:
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 使用zookeeper实现分布式锁,可用于执行分布式定时任务等互斥类代码,调用{@link #tryLock()},并在任务结束后,一定要调用{@link #unlock()}释放锁
*
* @author gaohang
*/
public final class ZookeeperLock {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperLock.class);
private static final ScheduledExecutorService delayExecutor = Executors.newScheduledThreadPool(1);
private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1);
/**
* 所有锁都在此节点下
*/
private static final String LOCK_BASE_PATH = "/onlineetl/distributeLock/instance";
private static final Charset UTF8 = Charset.forName("UTF-8");
private final int delayTime = 10;
private final String lockName;
private final String lockPath;
private final ZkClient zkClient;
/**
* 当前机器的ip
*/
private final String localhostAddr;
/**
* 占用锁的机器的ip
*/
private String activeHostAddr;
/**
* 当前持有锁的状态
*/
private volatile State state = State.CREATED;
/**
* 本地锁,防止本地多线程导致本地非串行
*/
private final Lock localLock = new ReentrantLock();
/**
* 创建分布多锁
*
* @param lockName 锁的名字,唯一
* @param zkClient zk客户端
* @param callback 当有dataDeleted时,重新获取争用后调用
* @throws UnknownHostException 获取localhost失败
*/
ZookeeperLock(String lockName, ZkClient zkClient, LockAddrChangedCallback callback) throws UnknownHostException {
this.zkClient = zkClient;
if (StringUtils.isBlank(lockName)) {
throw new IllegalArgumentException("lockName cannot be blank string");
}
this.lockName = lockName;
this.lockPath = LOCK_BASE_PATH + '/' + lockName;
final InetAddress localhost = InetAddress.getLocalHost();
this.localhostAddr = localhost.getHostAddress();
initLock();
subscriptDataChange(lockName, zkClient, callback);
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
logger.info("zookeeper state changed, state: {}", state);
}
@Override
public void handleNewSession() throws Exception {
if (!initLock()) {
callbackExecutor.execute(callback::onPathLose);
}
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
private void subscriptDataChange(String lockName, ZkClient zkClient, LockAddrChangedCallback callback) {
zkClient.subscribeDataChanges(lockPath, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) {
final String newOwner = new String((byte[]) data, UTF8);
if (!isMyself(newOwner)) {
//不是本机
state = State.RELEASED;
logger.info("lock [{}] released, owner is {}", lockName, newOwner);
} else {
//本机持有了锁
state = State.OWN;
logger.info("lock [{}] locked", lockName);
}
activeHostAddr = newOwner;
}
@Override
public void handleDataDeleted(String dataPath) {
//数据被删除有两种情况,
// 1.持有锁的机器挂了
// 2.网络异常或者zk出现了异常
//因此在收到数据删除回调时,未获取锁的机器要优先让上一次获取锁的机器重新获取锁
if (state == State.OWN && activeHostAddr != null && isMyself(activeHostAddr)) {
//设置成released,在获取锁成功时防止有新的上锁操作
state = State.WAITING;
if (localLock.tryLock()) {
//说明锁没有在使用中,释放即可,将锁的权限交给其它机器
localLock.unlock();
activeHostAddr = null;
//5s后再获取锁
delayExecutor.schedule(() -> {
initLock();
//对方挂了,自己争用成功
callbackExecutor.execute(callback::onPathLocked);
}, 5, TimeUnit.SECONDS);
} else {
//锁被占用,立即获取锁
initLock();
}
} else {
state = State.WAITING;
activeHostAddr = null;
//对方可能网络异常了,5s后再获取锁,如果对方5s没恢复,则认为对方短时间内无法恢复了
delayExecutor.schedule(() -> {
initLock();
callbackExecutor.execute(callback::onPathLocked);
}, delayTime, TimeUnit.SECONDS);
}
}
});
}
private boolean isMyself(String activeHostAddr) {
return StringUtils.equals(activeHostAddr, localhostAddr);
}
private boolean initLock() {
try {
zkClient.createEphemeral(lockPath, localhostAddr.getBytes(UTF8));
activeHostAddr = localhostAddr;
state = State.OWN;
logger.info("lock [{}] locked", lockName);
return true;
} catch (ZkNodeExistsException e) {
byte[] host = zkClient.readData(lockPath, true);
if (host == null) {
// 如果不存在节点,立即尝试一次
return initLock();
} else {
activeHostAddr = new String(host, UTF8);
state = State.RELEASED;
logger.info("lock [{}] released, owner is {}", lockName, activeHostAddr);
if (isMyself(activeHostAddr)) {
state = State.OWN;
return true;
}
return false;
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(LOCK_BASE_PATH, true); // 尝试创建父节点
return initLock();
} catch (Throwable e) {
logger.error("lock [{}] failed", lockName, e);
//不让招出异常
return false;
}
}
public boolean tryLock() {
//避免状态被修改,这里先上锁,在handleDataDeleted中,如果trylock成功,则表示没有任务在执行
if (!localLock.tryLock()) {
return false;
}
switch (state) {
case OWN:
if (initLock()) {
return true;
}
state = State.RELEASED;
return false;
case RELEASED:
localLock.unlock();
return false;
case CREATED:
initLock();
return relock();
case WAITING:
try {
TimeUnit.SECONDS.sleep(delayTime);
} catch (InterruptedException e) {
localLock.unlock();
throw new RuntimeException(e);
}
//再次尝试获取锁
final boolean locked = relock();
if (locked) {
return true;
}
initLock();
return relock();
}
//理论上不会走到这里
initLock();
//这里一定要Unlock,否则tryLock会重入,而unlock没有多次释放徜
return relock();
}
private boolean relock() {
localLock.unlock();
return tryLock();
}
public void unlock() {
localLock.unlock();
}
public String getLockName() {
return lockName;
}
public String getActiveHostAddr() {
return activeHostAddr;
}
@Override
public String toString() {
return "ZookeeperLock{" +
"lockName='" + lockName + '\'' +
", lockPath='" + lockPath + '\'' +
", localhostAddr='" + localhostAddr + '\'' +
", activeHostAddr='" + activeHostAddr + '\'' +
", state=" + state +
'}';
}
/**
* 锁的状态,当状态是created时,则竞争,OWN状态表示持有锁,RELEASED状态表示未持有锁, waiting表示锁待争用
*/
private enum State {
OWN, RELEASED, CREATED, WAITING
}
}
网友评论