How xxl_job Implements Its Distributed Lock

Author: 尘埃里的玄 | Published 2021-12-15 16:33

    When it comes to implementing a distributed lock, three approaches come to mind immediately: row locks in MySQL's InnoDB engine, Redis's SETNX, and ZooKeeper (whose consistency rests on the ZAB protocol).

    I. Implementing a distributed lock with a database

    1. Pessimistic locking

    Use select … where … for update to take an exclusive row lock.

    Note: the column in the where clause ("where name = 'lock'") must be indexed, otherwise the statement locks the entire table. In some cases, for example when the table is small, the MySQL optimizer may decide not to use the index, which again degrades into a table lock. A minimal JDBC sketch follows.
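
    Below is a minimal JDBC sketch of the pessimistic approach. The table lock_table and its name column are illustrative assumptions, not a fixed schema:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class PessimisticLockDemo {

        public static void doWithLock(Connection conn) throws SQLException {
            boolean autoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false); // "for update" only holds the lock inside a transaction
            try (PreparedStatement ps = conn.prepareStatement(
                    // `name` must be indexed (ideally unique), otherwise InnoDB may lock the whole table
                    "select * from lock_table where name = ? for update")) {
                ps.setString(1, "lock");
                ps.execute();           // blocks here until the row lock is acquired
                // ... critical section ...
                conn.commit();          // committing releases the row lock
            } catch (SQLException e) {
                conn.rollback();        // rolling back also releases the lock
                throw e;
            } finally {
                conn.setAutoCommit(autoCommit);
            }
        }
    }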

    2. Optimistic locking

    The biggest difference from the pessimistic approach is that optimistic locking is based on the CAS idea: it is not mutually exclusive, so no resources are consumed waiting on a lock. The operation proceeds on the assumption that there is no concurrent conflict, and a conflict is only detected when the update of the version column fails. Flash-sale ("seckill") inventory deduction commonly uses this scheme to prevent overselling.
    It is implemented by adding a monotonically increasing version column, as in the sketch below.
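
    A minimal sketch of the version-based update. The product table with stock and version columns is an assumption for illustration:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class OptimisticLockDemo {

        // Returns true if the CAS-style update succeeded, false if another writer got there first.
        public static boolean deductStock(Connection conn, long productId) throws SQLException {
            int stock, version;
            try (PreparedStatement read = conn.prepareStatement(
                    "select stock, version from product where id = ?")) {
                read.setLong(1, productId);
                try (ResultSet rs = read.executeQuery()) {
                    if (!rs.next() || rs.getInt("stock") <= 0) {
                        return false; // sold out
                    }
                    stock = rs.getInt("stock");
                    version = rs.getInt("version");
                }
            }
            try (PreparedStatement write = conn.prepareStatement(
                    "update product set stock = ?, version = version + 1 "
                            + "where id = ? and version = ?")) {
                write.setInt(1, stock - 1);
                write.setLong(2, productId);
                write.setInt(3, version);
                return write.executeUpdate() == 1; // 0 rows => version changed => retry or give up
            }
        }
    }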

    II. Implementing a distributed lock with a cache (Redis, etc.)

    1. The commands involved:
      (1) SETNX
      SETNX key val: if and only if key does not exist, set key to the string val and return 1; if key already exists, do nothing and return 0.
      (2) EXPIRE
      expire key timeout: set a time-to-live on key, in seconds; once it elapses the lock is released automatically, which avoids deadlock.
      (3) DELETE
      delete key: delete the key.

    These three commands do most of the work when implementing a distributed lock on Redis.
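
    For orientation, a quick Jedis sketch of the three commands. The address localhost:6379 is a placeholder, and the exact expire signature varies slightly across Jedis versions:

    import redis.clients.jedis.Jedis;

    public class JedisCommandDemo {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                long ok = jedis.setnx("lock:demo", "some-uuid"); // 1 if the key was set, 0 if it already existed
                if (ok == 1) {
                    jedis.expire("lock:demo", 10);               // auto-release after 10 seconds
                }
                jedis.del("lock:demo");                          // release explicitly
            }
        }
    }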

    2. The idea of the implementation:
      (1) To acquire the lock, use setnx, and use expire to attach a timeout so the lock is released automatically when it expires. The lock's value is a randomly generated UUID, which is checked when releasing the lock.
      (2) Acquisition itself also has a timeout; if the lock cannot be obtained within it, give up.
      (3) To release the lock, compare the stored UUID; only if it matches ours, delete the key.

    Code:

    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Transaction;
    import redis.clients.jedis.exceptions.JedisException;

    import java.util.List;
    import java.util.UUID;

    /**
     * A simple distributed-lock implementation
     */
    public class DistributedLock {

        private final JedisPool jedisPool;

        public DistributedLock(JedisPool jedisPool) {
            this.jedisPool = jedisPool;
        }

        /**
         * Acquire the lock
         * @param lockName       key of the lock
         * @param acquireTimeout timeout for acquiring the lock, in ms
         * @param timeout        expiry of the lock itself, in ms
         * @return the lock identifier, or null if acquisition timed out
         */
        public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
            Jedis conn = null;
            String retIdentifier = null;
            try {
                // Get a connection from the pool
                conn = jedisPool.getResource();
                // Randomly generated value stored under the lock key
                String identifier = UUID.randomUUID().toString();
                // The lock key
                String lockKey = "lock:" + lockName;
                // Expiry in seconds; the lock auto-releases after this long
                int lockExpire = (int) (timeout / 1000);

                // Deadline after which we give up acquiring the lock
                long end = System.currentTimeMillis() + acquireTimeout;
                while (System.currentTimeMillis() < end) {
                    if (conn.setnx(lockKey, identifier) == 1) {
                        conn.expire(lockKey, lockExpire);
                        // Return the value, used to verify ownership when releasing
                        retIdentifier = identifier;
                        return retIdentifier;
                    }
                    // ttl == -1 means the key has no expiry; set one to avoid deadlock
                    if (conn.ttl(lockKey) == -1) {
                        conn.expire(lockKey, lockExpire);
                    }

                    try {
                        Thread.sleep(10); // brief pause before retrying
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (JedisException e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
            return retIdentifier;
        }

        /**
         * Release the lock
         * @param lockName   key of the lock
         * @param identifier identifier returned when the lock was acquired
         * @return whether the lock was released
         */
        public boolean releaseLock(String lockName, String identifier) {
            Jedis conn = null;
            String lockKey = "lock:" + lockName;
            boolean retFlag = false;
            try {
                conn = jedisPool.getResource();
                while (true) {
                    // Watch the lock key, then start a transaction
                    conn.watch(lockKey);
                    // Only delete the key if its value matches our identifier
                    if (identifier.equals(conn.get(lockKey))) {
                        Transaction transaction = conn.multi();
                        transaction.del(lockKey);
                        List<Object> results = transaction.exec();
                        if (results == null) {
                            // The watched key changed under us; retry
                            continue;
                        }
                        retFlag = true;
                    }
                    conn.unwatch();
                    break;
                }
            } catch (JedisException e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
            return retFlag;
        }
    }
                
    

    Simulating a seckill (flash-sale) service with multiple threads:

    public class ThreadA extends Thread {

        private Service service;

        public ThreadA(Service service) {
            this.service = service;
        }

        @Override
        public void run() {
            service.seckill();
        }
    }

    public class Test {

        public static void main(String[] args) {
            Service service = new Service();
            for (int i = 0; i < 50; i++) {
                ThreadA threadA = new ThreadA(service);
                threadA.start();
            }
        }
    }
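
    The Service class itself is not shown in the original; below is a minimal sketch consistent with the thread code above and the seckill method quoted further down. The Redis address and the initial stock of 500 are illustrative assumptions:

    import redis.clients.jedis.JedisPool;

    public class Service {

        // Placeholder address; point it at your Redis instance
        private static JedisPool pool = new JedisPool("127.0.0.1", 6379);

        private DistributedLock lock = new DistributedLock(pool);

        // Remaining stock; the initial value of 500 is an assumption
        private int n = 500;

        public void seckill() {
            // Returns the lock's value, used to verify ownership on release
            String identifier = lock.lockWithTimeout("resource", 5000, 1000);
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
            System.out.println(--n);
            lock.releaseLock("resource", identifier);
        }
    }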
    

    The output is sequential, as expected:


    [screenshot: each thread acquires the lock in turn and the stock count decreases strictly in order]

    If the locking calls are commented out:

    public void seckill() {
        // Returns the lock's value, used to verify ownership on release
        //String identifier = lock.lockWithTimeout("resource", 5000, 1000);
        System.out.println(Thread.currentThread().getName() + " acquired the lock");
        System.out.println(--n);
        //lock.releaseLock("resource", identifier);
    }
    

    The output shows that some operations now run interleaved:


    [screenshot: thread output is interleaved and the stock count is no longer strictly ordered]

    III. Implementing a distributed lock with ZooKeeper

    ZooKeeper is an open-source component that provides consistency services to distributed applications. Internally it is a hierarchical, file-system-like tree of nodes, and it guarantees that a node name is unique within its parent directory. Implementing a distributed lock on ZooKeeper works as follows:

    (1) Create a directory node mylock;
    (2) To acquire the lock, thread A creates an ephemeral sequential node under mylock;
    (3) It lists all children of mylock and looks for a sibling with a smaller sequence number; if none exists, its own number is the smallest and it holds the lock;
    (4) Thread B lists all children, sees that it is not the smallest, and sets a watch on the node immediately smaller than its own;
    (5) When thread A finishes, it deletes its node; thread B receives the watch event, checks whether it is now the smallest, and if so it acquires the lock.

    A recommended way to do this is Apache's open-source Curator library, a ZooKeeper client. Curator's InterProcessMutex implements exactly this kind of distributed lock: the acquire method obtains the lock and the release method releases it, as in the sketch below.
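
    A minimal usage sketch of InterProcessMutex. The connection string "localhost:2181" and the lock path "/mylock" are placeholders, and the curator-recipes dependency is required:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    import java.util.concurrent.TimeUnit;

    public class CuratorLockDemo {
        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessMutex mutex = new InterProcessMutex(client, "/mylock");
            // Wait at most 5 seconds for the lock
            if (mutex.acquire(5, TimeUnit.SECONDS)) {
                try {
                    System.out.println("got the lock, doing work");
                } finally {
                    mutex.release();
                }
            }
            client.close();
        }
    }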

    The article's own hand-rolled implementation (checking for and creating an ephemeral node directly) looks like this:

    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang.StringUtils;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;

    /**
     * ZooKeeper implementation of the distributed lock
     */
    @Slf4j
    @Component
    public class ZkLock implements DistributionLock { // DistributionLock is the article's own interface (not shown)

        // Placeholder connection string; point it at your ZooKeeper ensemble
        private String zkAddress = "zk_address";

        // Placeholder namespace under which all lock nodes live
        private static final String root = "package root";

        private CuratorFramework zkClient;

        private final String LOCK_PREFIX = "/lock_";

        @Bean
        public DistributionLock initZkLock() {
            if (StringUtils.isBlank(root)) {
                throw new RuntimeException("zookeeper 'root' can't be null");
            }
            zkClient = CuratorFrameworkFactory
                    .builder()
                    .connectString(zkAddress)
                    .retryPolicy(new RetryNTimes(2000, 20000))
                    .namespace(root)
                    .build();
            zkClient.start();
            return this;
        }

        public boolean tryLock(String lockName) {
            lockName = LOCK_PREFIX + lockName;
            boolean locked = true;
            try {
                Stat stat = zkClient.checkExists().forPath(lockName);
                if (stat == null) {
                    log.info("tryLock:{}", lockName);
                    // double-check before creating the ephemeral lock node
                    stat = zkClient.checkExists().forPath(lockName);
                    if (stat == null) {
                        zkClient
                                .create()
                                .creatingParentsIfNeeded()
                                .withMode(CreateMode.EPHEMERAL)
                                .forPath(lockName, "1".getBytes());
                    } else {
                        log.warn("double-check stat.version:{}", stat.getAversion());
                        locked = false;
                    }
                } else {
                    log.warn("check stat.version:{}", stat.getAversion());
                    locked = false;
                }
            } catch (Exception e) {
                locked = false;
            }
            return locked;
        }

        public boolean tryLock(String key, long timeout) {
            return false;
        }

        public void release(String lockName) {
            lockName = LOCK_PREFIX + lockName;
            try {
                zkClient
                        .delete()
                        .guaranteed()
                        .deletingChildrenIfNeeded()
                        .forPath(lockName);
                log.info("release:{}", lockName);
            } catch (Exception e) {
                log.error("release failed", e);
            }
        }

        public void setZkAddress(String zkAddress) {
            this.zkAddress = zkAddress;
        }
    }
    

    Advantages: high availability, reentrancy, and blocking-lock semantics; ephemeral nodes also solve the deadlock-on-failure problem.

    Disadvantages: because nodes must be created and deleted frequently, performance is lower than the Redis approach.

    IV. Comparison

    Database-based lock
    Disadvantages:

    1. Database operations are relatively slow, and there is a risk of locking the whole table;
    2. It is non-blocking: after a failed attempt the caller must poll, which burns CPU;
    3. Long-running uncommitted transactions, or long polling, can tie up many connections.

    Redis (cache)-based lock
    Disadvantages:

    1. If deleting the lock key fails, you depend on the expiry, and the expiry time is hard to choose well;
    2. It is non-blocking: after a failed attempt the caller must poll, which burns CPU.

    ZooKeeper-based lock
    Disadvantages: performance is lower than the Redis approach, mainly because every write (acquiring or releasing the lock) must be executed on the Leader and then synchronized to the followers.

    In short, ZooKeeper offers good performance and reliability overall.

    Ease of understanding (easiest first): database > cache > ZooKeeper

    Implementation complexity (simplest first): ZooKeeper >= cache > database

    Performance (highest first): cache > ZooKeeper >= database

    Reliability (highest first): ZooKeeper > cache > database

    V. xxl_job implements its distributed lock at the database level

    Source code. Note the statement near the top of the schedule loop: each admin node opens a transaction and runs select * from xxl_job_lock where lock_name = 'schedule_lock' for update, so only the node that wins that row lock scans and triggers jobs in a given cycle; the commit in the finally block releases the lock. A distilled sketch of this pattern follows the full source.

    package com.xxl.job.admin.core.thread;
    
    import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
    import com.xxl.job.admin.core.cron.CronExpression;
    import com.xxl.job.admin.core.model.XxlJobInfo;
    import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum;
    import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
    import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author xuxueli 2019-05-21
     */
    public class JobScheduleHelper {
        private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
    
        private static JobScheduleHelper instance = new JobScheduleHelper();
        public static JobScheduleHelper getInstance(){
            return instance;
        }
    
        public static final long PRE_READ_MS = 5000;    // pre read
    
        private Thread scheduleThread;
        private Thread ringThread;
        private volatile boolean scheduleThreadToStop = false;
        private volatile boolean ringThreadToStop = false;
        private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
    
        public void start(){
    
            // schedule thread
            scheduleThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
    
                    // pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
    
                    while (!scheduleThreadToStop) {
    
                        // Scan Job
                        long start = System.currentTimeMillis();
    
                        Connection conn = null;
                        Boolean connAutoCommit = null;
                        PreparedStatement preparedStatement = null;
    
                        boolean preReadSuc = true;
                        try {
    
                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                            connAutoCommit = conn.getAutoCommit();
                            conn.setAutoCommit(false);
    
                            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                            preparedStatement.execute();
    
                            // tx start
    
                            // 1、pre read
                            long nowTime = System.currentTimeMillis();
                            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                            if (scheduleList!=null && scheduleList.size()>0) {
                                // 2、push time-ring
                                for (XxlJobInfo jobInfo: scheduleList) {
    
                                    // time-ring jump
                                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    
                                        // 1、misfire match
                                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW -> trigger
                                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                        }
    
                                        // 2、fresh next
                                        refreshNextValidTime(jobInfo, new Date());
    
                                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    
                                        // 1、trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
    
                                        // 2、fresh next
                                        refreshNextValidTime(jobInfo, new Date());
    
                                        // next-trigger-time in 5s, pre-read again
                                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
    
                                            // 1、make ring second
                                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                            // 2、push time ring
                                            pushTimeRing(ringSecond, jobInfo.getId());
    
                                            // 3、fresh next
                                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                        }
    
                                    } else {
                                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
    
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
    
                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                    }
    
                                }
    
                                // 3、update trigger info
                                for (XxlJobInfo jobInfo: scheduleList) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                                }
    
                            } else {
                                preReadSuc = false;
                            }
    
                            // tx stop
    
    
                        } catch (Exception e) {
                            if (!scheduleThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                            }
                        } finally {
    
                            // commit
                            if (conn != null) {
                                try {
                                    conn.commit();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.setAutoCommit(connAutoCommit);
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
    
                            // close PreparedStatement
                            if (null != preparedStatement) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
                        }
                        long cost = System.currentTimeMillis()-start;
    
    
                        // Wait seconds, align second
                        if (cost < 1000) {  // scan-overtime, not wait
                            try {
                                // pre-read period: success > scan each second; fail > skip this period;
                                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                            } catch (InterruptedException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
    
                    }
    
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
                }
            });
            scheduleThread.setDaemon(true);
            scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
            scheduleThread.start();
    
    
            // ring thread
            ringThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    while (!ringThreadToStop) {
    
                        // align second
                        try {
                            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                        } catch (InterruptedException e) {
                            if (!ringThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
    
                        try {
                            // second data
                            List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // also check the previous tick, in case processing ran long and a tick was skipped
                            for (int i = 0; i < 2; i++) {
                                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                                if (tmpData != null) {
                                    ringItemData.addAll(tmpData);
                                }
                            }
    
                            // ring trigger
                            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                            if (ringItemData.size() > 0) {
                                // do trigger
                                for (int jobId: ringItemData) {
                                    // do trigger
                                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                                }
                                // clear
                                ringItemData.clear();
                            }
                        } catch (Exception e) {
                            if (!ringThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                            }
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
                }
            });
            ringThread.setDaemon(true);
            ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
            ringThread.start();
        }
    
        private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
            Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
            if (nextValidTime != null) {
                jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                jobInfo.setTriggerNextTime(nextValidTime.getTime());
            } else {
                jobInfo.setTriggerStatus(0);
                jobInfo.setTriggerLastTime(0);
                jobInfo.setTriggerNextTime(0);
                logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}",
                        jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
            }
        }
    
        private void pushTimeRing(int ringSecond, int jobId){
            // push async ring
            List<Integer> ringItemData = ringData.get(ringSecond);
            if (ringItemData == null) {
                ringItemData = new ArrayList<Integer>();
                ringData.put(ringSecond, ringItemData);
            }
            ringItemData.add(jobId);
    
            logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
        }
    
        public void toStop(){
    
            // 1、stop schedule
            scheduleThreadToStop = true;
            try {
                TimeUnit.SECONDS.sleep(1);  // wait
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
            if (scheduleThread.getState() != Thread.State.TERMINATED){
                // interrupt and wait
                scheduleThread.interrupt();
                try {
                    scheduleThread.join();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            // if has ring data
            boolean hasRingData = false;
            if (!ringData.isEmpty()) {
                for (int second : ringData.keySet()) {
                    List<Integer> tmpData = ringData.get(second);
                    if (tmpData!=null && tmpData.size()>0) {
                        hasRingData = true;
                        break;
                    }
                }
            }
            if (hasRingData) {
                try {
                    TimeUnit.SECONDS.sleep(8);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            // stop ring (wait job-in-memory stop)
            ringThreadToStop = true;
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
            if (ringThread.getState() != Thread.State.TERMINATED){
                // interrupt and wait
                ringThread.interrupt();
                try {
                    ringThread.join();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
        }
    
    
        // ---------------------- tools ----------------------
        public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
            ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
            if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {
                Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
                return nextValidTime;
            } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
                return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
            }
            return null;
        }
    
    }
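
    To distill the pattern: every admin instance runs this loop, but the for update row lock on xxl_job_lock serializes them, so only one instance scans and triggers jobs in any given cycle. A simplified sketch of just the locking skeleton; the dataSource parameter and the scanAndTriggerJobs placeholder are assumptions standing in for xxl-job's own wiring:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    public class ScheduleLockSketch {

        // Blocks on the xxl_job_lock row until this node becomes the active scheduler
        public static void scanWithLock(DataSource dataSource) throws SQLException {
            try (Connection conn = dataSource.getConnection()) {
                boolean autoCommit = conn.getAutoCommit();
                conn.setAutoCommit(false); // the row lock lives inside this transaction
                try (PreparedStatement ps = conn.prepareStatement(
                        "select * from xxl_job_lock where lock_name = 'schedule_lock' for update")) {
                    ps.execute();          // blocks until the row lock is acquired
                    scanAndTriggerJobs();  // placeholder for the pre-read/trigger logic above
                } finally {
                    conn.commit();         // committing releases the row lock
                    conn.setAutoCommit(autoCommit);
                }
            }
        }

        private static void scanAndTriggerJobs() {
            // placeholder: pre-read due jobs, trigger them, refresh next trigger times
        }
    }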
    
    
