美文网首页
开源定时任务power-job使用数据库实现分布式锁

开源定时任务power-job使用数据库实现分布式锁

作者: 天草二十六_简村人 | 来源:发表于2022-12-16 00:41 被阅读0次

背景

为什么网上一大堆的redis实现分布式锁,咋到这里,还在使用Mysql的唯一索引来实现呢?
最主要的原因是减少对中间件的依赖。。

就像配置写在本地,而不是去读取分布式配置consul、nacos等。

一、接口

package tech.powerjob.server.extension;

/**
 * 锁服务,所有方法都不允许抛出任何异常!
 *
 * @author tjq
 * @since 2020/4/2
 */
public interface LockService {

    /**
     * 上锁(获取锁),立即返回,不会阻塞等待锁
     * @param name 锁名称
     * @param maxLockTime 最长持有锁的时间,单位毫秒(ms)
     * @return true -> 获取到锁,false -> 未获取到锁
     */
    boolean tryLock(String name, long maxLockTime);

    /**
     * 释放锁
     * @param name 锁名称
     */
    void unlock(String name);
}

二、实现类

package tech.powerjob.server.extension.defaultimpl;

import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.OmsLockDO;
import tech.powerjob.server.persistence.remote.repository.OmsLockRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;

/**
 * 基于数据库实现的分布式锁
 *
 * @author tjq
 * @since 2020/4/5
 */
@Slf4j
@Service
public class DatabaseLockService implements LockService {

    private final String ownerIp;
    private final OmsLockRepository omsLockRepository;

    @Autowired
    public DatabaseLockService(OmsLockRepository omsLockRepository) {

        this.ownerIp = NetUtils.getLocalHost();
        this.omsLockRepository = omsLockRepository;

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            int num = omsLockRepository.deleteByOwnerIP(ownerIp);
            log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num);
        }));
    }

    @Override
    public boolean tryLock(String name, long maxLockTime) {

        OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime);
        try {
            omsLockRepository.saveAndFlush(newLock);
            return true;
        } catch (DataIntegrityViolationException ignore) {
        } catch (Exception e) {
            log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
        }

        OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
        long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();

        // 锁超时,强制释放锁并重新尝试获取
        if (lockedMillions > omsLockDO.getMaxLockTime()) {

            log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
            unlock(name);
            return tryLock(name, maxLockTime);
        }
        return false;
    }

    @Override
    public void unlock(String name) {

        try {
            CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
        }catch (Exception e) {
            log.error("[DatabaseLockService] unlock {} failed.", name, e);
        }
    }

}

三、实体OmsLockDO

字段lockName设计为唯一索引。新增成功,则表示获得锁,反之尝试去释放锁。

package tech.powerjob.server.persistence.remote.model;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;

import javax.persistence.*;
import java.util.Date;

/**
 * 数据库锁
 *
 * @author tjq
 * @since 2020/4/2
 */
@Data
@Entity
@NoArgsConstructor
@Table(uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})})
public class OmsLockDO {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
    @GenericGenerator(name = "native", strategy = "native")
    private Long id;

    private String lockName;

    private String ownerIP;
    /**
     * 最长持有锁的时间
     */
    private Long maxLockTime;

    private Date gmtCreate;

    private Date gmtModified;

    public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
        this.lockName = lockName;
        this.ownerIP = ownerIP;
        this.maxLockTime = maxLockTime;
        this.gmtCreate = new Date();
        this.gmtModified = this.gmtCreate;
    }
}

四、增删查方法的Jpa实现

package tech.powerjob.server.persistence.remote.repository;

import tech.powerjob.server.persistence.remote.model.OmsLockDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;

import javax.transaction.Transactional;

/**
 * 利用唯一性约束作为数据库锁
 *
 * @author tjq
 * @since 2020/4/2
 */
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {

    @Modifying
    @Transactional
    @Query(value = "delete from OmsLockDO where lockName = ?1")
    int deleteByLockName(String lockName);

    OmsLockDO findByLockName(String lockName);

    @Modifying
    @Transactional
    int deleteByOwnerIP(String ip);
}

五、使用示例

    private static final String HISTORY_DELETE_LOCK = "history_delete_lock";

    /**
     * 只能一台server清理的操作统一到这里执行
     */
    private void cleanByOneServer() {
        // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了
        boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000);
        if (!lock) {
            log.info("[CleanService] clean job is already running, just return.");
            return;
        }
        try {
            // 删除数据库运行记录
            cleanInstanceLog();
            cleanWorkflowInstanceLog();
            // 删除无用节点
            cleanWorkflowNodeInfo();
            // 删除 GridFS 过期文件
            cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
            cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
        } finally {
            lockService.unlock(HISTORY_DELETE_LOCK);
        }
    }

相关文章

  • 开源定时任务power-job使用数据库实现分布式锁

    背景 为什么网上一大堆的redis实现分布式锁,咋到这里,还在使用Mysql的唯一索引来实现呢?最主要的原因是减少...

  • 分布式锁

    为什么要用分布式锁 数据库乐观锁redis分布式锁zookeeper分布式锁 使用分布式锁的场景 实现分布式锁的方...

  • 开源定时任务power-job的告警实现

    一、报警接口Alarmable 二、告警实现类 多种实现,这里实现了钉钉消息、邮件和webhook三种方式。你可以...

  • 分布式任务调度-xxl-job

    前言 为什么要使用分布式任务调度框架? 使用原因和解决问题是为了解决: 实现定时调度任务 将定时任务分布式部署 提...

  • 基于Spring Schedule和redis锁互斥定时任务

    定时任务DemoSchedule redis分布式锁

  • 分布式锁实现

    基于数据库实现分布式锁基于缓存(redis,memcached)实现分布式锁基于Zookeeper实现分布式锁 s...

  • 分布式锁的实现方案

    一,分布式锁的实现方案 1,基于数据库实现分布式锁 2,基于缓存实现数据库锁(redis) 3,基于zookeep...

  • Redis实现分布式锁

    前言 一般有三种方式来实现分布式锁: 使用数据库实现 使用 redis 实现 使用 zookeeper 实现 本文...

  • 基于redis的分布式锁

    分布式锁实现方案 基于数据库实现分布式锁 基于缓存(redis,memcached,tair)实现分布式锁 基于Z...

  • 分布式 | 分布式锁的实现

    分布式锁的实现 在常见的分布式锁中有以下三种实现: Redis 实现 Zookeeper 实现 数据库实现 分布式...

网友评论

      本文标题:开源定时任务power-job使用数据库实现分布式锁

      本文链接:https://www.haomeiwen.com/subject/qzrdqdtx.html