写在所有之前
在日常业务中可能遇到生成业务类全局ID的情况,这类ID的关键点在于全局不重复,对于单例来说,这个不难实现,但是对于分布式场景下,如何保证每个独立部署的服务都能生成互补重复的唯一键呢,它的核心思路就是找到一个唯一的“权威”来做这个事情,也就是说独立于分布式的应用来找一位发布者。但是这又引出另一个问题,如何保证这个唯一“权威”不成为系统瓶颈呢?
实际上目前不少支持分布式部署的中间件都有自己的CAP逻辑,美团推出的开源项目Leaf也就是利用了数据库和ZK以及他们的分布式方案最终实现了分布式Id生成系统。
具体的实现逻辑可见美团技术团队的官方文档
- 框架介绍
-
源码地址
本文的目的是从源码的角度介绍Leaf是如何工作的。如前文所说,在Leaf的设计中有两种实现方式,一是基于数据库的segment算法,二是基于ZK的雪花算法。我们先整体看一下Leaf源码的结构:
Leaf项目框架.png
Leaf的源码很简单,分为两部分,leaf-core作为具体的Id生成实现自成一体,另外的leaf-server是官方以web服务的形式对外提供查询api和监控系统。
监控web.png
实际上,我们在阅读源码时只用关注到leaf-core,leaf-server仅仅是展示层,在真实项目中可用,也可自行替换。以下从segment(基于database)和snowflake(基于雪花算法)两个方面分别进行源码解读。
segment
原理解释
实际上,segemnt利用数据库作为一个Id生成规则的存放地址,从表结构可以看到:
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) NOT NULL DEFAULT '',
`max_id` bigint NOT NULL DEFAULT '1',
`step` int NOT NULL,
`description` varchar(256) DEFAULT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`biz_tag`)
)
biz_tag:业务标签,它意味着该表中每一条数据都是一个业务的id生成规则,比如在一个电商系统中,除了订单id需要生成外,可能还有商品条形码id,如此就会出现两条表记录。
max_id:当前规则会分配的最大id值。注意这里是bigint类型,因此意味着segment方式生成的id就是普通数字。
step:步长,不难理解,当前规则的号段就是从max_id - step ~ max_id。
我们注意到这一系列字段中leaf_alloc表中并不会存储当前的id,那客户端在申请一个新id时,它是怎么知道下一个id是多少呢?实际上,当前id是记录在该服务运行时内存中。
这就涉及到segment形式中的核心概念——segment,它的定义如下:
public class Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
private SegmentBuffer buffer;
}
没错第一个成员变量就是当前id值。可以想见,当leaf服务启动时,程序会读取db中的规则配置,将其读入内存并以Segment形式存储(见其他三个fields),并初始化当前id=0.
这就是segment的大致逻辑,总结起来,它利用了db存储规则,然后在每个leaf-core启动时加载规则,并初始化当前id。读到这里有个小问题:“分布式系统中,自然leaf服务也会分布式部署,当有多个leaf服务时,如何保证生成的id不重复呢”。这里先说结论:
每个leaf-core在启动时都会去读取db中的规则(如果没有则需提前手动insert)。每次读取前都会根据步长(step)取修改max_id:
max_id = max_id + step
理解了吧,也就是说每个leaf服务都有一个独立的号段(max_id-step, max_id),因此它们在分配id时并不会重复。
关于其原理言尽于此,我们直接看看源码:
segemnt核心类如下,
一个实现类SegmentIDGenImpl实现接口IDGen,结束!是不是很简单
segment核心类.png
在IDGen interface中两方法get,init
public interface IDGen {
Result get(String key);
boolean init();
}
自然从init实现方法开始,里面涉及到两个具体方法updateCacheFromDb和updateCacheFromDbAtEveryMinute。从字面理解,将规则从db中加载到segment对象中,并且从开启每分钟同步一次的定时任务。关注点可以放在它的加载过程。
private void updateCacheFromDb() {
logger.info("update cache from db");
StopWatch sw = new Slf4JStopWatch();
try {
// 1.获取所有biz_tag
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
// 2.获取当前缓存中的biz_tag
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌进cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
// 3.至此获得目前需要新增到cache中的tags,存放到insertTagsSet中
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//4. cache中已失效的tags从cache删除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
} catch (Exception e) {
logger.warn("update cache from db exception", e);
} finally {
sw.stop("updateCacheFromDb");
}
}
代码很简单,值得一提的是,第三步中segment对象被放到了SegmentBuffer中,实际上塞到本地缓存中的是SegmentBuffer,它是个什么玩意儿?这涉及到美团对leaf的一层优化。当value==max_id后,会更新db中的规则,并reload规则到segmnt中,这里涉及到数据库的I/O,在高qps情况下这个过程会频繁发生。
单缓存.png
因此leaf在当前value达到某个阈值后,会提前开启下一个max_id的生成,并把该规则读入新segment中,一旦segment1中id达到max_id,则直接内存级别切换segment即可。
双缓存.png
因此引入了SegmentBuffer来管理这两个segemnts:
public class SegmentBuffer {
private String key;
private Segment[] segments; //双buffer
private volatile int currentPos; //当前的使用的segment的index
private volatile boolean nextReady; //下一个segment是否处于可切换状态
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //线程是否在运行中
private final ReadWriteLock lock;
private volatile int step;
private volatile int minStep;
private volatile long updateTimestamp;
}
到这里我们看到了leaf-segemnt形式中缓存的最上层是业务tag-SegmentBuffer,SegmentBuffer中对应了两个Segment,而当前的value就存在segment当中。
存储结构.png
因此我们看到上面提到的updateCacheFromDb方法,实际上是在更新第一层关系,也就是扫描db中所有的bizTag,跟缓存中做比对,新增的bizTag在缓存中创建map,删除缓存中不存在的bizTag,保留与db一致的bizTag。需要注意的是,这时候对于新增的bizTag仅仅是在cache中创建空的SegmentBuffer。与updateCacheFromDb对应,updateSegmentFromDb就在更新第二层。
public void updateSegmentFromDb(String key, Segment segment) {
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
LeafAlloc leafAlloc;
if (!buffer.isInitOk()) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
} else if (buffer.getUpdateTimestamp() == 0) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
} else {
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
//表示15min内更新过,代表更新频繁,这样的化,通过增加步长,
// 缓存更多的id,来减少对db的操作
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
//相当于duration >= 30min,表示该key的访问并不频繁,因此可以降低步长。
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
LeafAlloc temp = new LeafAlloc();
temp.setKey(key);
temp.setStep(nextStep);
leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep);
buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
}
// must set value before set max
long value = leafAlloc.getMaxId() - buffer.getStep();
segment.getValue().set(value);
segment.setMax(leafAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
}
这样一来,当客户端请求获取一个id时,会传入当前服务所属bizTag,如果cache中存在则直接找到segment并获取其中的value;如果cache中不存在bizTag则直接返回找不到的异常。这里有个疑问:缓存中找不到bizTag的时候,为什么不去db中主动查询一遍呢?我猜想,开发者的设计理念就是通过缓存去应对高并发场景降低db的I/O,直接去避免缓存穿透的问题。
写到这里,笔者意识到一个问题,当部署了多个leaf服务,此时流量可能随机的访问这些leaf,可能存在一个问题,那就是leaf生成的id的确是全局唯一,但不见得是全局按时间递增。
以上是leaf服务初始化或从db中load规则,准备数据的源码内容。接下来是最关键的客户端获取id的过程:
public Result get(final String key) {
if (!initOK) {
return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
}
//如果key(bizTag)存在,表示对应的规则已加载到缓存
if (cache.containsKey(key)) {
SegmentBuffer buffer = cache.get(key);
//如果SegmentBuffer未初始化(仅实例化)则先初始化
if (!buffer.isInitOk()) {
synchronized (buffer) {
// 双重验证:第一次验证是快速判断SegmentBuffer是否需要初始化,如果需要则进入同步模块,
// 在进入同步模块后,为了避免其他线程在当前线程等待锁的过程中进行了初始化,因此会二次初始化
if (!buffer.isInitOk()) {
try {
updateSegmentFromDb(key, buffer.getCurrent());
logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
buffer.setInitOk(true);
} catch (Exception e) {
logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
}
}
}
}
//id都是从本地缓存中获取,即便当前缓存中不存在数据,也是先由db同步到cache中,再获取
return getIdFromSegmentBuffer(cache.get(key));
}
return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
}
最终获取id的过程在getIdFromSegmentBuffer中,代码执行过程参见下图:
get id的过程.png
可以看到,在getIdFromSegmentBuffer过程中主要发生了两件事:
- 当前id+1,得到应该返回的id,但是这个id在返回前需要判断是否超过了规则定义的max_id,如果超过了(严格来说等于也不行)。则将当前的buffer指向另一个备用的segment,他在之前已经预加载过。
- 第二件事,实际上就是有个异步的判断,在刚进入getIdFromSegmentBuffer就会判断,是否当前的value值超过了10% * max_id,如果超过了,就预加载另一个segment。
根据上图,和这两点看这段逻辑就会很清晰了。leaf segemnt形式介绍完成,总体来说,它是一种利用数据库持久化id生成规则,而非具体的id数据。而把具体的id生成任务丢给内存来进行,这样做的好处,在于即便在高并发的场景下,内存可以最快的反应请求,并且每次生成的规则都代表了一批数据,因此真实的db
I/O也不会很高。
网友评论