美文网首页
分布式锁之复合锁

分布式锁之复合锁

作者: geweixinerr | 来源:发表于2020-05-24 19:52 被阅读0次

业务场景: 分布式架构下A服务需要使用分布式锁来解决资源共享问题,同时A服务需要调用B服务生成全局唯一业务主键,B服务同时也基于分布式锁完成全局的业务主键生成。需求如下:1. A服务未完成的情况下,必须锁定B服务的调用,将其纳入全局分布式锁控制之下。2.对于分布式锁的技术架构设计,必须同时支持单锁与多锁的全局锁定且能够完成单锁与多锁的自动切换。

业务场景已阐述完毕,我就贴代码了。希望对阅览到本文的圈友有所帮助。

package micro.commons.concurrent;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import micro.commons.annotation.ThreadSafe;
import micro.commons.exception.ConcurrentException;
import net.logstash.logback.encoder.org.apache.commons.lang3.ObjectUtils;

/**
 * 并发处理,基于Redis setNx控制. REMARKS: 针对复合锁,子锁累积超时时间建议<=根锁
 * 
 * @author gewx
 **/
@Component
@ThreadSafe
@SuppressWarnings("static-access")
public final class ConcurrentLock {

    /**
     * 分布式锁VALUE
     **/
    private static final String VALUE = "TRUE";

    /**
     * 分布式锁默认超时时间,单位:秒
     **/
    private static final int DEFAULT_TIME_OUT = 15;

    /**
     * 分布式锁Key
     **/
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    /**
     * 分布式复合锁Key
     **/
    private static final ThreadLocal<HashSet<String>> MULTIWAY = new ThreadLocal<>()
            .withInitial(() -> new HashSet<>(8));

    /**
     * 分布式复合锁计数器
     **/
    private static final ThreadLocal<Integer> COUNTER = new ThreadLocal<>().withInitial(() -> 0);

    /**
     * 分布式锁超时数值,单位:秒
     **/
    private static final ThreadLocal<Integer> TIME_OUT = new ThreadLocal<>();

    /**
     * 分布式锁超时提示消息
     **/
    private static final ThreadLocal<String> TIPS = new ThreadLocal<>();

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public ConcurrentLock key(String key) {
        if (StringUtils.isNotBlank(KEY.get())) {
            MULTIWAY.get().add(KEY.get());
            MULTIWAY.get().add(key);
        }

        KEY.set(key);
        TIME_OUT.set(DEFAULT_TIME_OUT);
        return this;
    }

    /**
     * 设置超时时间
     * 
     * @author gewx
     * @param timeOut 超时时间,单位:秒
     * @return ConcurrentLock对象
     **/
    public ConcurrentLock timeOut(Integer timeOut) {
        TIME_OUT.set(timeOut);
        return this;
    }

    /**
     * 设置并发异常提示信息
     * 
     * @author gewx
     * @param tips 提示信息
     * @return ConcurrentLock对象
     **/
    public ConcurrentLock tips(String tips) {
        TIPS.set(tips);
        return this;
    }

    /**
     * 并发执行过程
     * 
     * @author gewx
     * @param execute 并发执行过程对象
     * @return T 返回结果对象
     **/
    public <T> T execute(OneByOne<T> execute) {
        Exception exception = null;
        try {
            try {
                before();
                T t = execute.invoke();
                return t;
            } catch (Exception ex) {
                exception = ex;
                throw ex;
            }
        } finally {
            if (!(exception instanceof ConcurrentException)) {
                after();
            }
        }
    }

    /**
     * 并发执行前置
     **/
    private void before() {
        String key = KEY.get();
        if (StringUtils.isBlank(key)) {
            throw new ConcurrentException("concurrent Key is Empty~");
        }

        boolean setNx = redisTemplate.opsForValue().setIfAbsent(key, VALUE, TIME_OUT.get(), TimeUnit.SECONDS);
        if (!setNx) {
            throw new ConcurrentException(ObjectUtils.defaultIfNull(TIPS.get(), "concurrent Mode Fail~"));
        }
    }

    /**
     * 并发执行后置
     **/
    private void after() {
        if (MULTIWAY.get().isEmpty()) {
            try {
                redisTemplate.delete(KEY.get());
            } finally {
                KEY.remove();
                TIME_OUT.remove();
                TIPS.remove();
            }
        } else {
            COUNTER.set(COUNTER.get() + 1);
            if (COUNTER.get().intValue() == MULTIWAY.get().size()) {
                try {
                    MULTIWAY.get().stream().forEach(val -> {
                        redisTemplate.delete(val);
                    });
                } finally {
                    MULTIWAY.remove();
                    COUNTER.remove();
                    KEY.remove();
                    TIME_OUT.remove();
                    TIPS.remove();
                }
            }
        }
    }
}

相关文章

  • Zookeeper实现分布式锁(一)While版

    前面文章讲解了用Redis实现分布式锁的方式: 分布式锁之Redis实现(acquire)分布式锁之Redis实现...

  • 分布式锁之复合锁

    业务场景: 分布式架构下A服务需要使用分布式锁来解决资源共享问题,同时A服务需要调用B服务生成全局唯一业务主键,B...

  • 分布式锁

    为什么要用分布式锁 数据库乐观锁redis分布式锁zookeeper分布式锁 使用分布式锁的场景 实现分布式锁的方...

  • 什么是分布式锁?几种分布式锁分别是怎么实现的?

    一、什么是分布式锁: 1、什么是分布式锁: 分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资...

  • 锁(2)-- 分布式锁

    前言: 锁分3种:java锁、分布式锁、DB锁 分布式锁的几种实现方式 目前几乎很多大型网站及应用都是分布式部署...

  • 分布式锁

    为什么要用分布式锁? 分布式锁是悲观锁的实现; 如果采用乐观锁的方案就用不着分布式锁了。 能用乐观锁的地方尽量用乐...

  • 大佬浅谈分布式锁

    redis 实现 redis 分布锁一、redis 实现分布式锁(可重入锁)redission 实现分布式锁1、对...

  • 画分布式锁之"通文馆圣主"Curator的&

    上文,我们已经基于图文分析了zookeeper实现分布式锁的基本原理,【画分布式锁之Zookeeper实现机...

  • 4:Redis 分布式锁 (文末有项目连接)

    1:什么是缓存分布式锁 2:分布式锁的关键代码 3:业务代码使用分布式缓存锁 4:业务代码使用分布式缓存锁 5:测...

  • 用 Redis 实现分布式锁

    一、什么是分布式锁? 要介绍分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。 线程锁:主要用来给方法、代码...

网友评论

      本文标题:分布式锁之复合锁

      本文链接:https://www.haomeiwen.com/subject/pmnjahtx.html