背景
为什么网上一大堆的redis实现分布式锁,咋到这里,还在使用Mysql的唯一索引来实现呢?
最主要的原因是减少对中间件的依赖。。
就像配置写在本地,而不是去读取分布式配置consul、nacos等。
一、接口
package tech.powerjob.server.extension;
/**
* 锁服务,所有方法都不允许抛出任何异常!
*
* @author tjq
* @since 2020/4/2
*/
public interface LockService {
/**
* 上锁(获取锁),立即返回,不会阻塞等待锁
* @param name 锁名称
* @param maxLockTime 最长持有锁的时间,单位毫秒(ms)
* @return true -> 获取到锁,false -> 未获取到锁
*/
boolean tryLock(String name, long maxLockTime);
/**
* 释放锁
* @param name 锁名称
*/
void unlock(String name);
}
二、实现类
package tech.powerjob.server.extension.defaultimpl;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.OmsLockDO;
import tech.powerjob.server.persistence.remote.repository.OmsLockRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
/**
* 基于数据库实现的分布式锁
*
* @author tjq
* @since 2020/4/5
*/
@Slf4j
@Service
public class DatabaseLockService implements LockService {
private final String ownerIp;
private final OmsLockRepository omsLockRepository;
@Autowired
public DatabaseLockService(OmsLockRepository omsLockRepository) {
this.ownerIp = NetUtils.getLocalHost();
this.omsLockRepository = omsLockRepository;
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
int num = omsLockRepository.deleteByOwnerIP(ownerIp);
log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num);
}));
}
@Override
public boolean tryLock(String name, long maxLockTime) {
OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime);
try {
omsLockRepository.saveAndFlush(newLock);
return true;
} catch (DataIntegrityViolationException ignore) {
} catch (Exception e) {
log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
}
OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
// 锁超时,强制释放锁并重新尝试获取
if (lockedMillions > omsLockDO.getMaxLockTime()) {
log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
unlock(name);
return tryLock(name, maxLockTime);
}
return false;
}
@Override
public void unlock(String name) {
try {
CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
}catch (Exception e) {
log.error("[DatabaseLockService] unlock {} failed.", name, e);
}
}
}
三、实体OmsLockDO
字段lockName设计为唯一索引。新增成功,则表示获得锁,反之尝试去释放锁。
package tech.powerjob.server.persistence.remote.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
/**
* 数据库锁
*
* @author tjq
* @since 2020/4/2
*/
@Data
@Entity
@NoArgsConstructor
@Table(uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})})
public class OmsLockDO {
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
private String lockName;
private String ownerIP;
/**
* 最长持有锁的时间
*/
private Long maxLockTime;
private Date gmtCreate;
private Date gmtModified;
public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
this.lockName = lockName;
this.ownerIP = ownerIP;
this.maxLockTime = maxLockTime;
this.gmtCreate = new Date();
this.gmtModified = this.gmtCreate;
}
}
四、增删查方法的Jpa实现
package tech.powerjob.server.persistence.remote.repository;
import tech.powerjob.server.persistence.remote.model.OmsLockDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import javax.transaction.Transactional;
/**
* 利用唯一性约束作为数据库锁
*
* @author tjq
* @since 2020/4/2
*/
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Modifying
@Transactional
@Query(value = "delete from OmsLockDO where lockName = ?1")
int deleteByLockName(String lockName);
OmsLockDO findByLockName(String lockName);
@Modifying
@Transactional
int deleteByOwnerIP(String ip);
}
五、使用示例
private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
/**
* 只能一台server清理的操作统一到这里执行
*/
private void cleanByOneServer() {
// 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了
boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000);
if (!lock) {
log.info("[CleanService] clean job is already running, just return.");
return;
}
try {
// 删除数据库运行记录
cleanInstanceLog();
cleanWorkflowInstanceLog();
// 删除无用节点
cleanWorkflowNodeInfo();
// 删除 GridFS 过期文件
cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
} finally {
lockService.unlock(HISTORY_DELETE_LOCK);
}
}
网友评论