A Look at PowerJob's UseCacheLock


Author: go4it | Published 2024-01-12 21:24

    This article takes a look at PowerJob's UseCacheLock.

    UseCacheLock

    tech/powerjob/server/core/lock/UseCacheLock.java

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface UseCacheLock {
    
        String type();
    
        String key();
    
        int concurrencyLevel();
    }
    

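    The UseCacheLock annotation defines three attributes: type (the lock group, which selects the Cache to use), key (an expression that yields the lock key), and concurrencyLevel (passed to the Cache builder when the Cache for that type is first created).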
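
    Because the annotation is retained at runtime, its attributes can be read from the annotated method, which is what the aspect relies on. The following is a minimal sketch of that lookup using plain reflection. The annotation is redeclared locally so the example compiles on its own, and DemoService, its update method, and the describe helper are hypothetical names used only for illustration; they are not part of PowerJob.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class UseCacheLockReflectionDemo {

    // Local copy of the annotation so that this sketch is self-contained.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface UseCacheLock {
        String type();
        String key();
        int concurrencyLevel();
    }

    // Hypothetical service; it exists only to carry the annotation.
    static class DemoService {
        @UseCacheLock(type = "demoLock", key = "#id", concurrencyLevel = 16)
        public void update(Long id) {
            // no-op
        }
    }

    // Reads the annotation attributes from a public method via reflection.
    static String describe(Class<?> clazz, String methodName, Class<?>... paramTypes) {
        try {
            Method method = clazz.getMethod(methodName, paramTypes);
            UseCacheLock lock = method.getAnnotation(UseCacheLock.class);
            if (lock == null) {
                return "no lock";
            }
            return lock.type() + "|" + lock.key() + "|" + lock.concurrencyLevel();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints demoLock|#id|16
        System.out.println(describe(DemoService.class, "update", Long.class));
    }
}
```

    Note that all three attributes are declared without defaults, so every use of the annotation has to supply type, key, and concurrencyLevel explicitly.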

    UseCacheLockAspect

    tech/powerjob/server/core/lock/UseCacheLockAspect.java

    @Slf4j
    @Aspect
    @Component
    @Order(1)
    @RequiredArgsConstructor
    public class UseCacheLockAspect {
    
        private final MonitorService monitorService;
    
        private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();
    
        private static final long SLOW_THRESHOLD = 100;
    
        @Around(value = "@annotation(useCacheLock))")
        public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
            Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
                int concurrencyLevel = useCacheLock.concurrencyLevel();
                log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
                return CacheBuilder.newBuilder()
                        .initialCapacity(300000)
                        .maximumSize(500000)
                        .concurrencyLevel(concurrencyLevel)
                        .expireAfterWrite(30, TimeUnit.MINUTES)
                        .build();
            });
            final Method method = AOPUtils.parseMethod(point);
            Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
            final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
            long start = System.currentTimeMillis();
            reentrantLock.lockInterruptibly();
            try {
                long timeCost = System.currentTimeMillis() - start;
                if (timeCost > SLOW_THRESHOLD) {
    
                    final SlowLockEvent slowLockEvent = new SlowLockEvent()
                            .setType(SlowLockEvent.Type.LOCAL)
                            .setLockType(useCacheLock.type())
                            .setLockKey(String.valueOf(key))
                            .setCallerService(method.getDeclaringClass().getSimpleName())
                            .setCallerMethod(method.getName())
                            .setCost(timeCost);
    
                    monitorService.monitor(slowLockEvent);
    
                    log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
                            key,
                            JSON.toJSONString(point.getArgs()));
                }
                return point.proceed();
            } finally {
                reentrantLock.unlock();
            }
        }
    }
    

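    UseCacheLockAspect intercepts methods annotated with @UseCacheLock. Its lockContainer maps each useCacheLock.type() to a Cache. The Cache is a Guava Cache built with an initialCapacity of 300000, a maximumSize of 500000, an expireAfterWrite of 30 minutes, and the concurrencyLevel taken from the annotation; its key is the lock key and its value is a ReentrantLock. The execute method first calls reentrantLock.lockInterruptibly(), then point.proceed(), and finally releases the lock with reentrantLock.unlock() in a finally block. Before calling point.proceed() it also checks how long acquiring the lock took; if that exceeds SLOW_THRESHOLD (100 ms), it reports a SlowLockEvent through monitorService.monitor and logs a warning.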
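
    The per-key locking pattern described above can be sketched without Spring or Guava. The version below is a simplified illustration, not PowerJob's implementation: it swaps the Guava Cache for a plain ConcurrentHashMap (so there is no size limit and no expiry), takes the type and key as plain strings, and only prints a message where the aspect reports a SlowLockEvent. The names KeyedLockSketch, execute, and runDemo are assumptions made for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class KeyedLockSketch {

    // lock type -> (lock key -> lock). The real aspect uses a Guava Cache as the inner map.
    private final Map<String, Map<String, ReentrantLock>> lockContainer = new ConcurrentHashMap<>();

    private static final long SLOW_THRESHOLD_MS = 100;

    // Runs the action while holding the lock that belongs to (type, key).
    <T> T execute(String type, String key, Supplier<T> action) throws InterruptedException {
        ReentrantLock lock = lockContainer
                .computeIfAbsent(type, ignore -> new ConcurrentHashMap<>())
                .computeIfAbsent(key, ignore -> new ReentrantLock());
        long start = System.currentTimeMillis();
        lock.lockInterruptibly();
        try {
            long waited = System.currentTimeMillis() - start;
            if (waited > SLOW_THRESHOLD_MS) {
                // Stand-in for the monitoring call made by the real aspect.
                System.err.println("slow lock: type=" + type + ", key=" + key + ", waited " + waited + " ms");
            }
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    // Several threads increment an unsynchronized counter under the same key.
    static int runDemo(int threads, int incrementsPerThread) {
        KeyedLockSketch locks = new KeyedLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    try {
                        locks.execute("demo", "42", () -> counter[0]++);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000));
    }
}
```

    Calls that share a type and key are serialized, so the counter ends up at threads × incrementsPerThread, while calls with different keys use different locks and do not block each other. One design consequence of the real aspect's choice of an expiring, size-bounded Cache is that lock objects for idle keys are eventually dropped, which the plain map in this sketch does not do.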

    Example

        @UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
        public void redispatchAsync(Long instanceId, int originStatus) {
            // Reset the status to waiting for dispatch
            instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
        }
    

    The key attribute supports SpEL; here #instanceId refers to the instanceId argument of the annotated method.

    Summary

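    PowerJob's UseCacheLock annotation defines the type, key, and concurrencyLevel attributes. UseCacheLockAspect intercepts methods annotated with @UseCacheLock and uses lockContainer to map each useCacheLock.type() to a Cache whose key is the lock key and whose value is a ReentrantLock. The lock itself is acquired with reentrantLock.lockInterruptibly().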
