A Look at PowerJob's LockService

Author: go4it | Published: 2024-01-10 09:07

    This article takes a look at PowerJob's LockService.

    LockService

    tech/powerjob/server/extension/LockService.java

    public interface LockService {
    
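        /**
         * Acquire the lock; returns immediately and never blocks waiting for it
         * @param name lock name
         * @param maxLockTime maximum time the lock may be held, in milliseconds (ms)
         * @return true -> lock acquired, false -> lock not acquired
         */
        boolean tryLock(String name, long maxLockTime);
    
        /**
         * Release the lock
         * @param name lock name
         */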
        void unlock(String name);
    }
    

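    The LockService interface defines two methods, tryLock and unlock. tryLock is non-blocking: it returns true or false immediately, and maxLockTime caps how long the lock may be held, in milliseconds.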
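
A caller of this contract would typically check the boolean result and release the lock in a finally block. The following is a minimal sketch of that pattern, not PowerJob code: InMemoryLockService, LockUsageDemo and runExclusively are hypothetical names introduced here, the interface is repeated only so the snippet compiles on its own, and the in-memory implementation ignores maxLockTime for brevity.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Copy of the two-method contract quoted above, repeated so the sketch is self-contained
interface LockService {
    boolean tryLock(String name, long maxLockTime);
    void unlock(String name);
}

// Hypothetical single-JVM implementation, for illustration only (not part of PowerJob)
class InMemoryLockService implements LockService {
    // lock name -> requested max hold time; this sketch stores it but never enforces it
    private final ConcurrentMap<String, Long> locks = new ConcurrentHashMap<>();

    @Override
    public boolean tryLock(String name, long maxLockTime) {
        // putIfAbsent returns null only for the caller that actually inserted the entry
        return locks.putIfAbsent(name, maxLockTime) == null;
    }

    @Override
    public void unlock(String name) {
        locks.remove(name);
    }
}

class LockUsageDemo {
    // Runs the task only if the lock is free right now; returns whether the task ran
    static boolean runExclusively(LockService lockService, String name, long maxLockTime, Runnable task) {
        if (!lockService.tryLock(name, maxLockTime)) {
            return false; // another holder has the lock, and tryLock does not wait for it
        }
        try {
            task.run();
            return true;
        } finally {
            lockService.unlock(name); // always release, even if the task throws
        }
    }

    public static void main(String[] args) {
        LockService lockService = new InMemoryLockService();
        boolean ran = runExclusively(lockService, "demo-lock", 1000L, () -> System.out.println("working"));
        System.out.println("ran = " + ran);
    }
}
```

Because tryLock never waits, the caller decides what a failed attempt means: skip the work, or try again later.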

    DatabaseLockService

    tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java

    @Slf4j
    @Service
    public class DatabaseLockService implements LockService {
    
        private final String ownerIp;
    
        private final OmsLockRepository omsLockRepository;
    
        @Autowired
        public DatabaseLockService(OmsLockRepository omsLockRepository) {
    
            this.ownerIp = NetUtils.getLocalHost();
            this.omsLockRepository = omsLockRepository;
    
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                int num = omsLockRepository.deleteByOwnerIP(ownerIp);
                log.info("[DatabaseLockService] execute shutdown hook, release all lock(owner={},num={})", ownerIp, num);
            }));
        }
    
        @Override
        public boolean tryLock(String name, long maxLockTime) {
    
            OmsLockDO newLock = new OmsLockDO(name, ownerIp, maxLockTime);
            try {
                omsLockRepository.saveAndFlush(newLock);
                return true;
            } catch (DataIntegrityViolationException ignore) {
            } catch (Exception e) {
                log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e);
            }
    
            OmsLockDO omsLockDO = omsLockRepository.findByLockName(name);
            long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime();
    
            // The lock has timed out: force-release it and try to acquire it again
            if (lockedMillions > omsLockDO.getMaxLockTime()) {
    
                log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO);
                unlock(name);
                return tryLock(name, maxLockTime);
            }
            return false;
        }
    
        @Override
        public void unlock(String name) {
    
            try {
                CommonUtils.executeWithRetry0(() -> omsLockRepository.deleteByLockName(name));
            }catch (Exception e) {
                log.error("[DatabaseLockService] unlock {} failed.", name, e);
            }
        }
    
    }
    

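    DatabaseLockService implements LockService on top of the database. Its constructor takes an OmsLockRepository, resolves ownerIp through NetUtils.getLocalHost(), and registers a shutdown hook that calls omsLockRepository.deleteByOwnerIP(ownerIp) to release every lock owned by this node when the JVM shuts down. tryLock builds an OmsLockDO and calls omsLockRepository.saveAndFlush; if the save succeeds it returns true. If the save throws (a DataIntegrityViolationException is ignored, any other exception is logged as a warning), it loads the existing row with omsLockRepository.findByLockName and computes from gmtCreate how long the lock has been held. If that exceeds the row's maxLockTime, it calls unlock and then tryLock again; otherwise it returns false. unlock calls omsLockRepository.deleteByLockName through CommonUtils.executeWithRetry0 and logs an error if that fails.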
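
The insert-then-check-timeout flow described above can be sketched without a database. In the sketch below a ConcurrentMap keyed by lock name stands in for the table and its unique constraint, and a LongSupplier stands in for the clock so that expiry can be exercised deterministically. LockRow and TimeoutLockSketch are hypothetical names. Two details are additions of this sketch and differ from the quoted class: it checks the lookup result for null (the quoted code dereferences it directly), and it removes only the exact row it judged expired (the quoted code deletes by name).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a lock row: owner, max hold time, creation time (epoch millis)
class LockRow {
    final String ownerIp;
    final long maxLockTime;
    final long gmtCreate;

    LockRow(String ownerIp, long maxLockTime, long gmtCreate) {
        this.ownerIp = ownerIp;
        this.maxLockTime = maxLockTime;
        this.gmtCreate = gmtCreate;
    }
}

// Hypothetical sketch of the flow; the shared map plays the role of the lock table
class TimeoutLockSketch {
    private final ConcurrentMap<String, LockRow> table;
    private final String ownerIp;
    private final LongSupplier clock;

    TimeoutLockSketch(ConcurrentMap<String, LockRow> table, String ownerIp, LongSupplier clock) {
        this.table = table;
        this.ownerIp = ownerIp;
        this.clock = clock;
    }

    boolean tryLock(String name, long maxLockTime) {
        LockRow newRow = new LockRow(ownerIp, maxLockTime, clock.getAsLong());
        // Stands in for saveAndFlush: the insert fails if a row with this name already exists
        if (table.putIfAbsent(name, newRow) == null) {
            return true;
        }
        LockRow existing = table.get(name);
        if (existing == null) {
            // Addition of this sketch: the row vanished between insert and lookup
            return false;
        }
        long lockedMillis = clock.getAsLong() - existing.gmtCreate;
        if (lockedMillis > existing.maxLockTime) {
            // Held longer than its own limit: drop that exact row, then retry once more
            table.remove(name, existing);
            return tryLock(name, maxLockTime);
        }
        return false;
    }

    void unlock(String name) {
        table.remove(name);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, LockRow> table = new ConcurrentHashMap<>();
        long[] now = {0L};
        TimeoutLockSketch a = new TimeoutLockSketch(table, "10.0.0.1", () -> now[0]);
        TimeoutLockSketch b = new TimeoutLockSketch(table, "10.0.0.2", () -> now[0]);
        System.out.println("a acquires: " + a.tryLock("job", 1000L));
        System.out.println("b while a holds: " + b.tryLock("job", 1000L));
        now[0] = 1001L; // move the fake clock past a's maxLockTime
        System.out.println("b after timeout: " + b.tryLock("job", 1000L));
    }
}
```

Note that the timeout is judged against the maxLockTime stored in the existing row, not the value passed by the caller that is trying to take over.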

    NetUtils.getLocalHost

    tech/powerjob/common/utils/NetUtils.java

        public static String getLocalHost() {
            if (HOST_ADDRESS != null) {
                return HOST_ADDRESS;
            }
    
            String addressFromJVM = System.getProperty(PowerJobDKey.BIND_LOCAL_ADDRESS);
            if (StringUtils.isNotEmpty(addressFromJVM)) {
                log.info("[Net] use address from[{}]: {}", PowerJobDKey.BIND_LOCAL_ADDRESS, addressFromJVM);
                return HOST_ADDRESS = addressFromJVM;
            }
    
            InetAddress address = getLocalAddress();
            if (address != null) {
                return HOST_ADDRESS = address.getHostAddress();
            }
            return LOCALHOST_VALUE;
        }
    
        public static InetAddress getLocalAddress() {
            if (LOCAL_ADDRESS != null) {
                return LOCAL_ADDRESS;
            }
            InetAddress localAddress = getLocalAddress0();
            LOCAL_ADDRESS = localAddress;
            return localAddress;
        }    
    
        private static InetAddress getLocalAddress0() {
            // @since 2.7.6, choose the {@link NetworkInterface} first
            try {
                InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface());
                if (addressOp != null) {
                    return addressOp;
                }
            } catch (Throwable e) {
                log.warn("[Net] getLocalAddress0 failed.", e);
            }
    
            InetAddress localAddress = null;
            try {
                localAddress = InetAddress.getLocalHost();
                Optional<InetAddress> addressOp = toValidAddress(localAddress);
                if (addressOp.isPresent()) {
                    return addressOp.get();
                }
            } catch (Throwable e) {
                log.warn("[Net] getLocalAddress0 failed.", e);
            }
    
    
            return localAddress;
        }    
    

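    NetUtils.getLocalHost returns HOST_ADDRESS if it is already set. Otherwise it reads the system property powerjob.network.local.address (PowerJobDKey.BIND_LOCAL_ADDRESS) and, if that is non-empty, caches and returns it. Failing that, it calls getLocalAddress, which returns the cached LOCAL_ADDRESS or, when that is null, resolves one through getLocalAddress0. If no address can be found, getLocalHost falls back to LOCALHOST_VALUE, which is not cached.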
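
The resolution order above (cached value, then system property, then detection, then fallback) can be sketched as follows. HostResolverSketch is a hypothetical class; the Supplier stands in for getLocalAddress so the snippet needs no network access, and the fallback value 127.0.0.1 is an assumption, since LOCALHOST_VALUE is not shown in the quoted code.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the lookup order; not the NetUtils implementation
class HostResolverSketch {
    // Property name as given in the article
    static final String PROP = "powerjob.network.local.address";
    // Assumed fallback; the real LOCALHOST_VALUE is not shown in the quoted code
    static final String FALLBACK = "127.0.0.1";

    private volatile String cached;
    // Stands in for address detection; may return null when nothing usable is found
    private final Supplier<String> detector;

    HostResolverSketch(Supplier<String> detector) {
        this.detector = detector;
    }

    String getLocalHost() {
        // 1. A previously resolved address wins
        if (cached != null) {
            return cached;
        }
        // 2. An explicit JVM system property overrides detection
        String fromJvm = System.getProperty(PROP);
        if (fromJvm != null && !fromJvm.isEmpty()) {
            return cached = fromJvm;
        }
        // 3. Otherwise ask the detector
        String detected = detector.get();
        if (detected != null) {
            return cached = detected;
        }
        // 4. Last resort; like the quoted code, the fallback is returned without caching
        return FALLBACK;
    }

    public static void main(String[] args) {
        HostResolverSketch resolver = new HostResolverSketch(() -> "192.168.1.5");
        System.out.println(resolver.getLocalHost());
    }
}
```

Since ownerIp is what the shutdown hook uses to find this node's locks, setting the property explicitly (for example with a -Dpowerjob.network.local.address=... JVM argument) may be worthwhile on hosts with several network interfaces.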

    OmsLockDO

    tech/powerjob/server/persistence/remote/model/OmsLockDO.java

    @Data
    @Entity
    @NoArgsConstructor
    @Table(uniqueConstraints = {@UniqueConstraint(name = "uidx01_oms_lock", columnNames = {"lockName"})})
    public class OmsLockDO {
    
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
        @GenericGenerator(name = "native", strategy = "native")
        private Long id;
    
        private String lockName;
    
        private String ownerIP;
        /**
         * Maximum time the lock may be held
         */
        private Long maxLockTime;
    
        private Date gmtCreate;
    
        private Date gmtModified;
    
        public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {
            this.lockName = lockName;
            this.ownerIP = ownerIP;
            this.maxLockTime = maxLockTime;
            this.gmtCreate = new Date();
            this.gmtModified = this.gmtCreate;
        }
    }
    

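    OmsLockDO declares a unique constraint (uidx01_oms_lock) on lockName, so the database rejects a second row with the same lock name. It also carries ownerIP, maxLockTime, gmtCreate and gmtModified.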

    OmsLockRepository

    tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java

    public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
    
        @Modifying
        @Transactional(rollbackOn = Exception.class)
        @Query(value = "delete from OmsLockDO where lockName = ?1")
        int deleteByLockName(String lockName);
    
        OmsLockDO findByLockName(String lockName);
    
        @Modifying
        @Transactional(rollbackOn = Exception.class)
        int deleteByOwnerIP(String ip);
    }
    

    OmsLockRepository extends JpaRepository and declares three methods: deleteByLockName, findByLockName and deleteByOwnerIP. Both delete methods return an int.
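
To make the three operations concrete, here is a minimal in-memory stand-in. InMemoryLockTable and its insert and findOwnerByLockName methods are hypothetical, it stores only the owner per lock name, and it assumes the int returned by the delete methods is the number of rows removed, which is how the shutdown hook's log message reads it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the lock table; not Spring Data code
class InMemoryLockTable {
    // lockName -> ownerIP; keying by lockName mirrors the unique constraint
    private final Map<String, String> ownerByLock = new ConcurrentHashMap<>();

    // Returns false when a row with this lock name already exists
    boolean insert(String lockName, String ownerIp) {
        return ownerByLock.putIfAbsent(lockName, ownerIp) == null;
    }

    // Returns the owner of the lock, or null when no such row exists
    String findOwnerByLockName(String lockName) {
        return ownerByLock.get(lockName);
    }

    // Returns the number of rows removed (0 or 1, since lockName is unique)
    int deleteByLockName(String lockName) {
        return ownerByLock.remove(lockName) == null ? 0 : 1;
    }

    // Removes every lock held by the given owner and returns how many were removed
    int deleteByOwnerIP(String ip) {
        int removed = 0;
        for (Map.Entry<String, String> entry : ownerByLock.entrySet()) {
            if (entry.getValue().equals(ip) && ownerByLock.remove(entry.getKey(), ip)) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        InMemoryLockTable table = new InMemoryLockTable();
        table.insert("job-1", "10.0.0.1");
        table.insert("job-2", "10.0.0.1");
        table.insert("job-3", "10.0.0.2");
        System.out.println("released on shutdown: " + table.deleteByOwnerIP("10.0.0.1"));
    }
}
```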

    Summary

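    LockService defines tryLock and unlock. DatabaseLockService implements it on top of the database: the constructor takes an OmsLockRepository and registers a shutdown hook that calls omsLockRepository.deleteByOwnerIP(ownerIp) when the JVM shuts down. tryLock creates an OmsLockDO and calls omsLockRepository.saveAndFlush, returning true on success; on an exception it loads the existing row with omsLockRepository.findByLockName, computes how long the lock has been held, and if that exceeds maxLockTime it calls unlock and retries tryLock. unlock calls omsLockRepository.deleteByLockName.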
