美文网首页
美团Leaf 源码阅读(一)

美团Leaf 源码阅读(一)

作者: BlackChen | 来源:发表于2019-03-18 16:42 被阅读0次

    美团Leaf 分布式ID生成器源码分析(一)

    There are no two identical leaves in the world.
    世界上没有两片完全相同的树叶。
    — 莱布尼茨

    Leaf 最早期需求是各个业务线的订单ID生成需求。在美团早期,有的业务直接通过DB自增的方式生成ID,有的业务通过redis缓存来生成ID,也有的业务直接用UUID这种方式来生成ID。以上的方式各自有各自的问题,因此我们决定实现一套分布式ID生成服务来满足需求。具体Leaf 设计文档见: leaf 美团分布式ID生成服务

    官方代码仓库: Leaf

    工程目录结构

    项目分为两个模块: leaf-serverleaf-core,下面分开进行介绍

    leaf-server

    leaf-server 主要作用是使用spring-boot框架对外提供服务接口.

    leaf-server结构

    leaf-core

    leaf-core 是核心代码,提供两种生成的ID的方式,包括号段模式和snowflake模式.

    leaf-core

    源码分析

    相关分析代码已上传到github: 美团 LEAF

    核心代码都在leaf-core中.

    SegmentIDGenImpl分析

    1. 查看IDGenServiceTest
      IDGenServiceTest
    2. Config ID Gen
    3. 执行init方法
      • 执行init方法,从数据库中获取所有的tag,并保留在内存中.
      • 定时从数据库中获取最新数据
    4. 获取Id
          @Override
      public Result get(final String key) {
          // 必须在 SegmentIDGenImpl 初始化后执行. init()方法
          if (!initOK) {
              return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
          }
          // 通过缓存获取SegmentBuffer
          if (cache.containsKey(key)) {
      
              // 从缓存中获取对应key的 SegmentBuffer
              SegmentBuffer buffer = cache.get(key);
      
              // SegmentBuffer 没有初始化,则先进行初始化.
              if (!buffer.isInitOk()) {
                  synchronized (buffer) {
                      // 双重判断,避免重复执行SegmentBuffer的初始化操作.
                      if (!buffer.isInitOk()) {
                          try {
                              updateSegmentFromDb(key, buffer.getCurrent());
                              logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                              buffer.setInitOk(true);
                          } catch (Exception e) {
                              logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                          }
                      }
                  }
              }
              return getIdFromSegmentBuffer(cache.get(key));
          }
          return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
      }
      
    5. updateSegmentFromDb
       public void updateSegmentFromDb(String key, Segment segment) {
          StopWatch sw = new Slf4JStopWatch();
          SegmentBuffer buffer = segment.getBuffer();
      
          LeafAlloc leafAlloc;
      
          if (!buffer.isInitOk()) {
              // 第一次初始化
              leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
              buffer.setStep(leafAlloc.getStep());
      
              //leafAlloc中的step为DB中的step
              buffer.setMinStep(leafAlloc.getStep());
          } else if (buffer.getUpdateTimestamp() == 0) {
              // 第二次,需要准备next Segment
              // 第二号段,设置updateTimestamp
              leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
              buffer.setUpdateTimestamp(System.currentTimeMillis());
      
              //leafAlloc中的step为DB中的step
              buffer.setMinStep(leafAlloc.getStep());
          } else {
              // 三次以上 动态设置 nextStep
              long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
              int nextStep = buffer.getStep();
      
              /**
               *  动态调整step
               *  1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP
               *  2) 15分钟 < duration < 30分钟 : nothing
               *  3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数
               */
              // 15分钟
              if (duration < SEGMENT_DURATION) {
                  if (nextStep * 2 > MAX_STEP) {
                      //do nothing
                  } else {
                      // 步数 * 2
                      nextStep = nextStep * 2;
                  }
                  // 15分 < duration < 30
              } else if (duration < SEGMENT_DURATION * 2) {
                  //do nothing with nextStep
              } else {
                  // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数)
                  nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
              }
      
              logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
              LeafAlloc temp = new LeafAlloc();
      
              temp.setKey(key);
              temp.setStep(nextStep);
              // 更新maxId by CustomStep (nextStep)
              leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
      
              // 更新 updateTimestamp
              buffer.setUpdateTimestamp(System.currentTimeMillis());
              // 设置 buffer的step
              buffer.setStep(nextStep);
              //leafAlloc的step为DB中的step
              buffer.setMinStep(leafAlloc.getStep());
          }
      
          // must set value before set max TODO
          // 暂时还未想通,这里为什么这样写.
          // 已经向作者提交了issue.(https://github.com/Meituan-Dianping/Leaf/issues/16)
          long value = leafAlloc.getMaxId() - buffer.getStep();
          segment.getValue().set(value);
      
          segment.setMax(leafAlloc.getMaxId());
          segment.setStep(buffer.getStep());
          sw.stop("updateSegmentFromDb", key + " " + segment);
      }
      
    6. getIdFromSegmentBuffer
       public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
          while (true) {
              try {
                  // 获取buffer的读锁
                  buffer.rLock().lock();
                  // 获取当前的号段
                  final Segment segment = buffer.getCurrent();
      
                  if (    // nextReady is false (下一个号段没有初始化.)
                          !buffer.isNextReady()
                          // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 )
                          && (segment.getIdle() < 0.9 * segment.getStep())
                          // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作.
                          // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段.
                          && buffer.getThreadRunning().compareAndSet(false, true)
                  ) {
                      // 放入线程池进行异步更新.
                      service.execute(new Runnable() {
                          @Override
                          public void run() {
                              Segment next = buffer.getSegments()[buffer.nextPos()];
                              boolean updateOk = false;
                              try {
                                  updateSegmentFromDb(buffer.getKey(), next);
      
                                  // 更新成功,设置标记位为true
                                  updateOk = true;
                                  logger.info("update segment {} from db {}", buffer.getKey(), next);
                              } catch (Exception e) {
                                  logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                              } finally {
                                  if (updateOk) {
                                      // 获取buffer 的写锁
                                      buffer.wLock().lock();
                                      // next准备完成
                                      buffer.setNextReady(true);
                                      // next运行标记位设置为false
                                      buffer.getThreadRunning().set(false);
                                      buffer.wLock().unlock();
                                  } else {
                                      buffer.getThreadRunning().set(false);
                                  }
                              }
                          }
                      });
                  }
      
                  // 获取value
                  long value = segment.getValue().getAndIncrement();
      
                  // value < 当前号段的最大值,则返回改值
                  if (value < segment.getMax()) {
                      return new Result(value, Status.SUCCESS);
                  }
              } finally {
                  buffer.rLock().unlock();
              }
      
              // 等待下一个号段执行完成,执行代码在-> execute()
              // buffer.setNextReady(true);
              // buffer.getThreadRunning().set(false);
              waitAndSleep(buffer);
      
      
              try {
                  // buffer 级别加写锁.
                  buffer.wLock().lock();
                  final Segment segment = buffer.getCurrent();
                  // 获取value -> 为什么重复获取value, 多线程执行时,在进行waitAndSleep() 后,
                  // current segment可能会被修改. 直接进行一次判断,提高速度,并且防止出错(在交换Segment前进行一次检查).
                  long value = segment.getValue().getAndIncrement();
                  if (value < segment.getMax()) {
                      return new Result(value, Status.SUCCESS);
                  }
      
                  // 执行到这里, 其他的线程没有进行号段的调换,并且当前号段所有号码已经下发完成.
                  // 判断nextReady是否为true.
                  if (buffer.isNextReady()) {
                      // 调换segment
                      buffer.switchPos();
                      // 调换完成后, 设置nextReady为false
                      buffer.setNextReady(false);
                  } else {
                      // 进入这里的条件
                      // 1. 当前号段获取到的值大于maxValue
                      // 2. 另外一个号段还没有准备好
                      // 3. 等待时长大于waitAndSleep中的时间.
                      logger.error("Both two segments in {} are not ready!", buffer);
                      return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                  }
              } finally {
                  // finally代码块中释放写锁.
                  buffer.wLock().unlock();
              }
          }
      }
      
    7. waitAndSleep
          /**
       * 等待下一个号段执行完成
       * buffer.setNextReady(true);
       * buffer.getThreadRunning().set(false);
       * @param buffer
       */
      private void waitAndSleep(SegmentBuffer buffer) {
          int roll = 0;
          while (buffer.getThreadRunning().get()) {
              roll += 1;
              if(roll > 10000) {
                  try {
                      Thread.sleep(10);
                      break;
                  } catch (InterruptedException e) {
                      logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
                      break;
                  }
              }
          }
      }
      

    相关代码已上传到github: 美团 LEAF

    技术重点解析

    1. volatile 修饰变量提升可见性
    2. 使用读写锁ReadWriteLock,提升并发读下的读取速度
    3. 使用Atomic变量 ,利用CAS机制保证原子性, 提高并发能力.
      if (    // nextReady is false (下一个号段没有初始化.)
                          !buffer.isNextReady()
                          // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 )
                          && (segment.getIdle() < 0.9 * segment.getStep())
                          // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作.
                          // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段.
                          && buffer.getThreadRunning().compareAndSet(false, true)
                  ) {
                  ...
                  }
      
    4. 动态调整step来适应不同的请求速度.
       /**
               *  动态调整step
               *  1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP
               *  2) 15分钟 < duration < 30分钟 : nothing
               *  3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数
               */
              // 15分钟
              if (duration < SEGMENT_DURATION) {
                  if (nextStep * 2 > MAX_STEP) {
                      //do nothing
                  } else {
                      // 步数 * 2
                      nextStep = nextStep * 2;
                  }
                  // 15分 < duration < 30
              } else if (duration < SEGMENT_DURATION * 2) {
                  //do nothing with nextStep
              } else {
                  // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数)
                  nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
              }
      
    5. 使用事务,保证update 操作和select 操作的原子性.
           /**
        * 使用事务保证这两步的原子性(事务的隔离机制)
        * 根据数据库中对应tag的step来更新max_value,同时获取 tag的信息
        * 1. UPDATE leaf_alloc SET max_id = max_id + step WHERE biz_tag = #{tag}
        * 2. SELECT biz_tag, max_id, step FROM leaf_alloc WHERE biz_tag = #{tag}
        * @param tag
        * @return
        */
       LeafAlloc updateMaxIdAndGetLeafAlloc(String tag);
      
          @Override
      public LeafAlloc updateMaxIdAndGetLeafAlloc(String tag) {
          SqlSession sqlSession = sqlSessionFactory.openSession();
          try {
              sqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxId", tag);
              LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", tag);
              sqlSession.commit();
              return result;
          } finally {
              sqlSession.close();
          }
      }
      

    文章链接: www.blackchen.site/meituan-leaf-1
    作者: BlackChen

    相关文章

      网友评论

          本文标题:美团Leaf 源码阅读(一)

          本文链接:https://www.haomeiwen.com/subject/zzlbmqtx.html