美文网首页
Leaf分布式Id生成系统segment方式的源码全解析

Leaf分布式Id生成系统segment方式的源码全解析

作者: 瑞瑞余之 | 来源:发表于2024-05-11 22:05 被阅读0次

    写在所有之前

    在日常业务中可能遇到生成业务类全局ID的情况,这类ID的关键点在于全局不重复,对于单例来说,这个不难实现,但是对于分布式场景下,如何保证每个独立部署的服务都能生成互补重复的唯一键呢,它的核心思路就是找到一个唯一的“权威”来做这个事情,也就是说独立于分布式的应用来找一位发布者。但是这又引出另一个问题,如何保证这个唯一“权威”不成为系统瓶颈呢?
    实际上目前不少支持分布式部署的中间件都有自己的CAP逻辑,美团推出的开源项目Leaf也就是利用了数据库和ZK以及他们的分布式方案最终实现了分布式Id生成系统。
    具体的实现逻辑可见美团技术团队的官方文档

    • 框架介绍
    • 源码地址
      本文的目的是从源码的角度介绍Leaf是如何工作的。如前文所说,在Leaf的设计中有两种实现方式,一是基于数据库的segment算法,二是基于ZK的雪花算法。我们先整体看一下Leaf源码的结构:
      Leaf项目框架.png
      Leaf的源码很简单,分为两部分,leaf-core作为具体的Id生成实现自成一体,另外的leaf-server是官方以web服务的形式对外提供查询api和监控系统。
      监控web.png
      实际上,我们在阅读源码时只用关注到leaf-core,leaf-server仅仅是展示层,在真实项目中可用,也可自行替换。以下从segment(基于database)和snowflake(基于雪花算法)两个方面分别进行源码解读。

    segment

    原理解释

    实际上,segemnt利用数据库作为一个Id生成规则的存放地址,从表结构可以看到:

    CREATE TABLE `leaf_alloc` (
      `biz_tag` varchar(128) NOT NULL DEFAULT '',
      `max_id` bigint NOT NULL DEFAULT '1',
      `step` int NOT NULL,
      `description` varchar(256) DEFAULT NULL,
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`biz_tag`)
    )
    

    biz_tag:业务标签,它意味着该表中每一条数据都是一个业务的id生成规则,比如在一个电商系统中,除了订单id需要生成外,可能还有商品条形码id,如此就会出现两条表记录。
    max_id:当前规则会分配的最大id值。注意这里是bigint类型,因此意味着segment方式生成的id就是普通数字。
    step:步长,不难理解,当前规则的号段就是从max_id - step ~ max_id。
    我们注意到这一系列字段中leaf_alloc表中并不会存储当前的id,那客户端在申请一个新id时,它是怎么知道下一个id是多少呢?实际上,当前id是记录在该服务运行时内存中。
    这就涉及到segment形式中的核心概念——segment,它的定义如下:

    public class Segment {
        private AtomicLong value = new AtomicLong(0);
        private volatile long max;
        private volatile int step;
        private SegmentBuffer buffer;
    }
    

    没错第一个成员变量就是当前id值。可以想见,当leaf服务启动时,程序会读取db中的规则配置,将其读入内存并以Segment形式存储(见其他三个fields),并初始化当前id=0.
    这就是segment的大致逻辑,总结起来,它利用了db存储规则,然后在每个leaf-core启动时加载规则,并初始化当前id。读到这里有个小问题:“分布式系统中,自然leaf服务也会分布式部署,当有多个leaf服务时,如何保证生成的id不重复呢”。这里先说结论:
    每个leaf-core在启动时都会去读取db中的规则(如果没有则需提前手动insert)。每次读取前都会根据步长(step)取修改max_id:

    max_id = max_id + step
    

    理解了吧,也就是说每个leaf服务都有一个独立的号段(max_id-step, max_id),因此它们在分配id时并不会重复。
    关于其原理言尽于此,我们直接看看源码:
    segemnt核心类如下,
    一个实现类SegmentIDGenImpl实现接口IDGen,结束!是不是很简单


    segment核心类.png

    在IDGen interface中两方法get,init

    public interface IDGen {
        Result get(String key);
        boolean init();
    }
    

    自然从init实现方法开始,里面涉及到两个具体方法updateCacheFromDb和updateCacheFromDbAtEveryMinute。从字面理解,将规则从db中加载到segment对象中,并且从开启每分钟同步一次的定时任务。关注点可以放在它的加载过程。

        private void updateCacheFromDb() {
            logger.info("update cache from db");
            StopWatch sw = new Slf4JStopWatch();
            try {
                // 1.获取所有biz_tag
                List<String> dbTags = dao.getAllTags();
                if (dbTags == null || dbTags.isEmpty()) {
                    return;
                }
                // 2.获取当前缓存中的biz_tag
                List<String> cacheTags = new ArrayList<String>(cache.keySet());
                Set<String> insertTagsSet = new HashSet<>(dbTags);
                Set<String> removeTagsSet = new HashSet<>(cacheTags);
                //db中新加的tags灌进cache
                for(int i = 0; i < cacheTags.size(); i++){
                    String tmp = cacheTags.get(i);
                    if(insertTagsSet.contains(tmp)){
                        insertTagsSet.remove(tmp);
                    }
                }
                // 3.至此获得目前需要新增到cache中的tags,存放到insertTagsSet中
                for (String tag : insertTagsSet) {
                    SegmentBuffer buffer = new SegmentBuffer();
                    buffer.setKey(tag);
                    Segment segment = buffer.getCurrent();
                    segment.setValue(new AtomicLong(0));
                    segment.setMax(0);
                    segment.setStep(0);
                    cache.put(tag, buffer);
                    logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
                }
                //4. cache中已失效的tags从cache删除
                for(int i = 0; i < dbTags.size(); i++){
                    String tmp = dbTags.get(i);
                    if(removeTagsSet.contains(tmp)){
                        removeTagsSet.remove(tmp);
                    }
                }
                for (String tag : removeTagsSet) {
                    cache.remove(tag);
                    logger.info("Remove tag {} from IdCache", tag);
                }
            } catch (Exception e) {
                logger.warn("update cache from db exception", e);
            } finally {
                sw.stop("updateCacheFromDb");
            }
        }
    

    代码很简单,值得一提的是,第三步中segment对象被放到了SegmentBuffer中,实际上塞到本地缓存中的是SegmentBuffer,它是个什么玩意儿?这涉及到美团对leaf的一层优化。当value==max_id后,会更新db中的规则,并reload规则到segmnt中,这里涉及到数据库的I/O,在高qps情况下这个过程会频繁发生。


    单缓存.png

    因此leaf在当前value达到某个阈值后,会提前开启下一个max_id的生成,并把该规则读入新segment中,一旦segment1中id达到max_id,则直接内存级别切换segment即可。


    双缓存.png
    因此引入了SegmentBuffer来管理这两个segemnts:
    public class SegmentBuffer {
        private String key;
        private Segment[] segments; //双buffer
        private volatile int currentPos; //当前的使用的segment的index
        private volatile boolean nextReady; //下一个segment是否处于可切换状态
        private volatile boolean initOk; //是否初始化完成
        private final AtomicBoolean threadRunning; //线程是否在运行中
        private final ReadWriteLock lock;
    
        private volatile int step;
        private volatile int minStep;
        private volatile long updateTimestamp;
    }
    

    到这里我们看到了leaf-segemnt形式中缓存的最上层是业务tag-SegmentBuffer,SegmentBuffer中对应了两个Segment,而当前的value就存在segment当中。


    存储结构.png

    因此我们看到上面提到的updateCacheFromDb方法,实际上是在更新第一层关系,也就是扫描db中所有的bizTag,跟缓存中做比对,新增的bizTag在缓存中创建map,删除缓存中不存在的bizTag,保留与db一致的bizTag。需要注意的是,这时候对于新增的bizTag仅仅是在cache中创建空的SegmentBuffer。与updateCacheFromDb对应,updateSegmentFromDb就在更新第二层。

        public void updateSegmentFromDb(String key, Segment segment) {
            StopWatch sw = new Slf4JStopWatch();
            SegmentBuffer buffer = segment.getBuffer();
            LeafAlloc leafAlloc;
            if (!buffer.isInitOk()) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
            } else if (buffer.getUpdateTimestamp() == 0) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
            } else {
                long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
                int nextStep = buffer.getStep();
                //表示15min内更新过,代表更新频繁,这样的化,通过增加步长,
                // 缓存更多的id,来减少对db的操作
                if (duration < SEGMENT_DURATION) {
                    if (nextStep * 2 > MAX_STEP) {
                        //do nothing
                    } else {
                        nextStep = nextStep * 2;
                    }
                } else if (duration < SEGMENT_DURATION * 2) {
                    //do nothing with nextStep
                } else {
                    //相当于duration >= 30min,表示该key的访问并不频繁,因此可以降低步长。
                    nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
                }
                logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
                LeafAlloc temp = new LeafAlloc();
                temp.setKey(key);
                temp.setStep(nextStep);
                leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(nextStep);
                buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
            }
            // must set value before set max
            long value = leafAlloc.getMaxId() - buffer.getStep();
            segment.getValue().set(value);
            segment.setMax(leafAlloc.getMaxId());
            segment.setStep(buffer.getStep());
            sw.stop("updateSegmentFromDb", key + " " + segment);
        }
    

    这样一来,当客户端请求获取一个id时,会传入当前服务所属bizTag,如果cache中存在则直接找到segment并获取其中的value;如果cache中不存在bizTag则直接返回找不到的异常。这里有个疑问:缓存中找不到bizTag的时候,为什么不去db中主动查询一遍呢?我猜想,开发者的设计理念就是通过缓存去应对高并发场景降低db的I/O,直接去避免缓存穿透的问题。
    写到这里,笔者意识到一个问题,当部署了多个leaf服务,此时流量可能随机的访问这些leaf,可能存在一个问题,那就是leaf生成的id的确是全局唯一,但不见得是全局按时间递增

    client依次请求leaf id.png
    以上是leaf服务初始化或从db中load规则,准备数据的源码内容。接下来是最关键的客户端获取id的过程:
        public Result get(final String key) {
            if (!initOK) {
                return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
            }
            //如果key(bizTag)存在,表示对应的规则已加载到缓存
            if (cache.containsKey(key)) {
                SegmentBuffer buffer = cache.get(key);
                //如果SegmentBuffer未初始化(仅实例化)则先初始化
                if (!buffer.isInitOk()) {
                    synchronized (buffer) {
                        // 双重验证:第一次验证是快速判断SegmentBuffer是否需要初始化,如果需要则进入同步模块,
                        // 在进入同步模块后,为了避免其他线程在当前线程等待锁的过程中进行了初始化,因此会二次初始化
                        if (!buffer.isInitOk()) {
                            try {
                                updateSegmentFromDb(key, buffer.getCurrent());
                                logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                                buffer.setInitOk(true);
                            } catch (Exception e) {
                                logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                            }
                        }
                    }
                }
                //id都是从本地缓存中获取,即便当前缓存中不存在数据,也是先由db同步到cache中,再获取
                return getIdFromSegmentBuffer(cache.get(key));
            }
            return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
        }
    

    最终获取id的过程在getIdFromSegmentBuffer中,代码执行过程参见下图:


    get id的过程.png

    可以看到,在getIdFromSegmentBuffer过程中主要发生了两件事:

    • 当前id+1,得到应该返回的id,但是这个id在返回前需要判断是否超过了规则定义的max_id,如果超过了(严格来说等于也不行)。则将当前的buffer指向另一个备用的segment,他在之前已经预加载过。
    • 第二件事,实际上就是有个异步的判断,在刚进入getIdFromSegmentBuffer就会判断,是否当前的value值超过了10% * max_id,如果超过了,就预加载另一个segment。
      根据上图,和这两点看这段逻辑就会很清晰了。leaf segemnt形式介绍完成,总体来说,它是一种利用数据库持久化id生成规则,而非具体的id数据。而把具体的id生成任务丢给内存来进行,这样做的好处,在于即便在高并发的场景下,内存可以最快的反应请求,并且每次生成的规则都代表了一批数据,因此真实的db
      I/O也不会很高。

    相关文章

      网友评论

          本文标题:Leaf分布式Id生成系统segment方式的源码全解析

          本文链接:https://www.haomeiwen.com/subject/hrbdfjtx.html