美文网首页
分布式ID-Leaf

分布式ID-Leaf

作者: 麦大大吃不胖 | 来源:发表于2020-12-30 22:47 被阅读0次

    by shihang.mai

    1. 分布式id类比

    用身份证号码推导分布式id(雪花算法)

    分布式id推导
    • 城市->server
    • 区域->thread
    • 出生日期->时间戳
    • 递增数字->server中的id

    因我们用10进制,18位身份证号足够。而计算机用2进制,id要足够长,用long,64位

    分布式id算法具备的条件:全局唯一、局部唯一、按情况保持递增

    2. Leaf原理

    Leaf=snowflake+segment

    2.1 Segment

    极致提高一个服务器性能,可以从以下几方面入手:

    1. 资源锁细粒度化。如100个库存,改为4分,每个锁25.
    2. 业务使用锁细粒度化。业务代码中,只对争用代码上锁。
    3. 无锁化。真无锁(ThreadLoacl),假无锁(CAS)。
    4. 异步+线程池。原来单线程执行1 2 3步,现在用线程池执行1 2步,再进行第3步。CompleteableFuture
    

    Leaf的数据表

    DROP TABLE IF EXISTS `leaf_alloc`;
    
    CREATE TABLE `leaf_alloc` (
      `biz_tag` varchar(128)  NOT NULL DEFAULT '',
      `max_id` bigint(20) NOT NULL DEFAULT '1',
      `step` int(11) NOT NULL,
      `description` varchar(256)  DEFAULT NULL,
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`biz_tag`)
    ) ENGINE=InnoDB;
    
    Leaf-Segment
    1. 首先会从Mysql中获取id集到id池1(对应代码中的Segment),两个id池组成双缓冲
    2. 当id池1使用达90%时,异步线程拉取id集到id池2
    3. 当id池1使用完后,切换id池2获取,两者角色交换,如此往复

    参考"极致提高一个服务器性能",套用Leaf

    1. 数据库中已分biz_tag,做到了第1点
    2. 对Leaf改造的话,每个做业务的类中加入threadLocal,业务线程从id池获取id集,这样以后获取id都先从threadLocal中获取,更能提高性能.而多个不同的业务线程去id池取,可以用CAS竞争获取。做到了第3点。(Leaf没实现)
    3. 异步填充id池2,做到了第4点

    2.2 Snowflake

    雪花算法ID由long64位组成,其中含有时间戳。图中举例它的组成,实际并不是这样组成的,只要保证全局唯一、局部唯一、按情况保持递增即可

    雪花算法有明显的弊端

    1. 依赖时间。会遇到时间回拨的情况,如果时间慢了,没所谓,校准也就向前,但是如果时间快了,就有毒了,就有可能出现重复Id了(时间回拨,回到之前的时间,刚好sequence也重新轮回)
    2. nodeId和areaId分配。一两个还好,直接在properties设置一下就行,多了这样做不就有毒?

    上面两点,都可以通过zk进行解决。

    Leaf-Snowflake

    针对问题2的解决方法:
    利用zk创建持久节点的顺序性去分配nodeId和areaId

    针对问题1的解决方法:

    1. 在创建持久节点时,会有创建时间。假如node1下线,由于是持久性节点,再一次上来的时候它可以拿到唯一的id,即上一次分配的id,保证了id的不重复和新分配。可以根据节点的创建时间和重新上线的时间做对比,大于则报错。
    2. 利用zk创建临时节点(临时节点可以保证大概率节点是活的,大概率:zk有一个心跳时间,在心跳时间内挂了),在node1重新上线的时候,通过rpc获取node2、node3、node4的系统时间,如果时间基本一致,那么认为没发生时间回拨(基本:默认500ms内,不会发生sequence轮回)

    3. 吐槽

    1. 想用restful,但是实际又没用
    @RequestMapping(value = "/api/segment/get/{key}")
        public String getSegmentId(@PathVariable("key") String key) {
            return get(key, segmentService.getId(key));
        }
    
    1. 表设计用biz_tag字符串做主键,字符串主键在b+树会导致频繁的页分裂
    DROP TABLE IF EXISTS `leaf_alloc`;
    
    CREATE TABLE `leaf_alloc` (
      `biz_tag` varchar(128)  NOT NULL DEFAULT '',
      `max_id` bigint(20) NOT NULL DEFAULT '1',
      `step` int(11) NOT NULL,
      `description` varchar(256)  DEFAULT NULL,
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`biz_tag`)
    ) ENGINE=InnoDB;
    
    1. 在构造方法写init的事、写死数据源、方法dataSource.setMaxActive和dataSource.setMinIdle()都没、异常处理应该用@ControllerAdvice处理
    public SegmentService() throws SQLException, InitException {
            Properties properties = PropertyFactory.getProperties();
            boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true"));
            if (flag) {
                // Config dataSource
                dataSource = new DruidDataSource();
                dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
                dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
                dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
                dataSource.init();
    
                // Config Dao
                IDAllocDao dao = new IDAllocDaoImpl(dataSource);
    
                // Config ID Gen
                idGen = new SegmentIDGenImpl();
                ((SegmentIDGenImpl) idGen).setDao(dao);
                if (idGen.init()) {
                    logger.info("Segment Service Init Successfully");
                } else {
                    throw new InitException("Segment Service Init Fail");
                }
            } else {
                idGen = new ZeroIDGen();
                logger.info("Zero ID Gen Service Init Successfully");
            }
        }
    
    1. 3循环做集合处理,用guava不香吗.
    private void updateCacheFromDb() {
            logger.info("update cache from db");
            StopWatch sw = new Slf4JStopWatch();
            try {
                List<String> dbTags = dao.getAllTags();
                if (dbTags == null || dbTags.isEmpty()) {
                    return;
                }
                List<String> cacheTags = new ArrayList<String>(cache.keySet());
                Set<String> insertTagsSet = new HashSet<>(dbTags);
                Set<String> removeTagsSet = new HashSet<>(cacheTags);
                //db中新加的tags灌进cache
                for(int i = 0; i < cacheTags.size(); i++){
                    String tmp = cacheTags.get(i);
                    if(insertTagsSet.contains(tmp)){
                        insertTagsSet.remove(tmp);
                    }
                }
                for (String tag : insertTagsSet) {
                    SegmentBuffer buffer = new SegmentBuffer();
                    buffer.setKey(tag);
                    Segment segment = buffer.getCurrent();
                    segment.setValue(new AtomicLong(0));
                    segment.setMax(0);
                    segment.setStep(0);
                    cache.put(tag, buffer);
                    logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
                }
                //cache中已失效的tags从cache删除
                for(int i = 0; i < dbTags.size(); i++){
                    String tmp = dbTags.get(i);
                    if(removeTagsSet.contains(tmp)){
                        removeTagsSet.remove(tmp);
                    }
                }
                for (String tag : removeTagsSet) {
                    cache.remove(tag);
                    logger.info("Remove tag {} from IdCache", tag);
                }
            } catch (Exception e) {
                logger.warn("update cache from db exception", e);
            } finally {
                sw.stop("updateCacheFromDb");
            }
        }
    
    1. 每分钟去做集合运算改ConcurrentHashMap<业务key,SegmentBuffer(id池)>,改为zk不香吗,直接用watch机制,直接实现即时使用
    public boolean init() {
            logger.info("Init ...");
            // 确保加载到kv后才初始化成功
            updateCacheFromDb();
            initOK = true;
            updateCacheFromDbAtEveryMinute();
            return initOK;
        }
    
    1. 用Executors创建线程池?不怕内存爆?
    private void updateCacheFromDbAtEveryMinute() {
            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("check-idCache-thread");
                    t.setDaemon(true);
                    return t;
                }
            });
            service.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    updateCacheFromDb();
                }
            }, 60, 60, TimeUnit.SECONDS);
        }
    

    相关文章

      网友评论

          本文标题:分布式ID-Leaf

          本文链接:https://www.haomeiwen.com/subject/vxbjnktx.html