美团Leaf 分布式ID生成器源码分析(一)
There are no two identical leaves in the world.
世界上没有两片完全相同的树叶。
— 莱布尼茨
Leaf 最早期需求是各个业务线的订单ID生成需求。在美团早期,有的业务直接通过DB自增的方式生成ID,有的业务通过redis缓存来生成ID,也有的业务直接用UUID这种方式来生成ID。以上的方式各自有各自的问题,因此我们决定实现一套分布式ID生成服务来满足需求。具体Leaf 设计文档见: leaf 美团分布式ID生成服务
官方代码仓库: Leaf
工程目录结构
项目分为两个模块: leaf-server
和 leaf-core
,下面分开进行介绍
leaf-server
leaf-server
主要作用是使用spring-boot
框架对外提供服务接口.
leaf-core
leaf-core
是核心代码,提供两种生成的ID的方式,包括号段模式和snowflake模式.
源码分析
相关分析代码已上传到github: 美团 LEAF
核心代码都在
leaf-core
中.
SegmentIDGenImpl分析
- 查看
IDGenServiceTest
IDGenServiceTest - Config ID Gen
- 执行
init
方法- 执行
init
方法,从数据库中获取所有的tag,并保留在内存中. - 定时从数据库中获取最新数据
- 执行
- 获取Id
@Override public Result get(final String key) { // 必须在 SegmentIDGenImpl 初始化后执行. init()方法 if (!initOK) { return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION); } // 通过缓存获取SegmentBuffer if (cache.containsKey(key)) { // 从缓存中获取对应key的 SegmentBuffer SegmentBuffer buffer = cache.get(key); // SegmentBuffer 没有初始化,则先进行初始化. if (!buffer.isInitOk()) { synchronized (buffer) { // 双重判断,避免重复执行SegmentBuffer的初始化操作. if (!buffer.isInitOk()) { try { updateSegmentFromDb(key, buffer.getCurrent()); logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent()); buffer.setInitOk(true); } catch (Exception e) { logger.warn("Init buffer {} exception", buffer.getCurrent(), e); } } } } return getIdFromSegmentBuffer(cache.get(key)); } return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION); }
-
updateSegmentFromDb
public void updateSegmentFromDb(String key, Segment segment) { StopWatch sw = new Slf4JStopWatch(); SegmentBuffer buffer = segment.getBuffer(); LeafAlloc leafAlloc; if (!buffer.isInitOk()) { // 第一次初始化 leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setStep(leafAlloc.getStep()); //leafAlloc中的step为DB中的step buffer.setMinStep(leafAlloc.getStep()); } else if (buffer.getUpdateTimestamp() == 0) { // 第二次,需要准备next Segment // 第二号段,设置updateTimestamp leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key); buffer.setUpdateTimestamp(System.currentTimeMillis()); //leafAlloc中的step为DB中的step buffer.setMinStep(leafAlloc.getStep()); } else { // 三次以上 动态设置 nextStep long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp(); int nextStep = buffer.getStep(); /** * 动态调整step * 1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP * 2) 15分钟 < duration < 30分钟 : nothing * 3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数 */ // 15分钟 if (duration < SEGMENT_DURATION) { if (nextStep * 2 > MAX_STEP) { //do nothing } else { // 步数 * 2 nextStep = nextStep * 2; } // 15分 < duration < 30 } else if (duration < SEGMENT_DURATION * 2) { //do nothing with nextStep } else { // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数) nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; } logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep); LeafAlloc temp = new LeafAlloc(); temp.setKey(key); temp.setStep(nextStep); // 更新maxId by CustomStep (nextStep) leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp); // 更新 updateTimestamp buffer.setUpdateTimestamp(System.currentTimeMillis()); // 设置 buffer的step buffer.setStep(nextStep); //leafAlloc的step为DB中的step buffer.setMinStep(leafAlloc.getStep()); } // must set value before set max TODO // 暂时还未想通,这里为什么这样写. // 已经向作者提交了issue.(https://github.com/Meituan-Dianping/Leaf/issues/16) long value = leafAlloc.getMaxId() - buffer.getStep(); segment.getValue().set(value); segment.setMax(leafAlloc.getMaxId()); segment.setStep(buffer.getStep()); sw.stop("updateSegmentFromDb", key + " " + segment); }
-
getIdFromSegmentBuffer
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) { while (true) { try { // 获取buffer的读锁 buffer.rLock().lock(); // 获取当前的号段 final Segment segment = buffer.getCurrent(); if ( // nextReady is false (下一个号段没有初始化.) !buffer.isNextReady() // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 ) && (segment.getIdle() < 0.9 * segment.getStep()) // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作. // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段. && buffer.getThreadRunning().compareAndSet(false, true) ) { // 放入线程池进行异步更新. service.execute(new Runnable() { @Override public void run() { Segment next = buffer.getSegments()[buffer.nextPos()]; boolean updateOk = false; try { updateSegmentFromDb(buffer.getKey(), next); // 更新成功,设置标记位为true updateOk = true; logger.info("update segment {} from db {}", buffer.getKey(), next); } catch (Exception e) { logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e); } finally { if (updateOk) { // 获取buffer 的写锁 buffer.wLock().lock(); // next准备完成 buffer.setNextReady(true); // next运行标记位设置为false buffer.getThreadRunning().set(false); buffer.wLock().unlock(); } else { buffer.getThreadRunning().set(false); } } } }); } // 获取value long value = segment.getValue().getAndIncrement(); // value < 当前号段的最大值,则返回改值 if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } } finally { buffer.rLock().unlock(); } // 等待下一个号段执行完成,执行代码在-> execute() // buffer.setNextReady(true); // buffer.getThreadRunning().set(false); waitAndSleep(buffer); try { // buffer 级别加写锁. buffer.wLock().lock(); final Segment segment = buffer.getCurrent(); // 获取value -> 为什么重复获取value, 多线程执行时,在进行waitAndSleep() 后, // current segment可能会被修改. 直接进行一次判断,提高速度,并且防止出错(在交换Segment前进行一次检查). long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); } // 执行到这里, 其他的线程没有进行号段的调换,并且当前号段所有号码已经下发完成. // 判断nextReady是否为true. if (buffer.isNextReady()) { // 调换segment buffer.switchPos(); // 调换完成后, 设置nextReady为false buffer.setNextReady(false); } else { // 进入这里的条件 // 1. 当前号段获取到的值大于maxValue // 2. 另外一个号段还没有准备好 // 3. 等待时长大于waitAndSleep中的时间. logger.error("Both two segments in {} are not ready!", buffer); return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { // finally代码块中释放写锁. buffer.wLock().unlock(); } } }
- waitAndSleep
/** * 等待下一个号段执行完成 * buffer.setNextReady(true); * buffer.getThreadRunning().set(false); * @param buffer */ private void waitAndSleep(SegmentBuffer buffer) { int roll = 0; while (buffer.getThreadRunning().get()) { roll += 1; if(roll > 10000) { try { Thread.sleep(10); break; } catch (InterruptedException e) { logger.warn("Thread {} Interrupted",Thread.currentThread().getName()); break; } } } }
相关代码已上传到github: 美团 LEAF
技术重点解析
-
volatile
修饰变量提升可见性 - 使用读写锁
ReadWriteLock
,提升并发读下的读取速度 - 使用
Atomic
变量 ,利用CAS
机制保证原子性, 提高并发能力.if ( // nextReady is false (下一个号段没有初始化.) !buffer.isNextReady() // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 ) && (segment.getIdle() < 0.9 * segment.getStep()) // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作. // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段. && buffer.getThreadRunning().compareAndSet(false, true) ) { ... }
- 动态调整
step
来适应不同的请求速度./** * 动态调整step * 1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP * 2) 15分钟 < duration < 30分钟 : nothing * 3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数 */ // 15分钟 if (duration < SEGMENT_DURATION) { if (nextStep * 2 > MAX_STEP) { //do nothing } else { // 步数 * 2 nextStep = nextStep * 2; } // 15分 < duration < 30 } else if (duration < SEGMENT_DURATION * 2) { //do nothing with nextStep } else { // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数) nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep; }
- 使用事务,保证
update
操作和select
操作的原子性./** * 使用事务保证这两步的原子性(事务的隔离机制) * 根据数据库中对应tag的step来更新max_value,同时获取 tag的信息 * 1. UPDATE leaf_alloc SET max_id = max_id + step WHERE biz_tag = #{tag} * 2. SELECT biz_tag, max_id, step FROM leaf_alloc WHERE biz_tag = #{tag} * @param tag * @return */ LeafAlloc updateMaxIdAndGetLeafAlloc(String tag);
@Override public LeafAlloc updateMaxIdAndGetLeafAlloc(String tag) { SqlSession sqlSession = sqlSessionFactory.openSession(); try { sqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxId", tag); LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", tag); sqlSession.commit(); return result; } finally { sqlSession.close(); } }
文章链接: www.blackchen.site/meituan-leaf-1
作者: BlackChen
网友评论