by shihang.mai
1. 分布式id类比
用身份证号码推导分布式id(雪花算法)
分布式id推导- 城市->server
- 区域->thread
- 出生日期->时间戳
- 递增数字->server中的id
因我们用10进制,18位身份证号足够。而计算机用2进制,id要足够长,用long,64位
分布式id算法具备的条件:全局唯一、局部唯一、按情况保持递增
2. Leaf原理
Leaf=snowflake+segment
2.1 Segment
极致提高一个服务器性能,可以从以下几方面入手:
1. 资源锁细粒度化。如100个库存,改为4分,每个锁25.
2. 业务使用锁细粒度化。业务代码中,只对争用代码上锁。
3. 无锁化。真无锁(ThreadLoacl),假无锁(CAS)。
4. 异步+线程池。原来单线程执行1 2 3步,现在用线程池执行1 2步,再进行第3步。CompleteableFuture
Leaf的数据表
DROP TABLE IF EXISTS `leaf_alloc`;
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) NOT NULL DEFAULT '',
`max_id` bigint(20) NOT NULL DEFAULT '1',
`step` int(11) NOT NULL,
`description` varchar(256) DEFAULT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;
Leaf-Segment
- 首先会从Mysql中获取id集到id池1(对应代码中的Segment),两个id池组成双缓冲
- 当id池1使用达90%时,异步线程拉取id集到id池2
- 当id池1使用完后,切换id池2获取,两者角色交换,如此往复
参考"极致提高一个服务器性能",套用Leaf
- 数据库中已分biz_tag,做到了第1点
- 对Leaf改造的话,每个做业务的类中加入threadLocal,业务线程从id池获取id集,这样以后获取id都先从threadLocal中获取,更能提高性能.而多个不同的业务线程去id池取,可以用CAS竞争获取。做到了第3点。(Leaf没实现)
- 异步填充id池2,做到了第4点
2.2 Snowflake
雪花算法ID由long64位组成,其中含有时间戳。图中举例它的组成,实际并不是这样组成的,只要保证全局唯一、局部唯一、按情况保持递增即可
雪花算法有明显的弊端
- 依赖时间。会遇到时间回拨的情况,如果时间慢了,没所谓,校准也就向前,但是如果时间快了,就有毒了,就有可能出现重复Id了(时间回拨,回到之前的时间,刚好sequence也重新轮回)
- nodeId和areaId分配。一两个还好,直接在properties设置一下就行,多了这样做不就有毒?
上面两点,都可以通过zk进行解决。
Leaf-Snowflake针对问题2的解决方法:
利用zk创建持久节点的顺序性去分配nodeId和areaId
针对问题1的解决方法:
- 在创建持久节点时,会有创建时间。假如node1下线,由于是持久性节点,再一次上来的时候它可以拿到唯一的id,即上一次分配的id,保证了id的不重复和新分配。可以根据节点的创建时间和重新上线的时间做对比,大于则报错。
- 利用zk创建临时节点(临时节点可以保证大概率节点是活的,大概率:zk有一个心跳时间,在心跳时间内挂了),在node1重新上线的时候,通过rpc获取node2、node3、node4的系统时间,如果时间基本一致,那么认为没发生时间回拨(基本:默认500ms内,不会发生sequence轮回)
3. 吐槽
- 想用restful,但是实际又没用
@RequestMapping(value = "/api/segment/get/{key}")
public String getSegmentId(@PathVariable("key") String key) {
return get(key, segmentService.getId(key));
}
- 表设计用biz_tag字符串做主键,字符串主键在b+树会导致频繁的页分裂
DROP TABLE IF EXISTS `leaf_alloc`;
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) NOT NULL DEFAULT '',
`max_id` bigint(20) NOT NULL DEFAULT '1',
`step` int(11) NOT NULL,
`description` varchar(256) DEFAULT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;
- 在构造方法写init的事、写死数据源、方法dataSource.setMaxActive和dataSource.setMinIdle()都没、异常处理应该用@ControllerAdvice处理
public SegmentService() throws SQLException, InitException {
Properties properties = PropertyFactory.getProperties();
boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true"));
if (flag) {
// Config dataSource
dataSource = new DruidDataSource();
dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
dataSource.init();
// Config Dao
IDAllocDao dao = new IDAllocDaoImpl(dataSource);
// Config ID Gen
idGen = new SegmentIDGenImpl();
((SegmentIDGenImpl) idGen).setDao(dao);
if (idGen.init()) {
logger.info("Segment Service Init Successfully");
} else {
throw new InitException("Segment Service Init Fail");
}
} else {
idGen = new ZeroIDGen();
logger.info("Zero ID Gen Service Init Successfully");
}
}
- 3循环做集合处理,用guava不香吗.
private void updateCacheFromDb() {
logger.info("update cache from db");
StopWatch sw = new Slf4JStopWatch();
try {
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//db中新加的tags灌进cache
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
}
//cache中已失效的tags从cache删除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
logger.info("Remove tag {} from IdCache", tag);
}
} catch (Exception e) {
logger.warn("update cache from db exception", e);
} finally {
sw.stop("updateCacheFromDb");
}
}
- 每分钟去做集合运算改ConcurrentHashMap<业务key,SegmentBuffer(id池)>,改为zk不香吗,直接用watch机制,直接实现即时使用
public boolean init() {
logger.info("Init ...");
// 确保加载到kv后才初始化成功
updateCacheFromDb();
initOK = true;
updateCacheFromDbAtEveryMinute();
return initOK;
}
- 用Executors创建线程池?不怕内存爆?
private void updateCacheFromDbAtEveryMinute() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("check-idCache-thread");
t.setDaemon(true);
return t;
}
});
service.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
updateCacheFromDb();
}
}, 60, 60, TimeUnit.SECONDS);
}
网友评论