Leaf SegmentIDGenImpl: source code analysis and optimization suggestions


Author: pcgreat | Published: 2020-09-22 21:16

    leaf SegmentIDGenImpl

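    1. SegmentIDGenImpl is the implementation of Leaf-segment. Source location:
      https://github.com/Meituan-Dianping/Leaf/blob/26e01a28b6d2088a6746c91b4c5d26a703f6bb0e/leaf-core/src/main/java/com/sankuai/inf/leaf/segment/SegmentIDGenImpl.java
    2. For the design details of Leaf-segment, see the article by the Meituan team:
      https://tech.meituan.com/2017/04/21/mt-leaf.html

    In my view, compared with MySQL auto-increment IDs, Leaf-segment has the following advantages:
    1. After sharding tables by a business dimension (for example by region id or creation time), there is no need to worry about duplicate IDs.
    2. Compared with MySQL, Leaf can be scaled out horizontally quickly to raise throughput.
    3. The Leaf service caches segments internally, so even if the DB goes down, Leaf can keep serving requests for a short time, until the cached segment IDs are used up. How long that lasts is not fixed; after the service has been running for a long time, it should be at most a little under 30 minutes and at least a little under 15 minutes (a theoretical estimate).
    4. Leaf-segment can run on MySQL or on TiDB, so scaling DB throughput is not limited to what MySQL offers.
    5. Migrating from MySQL auto-increment IDs to Leaf-segment is fairly easy.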

    The database has only one table, leaf_alloc. Each row corresponds to the most recently allocated Segment of one business tag.

    CREATE TABLE `leaf_alloc` (
      `biz_tag` varchar(128)  NOT NULL DEFAULT '', -- business tag
      `max_id` bigint(20) NOT NULL DEFAULT '1', -- max id of the currently allocated segment
      `step` int(11) NOT NULL, -- default (initial) step
      `description` varchar(256)  DEFAULT NULL,
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`biz_tag`)
    ) ENGINE=InnoDB;
    

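    By design, max_id is the upper bound of the segments allocated so far: the new max_id is last_max_id + step, where step is the adaptive step that can grow and shrink at runtime. The step column in the database, however, is the initial step and never changes.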
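
As a rough illustration of that bookkeeping, the sketch below models a single leaf_alloc row in memory. The class `AllocRowModel` and its `allocate` method are hypothetical names invented for this example and are not part of Leaf; in Leaf the update is performed by the DAO against the database.

```java
// Hypothetical in-memory stand-in for one leaf_alloc row (not Leaf code).
class AllocRowModel {
    private long maxId;        // mirrors the max_id column
    private final int dbStep;  // mirrors the step column; never modified

    AllocRowModel(long maxId, int dbStep) {
        this.maxId = maxId;
        this.dbStep = dbStep;
    }

    // Advance max_id by the given step and return the allocated range as
    // {firstId, endExclusive}. synchronized stands in for the row lock.
    synchronized long[] allocate(int step) {
        maxId += step;
        return new long[] {maxId - step, maxId};
    }

    int getDbStep() {
        return dbStep;
    }

    synchronized long getMaxId() {
        return maxId;
    }
}
```

Allocating first with the row's own step and then with a larger, adapted step moves max_id forward each time, while `getDbStep()` keeps returning the initial value.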

    SegmentBuffer design

        private String key; // business key
        private Segment[] segments; // double buffer
        private volatile int currentPos; // index of the segment currently in use
        private volatile boolean nextReady; // whether the next segment is ready to be switched in
        private volatile boolean initOk; // whether initialization has completed
        private final AtomicBoolean threadRunning; // whether the update thread is running
        private final ReadWriteLock lock;
    
        private volatile int step;   // current step (adaptive)
        private volatile int minStep; // the step column in the database
        private volatile long updateTimestamp;  // time max_id was last updated, roughly when the first or the second segment finished initializing
    
     public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
            while (true) {
                buffer.rLock().lock();
                try {
                    final Segment segment = buffer.getCurrent();
                    if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                        service.execute(new Runnable() {
                            @Override
                            public void run() {
                                Segment next = buffer.getSegments()[buffer.nextPos()];
                                boolean updateOk = false;
                                try {
                                    updateSegmentFromDb(buffer.getKey(), next);
                                    updateOk = true;
                                    logger.info("update segment {} from db {}", buffer.getKey(), next);
                                } catch (Exception e) {
                                    logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                                } finally {
                                    if (updateOk) {
                                        buffer.wLock().lock();
                                        buffer.setNextReady(true);
                                        buffer.getThreadRunning().set(false);
                                        buffer.wLock().unlock();
                                    } else {
                                        buffer.getThreadRunning().set(false);
                                    }
                                }
                            }
                        });
                    }
                    long value = segment.getValue().getAndIncrement();
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                } finally {
                    buffer.rLock().unlock();
                }
                waitAndSleep(buffer);
                buffer.wLock().lock();
                try {
                    final Segment segment = buffer.getCurrent();
                    long value = segment.getValue().getAndIncrement();
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                    if (buffer.isNextReady()) {
                        buffer.switchPos();
                        buffer.setNextReady(false);
                    } else {
                        logger.error("Both two segments in {} are not ready!", buffer);
                        return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                    }
                } finally {
                    buffer.wLock().unlock();
                }
            }
        }
    

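    Leaf uses a SegmentBuffer to hold the segments of each business tag, and each buffer contains two segments. The first time an ID is requested, Leaf updates the database and initializes the first segment. Once more than 10% of the first segment's IDs have been used, one and only one thread is started to initialize the second segment. When the first segment is exhausted and the second segment is ready, the buffer switches over and the second segment becomes the current one. The code above is well worth studying.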
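
The double-buffer idea can be reduced to a small sketch. This is a simplified illustration and not Leaf's implementation: `TinyDoubleBuffer` and all of its members are hypothetical, an `AtomicLong` stands in for the max_id column, `synchronized` replaces the read/write lock, and an exhausted caller blocks on the pending load instead of returning an error code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified double buffer (not Leaf code).
class TinyDoubleBuffer {
    private final int step;
    private final AtomicLong dbMaxId; // stand-in for the max_id column
    private final ExecutorService loader = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "segment-loader");
        t.setDaemon(true);
        return t;
    });
    private long value;           // next id to hand out
    private long max;             // end of the current segment (exclusive)
    private Future<long[]> next;  // pending or finished load of the next segment

    TinyDoubleBuffer(long startId, int step) {
        this.step = step;
        this.dbMaxId = new AtomicLong(startId);
        long[] first = fetchSegment();
        value = first[0];
        max = first[1];
    }

    // Simulates "max_id = max_id + step" and returns {start, endExclusive}.
    private long[] fetchSegment() {
        long newMax = dbMaxId.addAndGet(step);
        return new long[] {newMax - step, newMax};
    }

    synchronized long nextId() {
        long idle = max - value;
        // Start at most one background load once more than 10% is used.
        if (next == null && idle < 0.9 * step) {
            Callable<long[]> task = this::fetchSegment;
            next = loader.submit(task);
        }
        if (value >= max) {
            try {
                long[] seg = next.get(); // waits only if the load is still running
                value = seg[0];
                max = seg[1];
                next = null;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException("segment load failed", e);
            }
        }
        return value++;
    }
}
```

With a start id of 1 and a step of 10, repeated calls to `nextId()` from one thread cross segment boundaries without the caller noticing, because the next segment is normally loaded well before the current one runs out.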

    Points to optimize
        private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());
    
    
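    1. service is a thread pool that can grow to Integer.MAX_VALUE threads. Each SegmentBuffer has at most one update task in flight, so if the DB blocks, the pool can in practice grow to one thread per SegmentBuffer: with 2,000 SegmentBuffers the pool ends up with 2,000 threads, and with 20,000 it ends up with 20,000 threads, which is very expensive. The original intent was evidently a pool that can keep growing. My suggestion is to cap the pool at 200 threads and put any tasks beyond that limit into a queue.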
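
One possible way to build such a capped pool with the standard library is sketched below. The class name `BoundedUpdatePool` and the factory method `create` are hypothetical, and the limit is whatever the caller passes in; this is an illustration of the suggestion, not a change taken from Leaf.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not Leaf code): a pool that grows to at most
// maxThreads workers and queues every task beyond that.
class BoundedUpdatePool {
    static ThreadPoolExecutor create(int maxThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,        // core size == max size
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Let idle workers exit after 60 seconds so the pool can shrink again.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }
}
```

The core size is set equal to the maximum on purpose. ThreadPoolExecutor only creates threads beyond the core size when the queue rejects a task, so a small core size combined with an unbounded queue would never grow past the core size. Calling `BoundedUpdatePool.create(200)` would give the limit suggested above.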

    Segment design

    public class Segment {
        private AtomicLong value = new AtomicLong(0); // current id
        private volatile long max; // max id of the segment
        private volatile int step; // step of the segment
        private SegmentBuffer buffer; // owning SegmentBuffer
        // getters and setters omitted
    }
    

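    Functionally, initializing a Segment is quite simple: get the Segment's step from the database (subject to the adaptive step design), compute the segment's current id and max id, update the SegmentBuffer's updateTimestamp, and advance max_id (and with it update_time) in the database. The adaptive step is what makes the code more complicated.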

     public void updateSegmentFromDb(String key, Segment segment) {
            StopWatch sw = new Slf4JStopWatch();
            SegmentBuffer buffer = segment.getBuffer();
            LeafAlloc leafAlloc;
            if (!buffer.isInitOk()) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep()); // leafAlloc.getStep() is the step stored in the DB
            } else if (buffer.getUpdateTimestamp() == 0) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep()); // leafAlloc.getStep() is the step stored in the DB
            } else {
                long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
                int nextStep = buffer.getStep();
                if (duration < SEGMENT_DURATION) {
                    if (nextStep * 2 > MAX_STEP) {
                        //do nothing
                    } else {
                        nextStep = nextStep * 2;
                    }
                } else if (duration < SEGMENT_DURATION * 2) {
                    //do nothing with nextStep
                } else {
                    nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
                }
                logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
                LeafAlloc temp = new LeafAlloc();
                temp.setKey(key);
                temp.setStep(nextStep);
                leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(nextStep);
                buffer.setMinStep(leafAlloc.getStep()); // leafAlloc.getStep() is the step stored in the DB
            }
            // must set value before set max
            long value = leafAlloc.getMaxId() - buffer.getStep();
            segment.getValue().set(value);
            segment.setMax(leafAlloc.getMaxId());
            segment.setStep(buffer.getStep());
            sw.stop("updateSegmentFromDb", key + " " + segment);
        }
    

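    Adaptive step design

    It is actually quite simple, and very similar to something we built before. If a segment is used up in less than 15 minutes and doubling would not exceed MAX_STEP, then nextStep = nextStep * 2. Between 15 and 30 minutes the step is left unchanged. If a segment takes 30 minutes or more to use up and nextStep / 2 is still at least the initial step, the step is halved:
    nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;

    Design goal

    1. An adaptive step means fewer DB accesses, more IDs per fetch, and higher throughput.

    Drawback

    The adaptive step is capped at 1,000,000, so if a Leaf instance goes down, it can leave a gap of up to 1,000,000 IDs, which is alarming.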

            // segment used up in less than 15 minutes
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            // segment used up in at least 15 but less than 30 minutes
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            // segment used up in 30 minutes or more
            } else {
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
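
The same rule can be restated as a pure function, which makes the three branches easy to try out in isolation. The class `AdaptiveStep` and the method `nextStep` are hypothetical names for this sketch; the constants repeat the values of MAX_STEP and SEGMENT_DURATION from the source.

```java
// Hypothetical restatement of the step-adjustment branch (not Leaf code).
class AdaptiveStep {
    static final int MAX_STEP = 1_000_000;
    static final long SEGMENT_DURATION = 15 * 60 * 1000L; // 15 minutes in ms

    static int nextStep(int currentStep, int minStep, long durationMillis) {
        if (durationMillis < SEGMENT_DURATION) {
            // Consumed quickly: double, unless that would exceed the cap.
            return currentStep * 2 > MAX_STEP ? currentStep : currentStep * 2;
        }
        if (durationMillis < SEGMENT_DURATION * 2) {
            // Consumed at a moderate pace: keep the step as it is.
            return currentStep;
        }
        // Consumed slowly: halve, but never go below the DB step.
        return currentStep / 2 >= minStep ? currentStep / 2 : currentStep;
    }
}
```

For example, with a DB step of 1000, a segment that lasted 5 minutes takes the step from 1000 to 2000, while a segment that lasted 40 minutes takes it from 2000 back to 1000 and leaves a step of 1000 unchanged.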
    

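    Syncing DB data into segments

    In SegmentIDGenImpl's init method, the first call to updateCacheFromDb reads the tags from the DB, creates a SegmentBuffer for each of them, and adds it to the cache. The buffer's current segment is only zeroed at this point; the real segment is loaded lazily on the first get() for that key. After that, updateCacheFromDb runs once a minute: it adds SegmentBuffers for newly inserted tags to the current instance's cache and removes SegmentBuffers that are in the cache but no longer in the database.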

        @Override
        public boolean init() {
            logger.info("Init ...");
            // make sure the key-value cache is loaded before init is reported as successful
            updateCacheFromDb();
            initOK = true;
            updateCacheFromDbAtEveryMinute();
            return initOK;
        }
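
The add-and-remove reconciliation performed by updateCacheFromDb boils down to two set differences. The sketch below shows that idea on a generic map; `TagCacheSync`, `sync`, and the `factory` parameter are hypothetical names for illustration, not Leaf's API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of the cache reconciliation (not Leaf code).
class TagCacheSync {
    // Makes the cache's key set match dbTags, creating values with factory.
    static <V> void sync(Map<String, V> cache, List<String> dbTags,
                         Function<String, V> factory) {
        if (dbTags == null || dbTags.isEmpty()) {
            return; // same guard as the source: an empty result changes nothing
        }
        Set<String> dbSet = new HashSet<>(dbTags);

        Set<String> toInsert = new HashSet<>(dbSet);
        toInsert.removeAll(cache.keySet());   // in DB but not yet cached

        Set<String> toRemove = new HashSet<>(cache.keySet());
        toRemove.removeAll(dbSet);            // cached but gone from DB

        for (String tag : toInsert) {
            cache.put(tag, factory.apply(tag));
        }
        for (String tag : toRemove) {
            cache.remove(tag);
        }
    }
}
```

Entries whose tag exists on both sides are left untouched, so a buffer that is already serving IDs keeps its state across the periodic sync.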
    
    

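    Purpose of this design
    1. Keep the SegmentBuffer data in the instance cache consistent with the DB.
    2. Avoid having a large number of unused business SegmentBuffers occupy memory.

    Drawback of this design
    1. Removal is driven by physical deletion of rows in the DB. If a developer deletes a row by mistake, production can no longer obtain IDs for that tag, which would be a serious problem.

    Judging from the source, segment-based ID generation in SegmentIDGenImpl is much more complex than snowflake-style ID generation. But once the core flow is clear, the code is fairly easy to read.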

    package com.sankuai.inf.leaf.segment;
    
    import com.sankuai.inf.leaf.IDGen;
    import com.sankuai.inf.leaf.common.Result;
    import com.sankuai.inf.leaf.common.Status;
    import com.sankuai.inf.leaf.segment.dao.IDAllocDao;
    import com.sankuai.inf.leaf.segment.model.*;
    import org.perf4j.StopWatch;
    import org.perf4j.slf4j.Slf4JStopWatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class SegmentIDGenImpl implements IDGen {
        private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class);
    
        /**
         * Error code returned when the ID cache has not been initialized successfully
         */
        private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
        /**
         * Error code returned when the key does not exist
         */
        private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
        /**
         * Error code returned when neither Segment in the SegmentBuffer has been loaded from the DB
         */
        private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
        /**
         * Maximum step, capped at 1,000,000
         */
        private static final int MAX_STEP = 1000000;
        /**
         * Target lifetime of one Segment: 15 minutes
         */
        private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
        private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());
        private volatile boolean initOK = false;
        private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();
        private IDAllocDao dao;
    
        public static class UpdateThreadFactory implements ThreadFactory {
    
            private static int threadInitNumber = 0;
    
            private static synchronized int nextThreadNum() {
                return threadInitNumber++;
            }
    
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
            }
        }
    
        @Override
        public boolean init() {
            logger.info("Init ...");
            // make sure the key-value cache is loaded before init is reported as successful
            updateCacheFromDb();
            initOK = true;
            updateCacheFromDbAtEveryMinute();
            return initOK;
        }
    
        private void updateCacheFromDbAtEveryMinute() {
            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("check-idCache-thread");
                    t.setDaemon(true);
                    return t;
                }
            });
            service.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    updateCacheFromDb();
                }
            }, 60, 60, TimeUnit.SECONDS);
        }
    
        private void updateCacheFromDb() {
            logger.info("update cache from db");
            StopWatch sw = new Slf4JStopWatch();
            try {
                List<String> dbTags = dao.getAllTags();
                if (dbTags == null || dbTags.isEmpty()) {
                    return;
                }
                List<String> cacheTags = new ArrayList<String>(cache.keySet());
                Set<String> insertTagsSet = new HashSet<>(dbTags);
                Set<String> removeTagsSet = new HashSet<>(cacheTags);
                // add tags that are new in the DB to the cache
                for(int i = 0; i < cacheTags.size(); i++){
                    String tmp = cacheTags.get(i);
                    if(insertTagsSet.contains(tmp)){
                        insertTagsSet.remove(tmp);
                    }
                }
                for (String tag : insertTagsSet) {
                    SegmentBuffer buffer = new SegmentBuffer();
                    buffer.setKey(tag);
                    Segment segment = buffer.getCurrent();
                    segment.setValue(new AtomicLong(0));
                    segment.setMax(0);
                    segment.setStep(0);
                    cache.put(tag, buffer);
                    logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
                }
                // remove tags from the cache that no longer exist in the DB
                for(int i = 0; i < dbTags.size(); i++){
                    String tmp = dbTags.get(i);
                    if(removeTagsSet.contains(tmp)){
                        removeTagsSet.remove(tmp);
                    }
                }
                for (String tag : removeTagsSet) {
                    cache.remove(tag);
                    logger.info("Remove tag {} from IdCache", tag);
                }
            } catch (Exception e) {
                logger.warn("update cache from db exception", e);
            } finally {
                sw.stop("updateCacheFromDb");
            }
        }
    
        @Override
        public Result get(final String key) {
            if (!initOK) {
                return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
            }
            if (cache.containsKey(key)) {
                SegmentBuffer buffer = cache.get(key);
                if (!buffer.isInitOk()) {
                    synchronized (buffer) {
                        if (!buffer.isInitOk()) {
                            try {
                                updateSegmentFromDb(key, buffer.getCurrent());
                                logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                                buffer.setInitOk(true);
                            } catch (Exception e) {
                                logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                            }
                        }
                    }
                }
                return getIdFromSegmentBuffer(cache.get(key));
            }
            return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
        }
    
        public void updateSegmentFromDb(String key, Segment segment) {
            StopWatch sw = new Slf4JStopWatch();
            SegmentBuffer buffer = segment.getBuffer();
            LeafAlloc leafAlloc;
            if (!buffer.isInitOk()) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep()); // leafAlloc.getStep() is the step stored in the DB
            } else if (buffer.getUpdateTimestamp() == 0) {
                leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(leafAlloc.getStep());
                buffer.setMinStep(leafAlloc.getStep()); // leafAlloc.getStep() is the step stored in the DB
            } else {
                long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
                int nextStep = buffer.getStep();
                if (duration < SEGMENT_DURATION) {
                    if (nextStep * 2 > MAX_STEP) {
                        //do nothing
                    } else {
                        nextStep = nextStep * 2;
                    }
                } else if (duration < SEGMENT_DURATION * 2) {
                    //do nothing with nextStep
                } else {
                    nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
                }
                logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
                LeafAlloc temp = new LeafAlloc();
                temp.setKey(key);
                temp.setStep(nextStep);
                leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
                buffer.setUpdateTimestamp(System.currentTimeMillis());
                buffer.setStep(nextStep);
                buffer.setMinStep(leafAlloc.getStep()); // leafAlloc.getStep() is the step stored in the DB
            }
            // must set value before set max
            long value = leafAlloc.getMaxId() - buffer.getStep();
            segment.getValue().set(value);
            segment.setMax(leafAlloc.getMaxId());
            segment.setStep(buffer.getStep());
            sw.stop("updateSegmentFromDb", key + " " + segment);
        }
    
        public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
            while (true) {
                buffer.rLock().lock();
                try {
                    final Segment segment = buffer.getCurrent();
                    if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                        service.execute(new Runnable() {
                            @Override
                            public void run() {
                                Segment next = buffer.getSegments()[buffer.nextPos()];
                                boolean updateOk = false;
                                try {
                                    updateSegmentFromDb(buffer.getKey(), next);
                                    updateOk = true;
                                    logger.info("update segment {} from db {}", buffer.getKey(), next);
                                } catch (Exception e) {
                                    logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                                } finally {
                                    if (updateOk) {
                                        buffer.wLock().lock();
                                        buffer.setNextReady(true);
                                        buffer.getThreadRunning().set(false);
                                        buffer.wLock().unlock();
                                    } else {
                                        buffer.getThreadRunning().set(false);
                                    }
                                }
                            }
                        });
                    }
                    long value = segment.getValue().getAndIncrement();
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                } finally {
                    buffer.rLock().unlock();
                }
                waitAndSleep(buffer);
                buffer.wLock().lock();
                try {
                    final Segment segment = buffer.getCurrent();
                    long value = segment.getValue().getAndIncrement();
                    if (value < segment.getMax()) {
                        return new Result(value, Status.SUCCESS);
                    }
                    if (buffer.isNextReady()) {
                        buffer.switchPos();
                        buffer.setNextReady(false);
                    } else {
                        logger.error("Both two segments in {} are not ready!", buffer);
                        return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                    }
                } finally {
                    buffer.wLock().unlock();
                }
            }
        }
    
        private void waitAndSleep(SegmentBuffer buffer) {
            int roll = 0;
            while (buffer.getThreadRunning().get()) {
                roll += 1;
                if(roll > 10000) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(10);
                        break;
                    } catch (InterruptedException e) {
                        logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
                        break;
                    }
                }
            }
        }
    
        public List<LeafAlloc> getAllLeafAllocs() {
            return dao.getAllLeafAllocs();
        }
    
        public Map<String, SegmentBuffer> getCache() {
            return cache;
        }
    
        public IDAllocDao getDao() {
            return dao;
        }
    
        public void setDao(IDAllocDao dao) {
            this.dao = dao;
        }
    }
    
    
