前言
据国家大气研究中心的查尔斯·奈特称,一般的雪花大约由10^19个水分子组成。在雪花形成过程中,会形成不同的结构分支,所以说大自然中不存在两片完全一样的雪花,每一片雪花都拥有自己漂亮独特的形状。雪花算法表示生成的id如雪花般独一无二。
snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。其核心思想是:使用41bit作为毫秒数,10bit作为机器的ID(5个bit是数据中心,5个bit的机器ID),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),最后还有一个符号位,永远是0。
核心思想:分布式,唯一。
一、雪花算法的基本概念
雪花算法是一种全局ID生成算法,其核心思想是将64位的long型ID分为四个部分,分别为:时间戳、工作机器ID、数据中心ID和序列号。通过将数据映射到具有特定结构的分布式系统中,实现数据的存储和查询。该算法由一系列节点组成,每个节点负责存储数据的一部分。这些节点通过哈希函数将数据映射到特定的位置,形成类似于雪花结构的分布式系统。通过这种方式,雪花算法能够在分布式系统中保证ID的唯一性和有序性。
雪花算法具有以下优点:
- 易于扩展:可以方便地添加或删除节点,适应数据量的变化。
- 容错性高:即使部分节点发生故障,整个系统仍可正常运行。
- 负载均衡:数据在节点间分布均匀,有效利用系统资源。
- 适用于各种数据访问模式:支持随机访问和顺序访问等访问模式。
二、雪花算法的原理
![](https://img.haomeiwen.com/i13587608/efa608ed5d58608f.png)
雪花算法是 64 位 的二进制,一共包含了四部分:
1bit-符号位
- 1位标识:最高位是符号位,正数是0,负数是1。由于 id 一般是正数,所以第一位都是0。
41bit-时间戳
- 接下来41位存储毫秒级时间戳,41位可以表示 2^41-1 毫秒。
- 转化成年则是:(2^41-1)/(1000606024356)=69 年。这个时间戳大概可以使用 69年 不重复。
10bit-机器位
- 10位的数据机器位,包括 5 位 datacenterId 和 5 位 workerId,最多可以部署 2^10=1024 台机器。
- 这里的 5 位可以表示的最大整数时 2^5-1=31,即可以用 0、1、2、3、…31 这 32 个数字,来表示不同的 datacenterId 或 workerId
12bit-序列号
- 用来记录同毫秒内产生的不同ID,12位的计数顺序支持每个节点每毫秒(同一机器,同一时间戳)产生 4096 个ID序号。
三、源码
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.net.Inet4Address;
import java.net.UnknownHostException;
@Slf4j
public class SnowflakeIdWorker {
/**
* 开始时间截 (2015-01-01)
*/
private final long twepoch = 1489111610226L;
/**
* 机器id所占的位数
*/
private final long workerIdBits = 5L;
/**
* 数据标识id所占的位数
*/
private final long dataCenterIdBits = 5L;
/**
* 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* 支持的最大数据标识id,结果是31
*/
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* 序列在id中占的位数
*/
private final long sequenceBits = 12L;
/**
* 机器ID向左移12位
*/
private final long workerIdShift = sequenceBits;
/**
* 数据标识id向左移17位(12+5)
*/
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* 时间截向左移22位(5+5+12)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* 工作机器ID(0~31)
*/
private long workerId;
/**
* 数据中心ID(0~31)
*/
private long dataCenterId;
/**
* 毫秒内序列(0~4095)
*/
private long sequence = 0L;
/**
* 上次生成ID的时间截
*/
private long lastTimestamp = -1L;
private static SnowflakeIdWorker idWorker;
static {
idWorker = new SnowflakeIdWorker(getWorkId(), getDataCenterId());
}
//==============================Constructors=====================================
/**
* 构造函数
*
* @param workerId 工作ID (0~31)
* @param dataCenterId 数据中心ID (0~31)
*/
public SnowflakeIdWorker(long workerId, long dataCenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = workerId;
this.dataCenterId = dataCenterId;
}
// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
*
* @return SnowflakeId
*/
public synchronized long nextId() {
long timestamp = timeGen();
//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}
//上次生成ID的时间截
lastTimestamp = timestamp;
//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - twepoch) << timestampLeftShift)
| (dataCenterId << dataCenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
*
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* 返回以毫秒为单位的当前时间
*
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
private static Long getWorkId() {
try {
String hostAddress = Inet4Address.getLocalHost().getHostAddress();
int[] ints = StringUtils.toCodePoints(hostAddress);
int sums = 0;
for (int b : ints) {
sums += b;
}
return (long) (sums % 32);
} catch (UnknownHostException e) {
// 如果获取失败,则使用随机数备用
return RandomUtils.nextLong(0, 31);
}
}
private static Long getDataCenterId() {
int[] ints = StringUtils.toCodePoints(SystemUtils.getHostName());
int sums = 0;
for (int i : ints) {
sums += i;
}
return (long) (sums % 32);
}
/**
* 静态工具类
*
* @return id
*/
public static synchronized Long generateId() {
return idWorker.nextId();
}
}
3.1 注意事项
- 1、雪花算法(SnowflakeAlgorithm)在系统运行时只需要调用一次,然后通过SnowflakeIdInit.snowflakeId.nextId()自增来生成唯一的ID。
- 2、雪花算法是一种分布式唯一ID生成器,它基于Twitter的雪花算法(SnowflakeAlgorithm)实现。该算法通过生成一个64位的ID来确保在分 布式系统中生成唯一的ID。
- 3、在雪花算法中,ID被划分为多个部分,包括时间戳、机器标识和序列号等。首次调用时,会根据当前时间戳、机器标识和序列号生成一个唯一的ID。之后,每次调用nextId()方法时,会根据上次生成的ID计算出下一个ID。
- 4、具体来说,SnowflakeIdInit.snowflakeId.nextId()方法会根据上次生成的ID,增加一定的值(通常是1),然后生成一个新的ID。这个新的ID会比上次生成的ID更大,因为时间戳部分会随着时间的推移而增加。
- 5、雪花算法生成的ID是单调递增的,并且具有较好的分布性和扩展性。但是,由于机器标识和序列号的长度有限,所以在某些情况下可能会出现ID冲突的情况。为了解决这个问题,可以引入冲突检测机制或者使用其他更高级的分布式唯一ID生成器。
3.2 遇到的问题:数据倾斜
使用的过程中,我发现产生的分布式id始终是偶数,这样会产生严重的数据倾斜。
![](https://img.haomeiwen.com/i13587608/6dd57e9e1e6d8945.png)
分布式的组成是时间戳+机器id+序列号,跟踪源码发现如果时间戳不一样,每次序列号都是0,所以64位二进制最后12位一直都是000000000000,而决定奇偶的关键在于最后一个0。为0就是偶数,为1就是奇数。
![](https://img.haomeiwen.com/i13587608/d3ee427a2875cbbc.png)
3.2.1 数据倾斜问题解决
-
方案一、不是同一毫秒,取一个0或者1的随机数
image.png
-
方案二、不同毫秒内自增
四、防止时钟回拨
因为机器的原因会发生时间回拨,我们的雪花算法是强依赖我们的时间的,如果时间发生回拨,有可能会生成重复的ID,在我们上面的nextId中我们用当前时间和上一次的时间进行判断,如果当前时间小于上一次的时间那么肯定是发生了回拨,普通的算法会直接抛出异常,这里我们可以对其进行优化,一般分为两个情况:
- 如果时间回拨时间较短,比如配置5ms以内,那么可以直接等待一定的时间,让机器的时间追上来。
- 如果时间的回拨时间较长,我们不能接受这么长的阻塞等待,那么又有两个策略:
- 1、直接拒绝,抛出异常,打日志,通知RD时钟回滚。
- 2、利用扩展位,上面我们讨论过不同业务场景位数可能用不到那么多,那么我们可以把扩展位数利用起来了,比如当这个时间回拨比较长的时候,我们可以不需要等待,直接在扩展位加1。2位的扩展位允许我们有3次大的时钟回拨,一般来说就够了,如果其超过三次我们还是选择抛出异常,打日志。
通过上面的几种策略可以比较的防护我们的时钟回拨,防止出现回拨之后大量的异常出现。下面是修改之后的代码,这里修改了时钟回拨的逻辑:
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* 分布式全局ID雪花算法解决方案
*
* 防止时钟回拨
* 因为机器的原因会发生时间回拨,我们的雪花算法是强依赖我们的时间的,如果时间发生回拨,
* 有可能会生成重复的ID,在我们上面的nextId中我们用当前时间和上一次的时间进行判断,
* 如果当前时间小于上一次的时间那么肯定是发生了回拨,
* 普通的算法会直接抛出异常,这里我们可以对其进行优化,一般分为两个情况:
* 如果时间回拨时间较短,比如配置5ms以内,那么可以直接等待一定的时间,让机器的时间追上来。
* 如果时间的回拨时间较长,我们不能接受这么长的阻塞等待,那么又有两个策略:
* 直接拒绝,抛出异常,打日志,通知RD时钟回滚。
* 利用扩展位,上面我们讨论过不同业务场景位数可能用不到那么多,那么我们可以把扩展位数利用起来了,
* 比如当这个时间回拨比较长的时候,我们可以不需要等待,直接在扩展位加1。
* 2位的扩展位允许我们有3次大的时钟回拨,一般来说就够了,如果其超过三次我们还是选择抛出异常,打日志。
* 通过上面的几种策略可以比较的防护我们的时钟回拨,防止出现回拨之后大量的异常出现。下面是修改之后的代码,这里修改了时钟回拨的逻辑:
*/
public class SnowflakeIdFactory {
private static final Logger log = LoggerFactory.getLogger(SnowflakeIdFactory.class);
/**
* EPOCH是服务器第一次上线时间点, 设置后不允许修改
* 2018/9/29日,从此时开始计算,可以用到2089年
*/
private static long EPOCH = 1538211907857L;
/**
* 每台workerId服务器有3个备份workerId, 备份workerId数量越多, 可靠性越高, 但是可部署的sequence ID服务越少
*/
private static final long BACKUP_COUNT = 3;
/**
* worker id 的bit数,最多支持8192个节点
*/
private static final long workerIdBits = 5L;
/**
* 数据中心标识位数
*/
private static final long dataCenterIdBits = 5L;
/**
* 序列号,支持单节点最高每毫秒的最大ID数4096
* 毫秒内自增位
*/
private static final long sequenceBits = 12L;
/**
* 机器ID偏左移12位
*/
private static final long workerIdShift = sequenceBits;
/**
* 数据中心ID左移17位(12+5)
*/
private static final long dataCenterIdShift = sequenceBits + workerIdBits;
/**
* 时间毫秒左移22位(5+5+12)
*/
private static final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/**
* sequence掩码,确保sequnce不会超出上限
* 最大的序列号,4096
* -1 的补码(二进制全1)右移12位, 然后取反
* 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
*/
private static final long sequenceMask = -1L ^ (-1L << sequenceBits);
//private final static long sequenceMask = ~(-1L << sequenceBits);
/**
* 实际的最大workerId的值 结果是31,8091
* workerId原则上上限为1024, 但是需要为每台sequence服务预留BACKUP_AMOUNT个workerId,
* (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
*/
//private static final long maxWorkerId = (1L << workerIdBits) / (BACKUP_COUNT + 1);
//原来代码 -1 的补码(二进制全1)右移13位, 然后取反
private static final long maxWorkerId = -1L ^ (-1L << workerIdBits);
//private final static long maxWorkerId = ~(-1L << workerIdBits);
/**
* 支持的最大数据标识id,结果是31
*/
private static final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/**
* long workerIdBits = 5L;
* -1L 的二进制: 1111111111111111111111111111111111111111111111111111111111111111
* -1L<<workerIdBits = -32 ,二进制: 1111111111111111111111111111111111111111111111111111111111100000
* workerMask= -1L ^ -32 = 31, 二进制: 11111
*/
private static long workerMask= -1L ^ (-1L << workerIdBits);
//进程编码
private long processId = 1L;
private static long processMask=-1L ^ (-1L << dataCenterIdBits);
/**
* 工作机器ID(0~31)
* snowflake算法给workerId预留了10位,即workId的取值范围为[0, 1023],
* 事实上实际生产环境不大可能需要部署1024个分布式ID服务,
* 所以:将workerId取值范围缩小为[0, 511],[512, 1023]
* 这个范围的workerId当做备用workerId。workId为0的备用workerId是512,
* workId为1的备用workerId是513,以此类推
*/
private static long workerId;
/**
* 数据中心ID(0~31)
*/
private long dataCenterId;
/**
* 当前毫秒生成的序列
*/
private long sequence = 0L;
/**
* 上次生成ID的时间戳
*/
private long lastTimestamp = -1L;
private long extension = 0L;
private long maxExtension = 0L;
/**
* 保留workerId和lastTimestamp, 以及备用workerId和其对应的lastTimestamp
*/
private static Map<Long, Long> workerIdLastTimeMap = new ConcurrentHashMap<>();
/**
* 最大容忍时间, 单位毫秒, 即如果时钟只是回拨了该变量指定的时间, 那么等待相应的时间即可;
* 考虑到sequence服务的高性能, 这个值不易过大
*/
private static final long MAX_BACKWARD_MS = 3;
private static SnowflakeIdFactory idWorker;
static {
idWorker = new SnowflakeIdFactory();
}
static {
Calendar calendar = Calendar.getInstance();
calendar.set(2018, Calendar.NOVEMBER, 1);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
// EPOCH是服务器第一次上线时间点, 设置后不允许修改
EPOCH = calendar.getTimeInMillis();
// 初始化workerId和其所有备份workerId与lastTimestamp
// 假设workerId为0且BACKUP_AMOUNT为4, 那么map的值为: {0:0L, 256:0L, 512:0L, 768:0L}
// 假设workerId为2且BACKUP_AMOUNT为4, 那么map的值为: {2:0L, 258:0L, 514:0L, 770:0L}
/* for (int i = 0; i<= BACKUP_COUNT; i++){
workerIdLastTimeMap.put(workerId + (i * maxWorkerId), 0L);
}*/
}
//成员类,IdGenUtils的实例对象的保存域
private static class SnowflakeIdGenHolder {
private static final SnowflakeIdFactory instance = new SnowflakeIdFactory();
}
//外部调用获取IdGenUtils的实例对象,确保不可变
public static SnowflakeIdFactory getInstance(){
return SnowflakeIdGenHolder.instance;
}
/**
* 静态工具类
*
* @return
*/
public static Long generateId(){
long id = idWorker.nextId();
return id;
}
//初始化构造,无参构造有参函数,默认节点都是0
public SnowflakeIdFactory(){
//this(0L, 0L);
this.dataCenterId = getDataCenterId(maxDataCenterId);
//获取机器编码
this.workerId = getWorkerId(dataCenterId, maxWorkerId);
}
/**
* 构造函数
* @param workerId 工作ID (0~31)
* @param dataCenterId 数据中心ID (0~31)
*/
public SnowflakeIdFactory(long workerId, long dataCenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = workerId;
this.dataCenterId = dataCenterId;
}
/**
* 获取带自定义前缀的全局唯一编码
*/
public String getStrCodingByPrefix(String prefix){
Long ele = this.nextId();
return prefix + ele.toString();
}
/**
* 获得下一个ID (该方法是线程安全的)
* 在单节点上获得下一个ID,使用Synchronized控制并发,而非CAS的方式,
* 是因为CAS不适合并发量非常高的场景。
*
* 考虑时钟回拨
* 缺陷: 如果连续两次时钟回拨, 可能还是会有问题, 但是这种概率极低极低
* @return
*/
public synchronized long nextId() {
long currentTimestamp = timeGen();
// 当发生时钟回拨时
if (currentTimestamp < lastTimestamp){
// 如果时钟回拨在可接受范围内, 等待即可
long offset = lastTimestamp - currentTimestamp;
if ( offset <= MAX_BACKWARD_MS){
try {
//睡(lastTimestamp - currentTimestamp)ms让其追上
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(offset));
//时间偏差大小小于5ms,则等待两倍时间
//wait(offset << 1);
//Thread.sleep(waitTimestamp);
currentTimestamp = timeGen();
//如果时间还小于当前时间,那么利用扩展字段加1
//或者是采用抛异常并上报
if (currentTimestamp < lastTimestamp) {
//扩展字段
//extension += 1;
//if (extension > maxExtension) {
//服务器时钟被调整了,ID生成器停止服务.
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - currentTimestamp));
//}
}
} catch (Exception e) {
e.printStackTrace();
}
}else {
//扩展字段
/*extension += 1;
if (extension > maxExtension) {
//服务器时钟被调整了,ID生成器停止服务.
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - currentTimestamp));
}*/
tryGenerateKeyOnBackup(currentTimestamp);
}
}
//对时钟回拨简单处理
/* if (currentTimestamp < lastTimestamp) {
//服务器时钟被调整了,ID生成器停止服务.
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - currentTimestamp));
}*/
// 如果和最后一次请求处于同一毫秒, 那么sequence+1
if (lastTimestamp == currentTimestamp) {
// 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//自旋等待到下一毫秒
currentTimestamp = waitUntilNextTime(lastTimestamp);
}
//判断是否溢出,也就是每毫秒内超过4095,当为4096时,与sequenceMask相与,sequence就等于0
/*if (sequence == sequenceMask) {
// 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳
currentTimestamp = this.waitUntilNextTime(lastTimestamp);
}*/
} else {
// 如果是一个更近的时间戳, 那么sequence归零
sequence = 0L;
}
// 更新上次生成id的时间戳
lastTimestamp = currentTimestamp;
// 更新map中保存的workerId对应的lastTimestamp
//workerIdLastTimeMap.put(this.workerId, lastTimestamp);
if (log.isDebugEnabled()) {
log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTimestamp)), workerId, sequence);
}
// 进行移位操作生成int64的唯一ID
//时间戳右移动23位
long timestamp = (currentTimestamp - EPOCH) << timestampLeftShift;
//workerId 右移动10位
long workerId = this.workerId << workerIdShift;
//dataCenterId 右移动(sequenceBits + workerIdBits = 17位)
long dataCenterId = this.dataCenterId << dataCenterIdShift;
return timestamp | dataCenterId | workerId | sequence;
}
/**
* 尝试在workerId的备份workerId上生成
* 核心优化代码在方法tryGenerateKeyOnBackup()中,BACKUP_COUNT即备份workerId数越多,
* sequence服务避免时钟回拨影响的能力越强,但是可部署的sequence服务越少,
* 设置BACKUP_COUNT为3,最多可以部署1024/(3+1)即256个sequence服务,完全够用,
* 抗时钟回拨影响的能力也得到非常大的保障。
* @param currentMillis 当前时间
*/
private long tryGenerateKeyOnBackup(long currentMillis){
// 遍历所有workerId(包括备用workerId, 查看哪些workerId可用)
for (Map.Entry<Long, Long> entry:workerIdLastTimeMap.entrySet()){
this.workerId = entry.getKey();
// 取得备用workerId的lastTime
Long tempLastTime = entry.getValue();
lastTimestamp = tempLastTime==null?0L:tempLastTime;
// 如果找到了合适的workerId
if (lastTimestamp<=currentMillis){
return lastTimestamp;
}
}
// 如果所有workerId以及备用workerId都处于时钟回拨, 那么抛出异常
throw new IllegalStateException("Clock is moving backwards, current time is "
+currentMillis+" milliseconds, workerId map = " + workerIdLastTimeMap);
}
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long waitUntilNextTime(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* 获取WorkerId
* @param dataCenterId
* @param maxWorkerId
* @return
*/
protected static long getWorkerId(long dataCenterId, long maxWorkerId) {
StringBuffer mpid = new StringBuffer();
mpid.append(dataCenterId);
String name = ManagementFactory.getRuntimeMXBean().getName();
if (!name.isEmpty()) {
// GET jvmPid
mpid.append(name.split("@")[0]);
}
// MAC + PID 的 hashcode 获取16个低位
return (mpid.toString().hashCode() & 0xffff) % (maxWorkerId + 1);
}
/**
* 获取机器编码 用来做数据ID
* 数据标识id部分 通常不建议采用下面的MAC地址方式,
* 因为用户通过破解很容易拿到MAC进行破坏
*/
protected static long getDataCenterId(long tempMaxDataCenterId) {
if (tempMaxDataCenterId < 0L || tempMaxDataCenterId > maxDataCenterId) {
tempMaxDataCenterId = maxDataCenterId;
}
long id = 0L;
try {
InetAddress ip = InetAddress.getLocalHost();
NetworkInterface network = NetworkInterface.getByInetAddress(ip);
if (network == null) {
id = 1L;
} else {
byte[] mac = network.getHardwareAddress();
id = ((0x000000FF & (long) mac[mac.length - 1])
| (0x0000FF00 & (((long) mac[mac.length - 2]) << 8))) >> 6;
id = id % (tempMaxDataCenterId + 1);
}
} catch (Exception e) {
System.out.println(" getDatacenterId: " + e.getMessage());
}
return id;
}
public static void testProductIdByMoreThread(int dataCenterId, int workerId, int n) throws InterruptedException {
List<Thread> tlist = new ArrayList<>();
Set<Long> setAll = new HashSet<>();
CountDownLatch cdLatch = new CountDownLatch(10);
long start = System.currentTimeMillis();
int threadNo = dataCenterId;
Map<String,SnowflakeIdFactory> idFactories = new HashMap<>();
for(int i=0;i<10;i++){
//用线程名称做map key.
idFactories.put("snowflake"+i,new SnowflakeIdFactory(workerId, threadNo++));
}
for(int i=0;i<10;i++){
Thread temp =new Thread(new Runnable() {
@Override
public void run() {
Set<Long> setId = new HashSet<>();
SnowflakeIdFactory idWorker = idFactories.get(Thread.currentThread().getName());
for(int j=0;j<n;j++){
setId.add(idWorker.nextId());
}
synchronized (setAll){
setAll.addAll(setId);
log.info("{}生产了{}个id,并成功加入到setAll中.",Thread.currentThread().getName(),n);
}
cdLatch.countDown();
}
},"snowflake"+i);
tlist.add(temp);
}
for(int j=0;j<10;j++){
tlist.get(j).start();
}
cdLatch.await();
long end1 = System.currentTimeMillis() - start;
log.info("共耗时:{}毫秒,预期应该生产{}个id, 实际合并总计生成ID个数:{}",end1,10*n,setAll.size());
}
public static void testProductId(int dataCenterId, int workerId, int n){
SnowflakeIdFactory idWorker = new SnowflakeIdFactory(workerId, dataCenterId);
SnowflakeIdFactory idWorker2 = new SnowflakeIdFactory(workerId+1, dataCenterId);
Set<Long> setOne = new HashSet<>();
Set<Long> setTow = new HashSet<>();
long start = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
setOne.add(idWorker.nextId());//加入set
}
long end1 = System.currentTimeMillis() - start;
log.info("第一批ID预计生成{}个,实际生成{}个<<<<*>>>>共耗时:{}",n,setOne.size(),end1);
for (int i = 0; i < n; i++) {
setTow.add(idWorker2.nextId());//加入set
}
long end2 = System.currentTimeMillis() - start;
log.info("第二批ID预计生成{}个,实际生成{}个<<<<*>>>>共耗时:{}",n,setTow.size(),end2);
setOne.addAll(setTow);
log.info("合并总计生成ID个数:{}",setOne.size());
}
public static void testPerSecondProductIdNums(){
SnowflakeIdFactory idWorker = new SnowflakeIdFactory(1, 2);
long start = System.currentTimeMillis();
int count = 0;
for (int i = 0; System.currentTimeMillis()-start<1000; i++,count=i) {
/** 测试方法一: 此用法纯粹的生产ID,每秒生产ID个数为300w+ */
idWorker.nextId();
/** 测试方法二: 在log中打印,同时获取ID,此用法生产ID的能力受限于log.error()的吞吐能力.
* 每秒徘徊在10万左右. */
//log.error("{}",idWorker.nextId());
}
long end = System.currentTimeMillis()-start;
System.out.println(end);
System.out.println(count);
}
public static void main(String[] args) {
/** case1: 测试每秒生产id个数?
* 结论: 每秒生产id个数300w+ */
testPerSecondProductIdNums();
/** case2: 单线程-测试多个生产者同时生产N个id,验证id是否有重复?
* 结论: 验证通过,没有重复. */
//testProductId(1,2,10000);//验证通过!
//testProductId(1,2,20000);//验证通过!
/** case3: 多线程-测试多个生产者同时生产N个id, 全部id在全局范围内是否会重复?
* 结论: 验证通过,没有重复. */
/* try {
testProductIdByMoreThread(1,2,100000);//单机测试此场景,性能损失至少折半!
} catch (InterruptedException e) {
e.printStackTrace();
}*/
}
}
由于时间回拨导致的生产重复的ID的问题,其实百度和美团都有自己的解决方案了,有兴趣可以去看看,下面不是它们官网文档的信息:
-
百度UIDGenerator:https://github.com/baidu/uid-...
-
美团Leaf:https://tech.meituan.com/2019...
-
leaf-segment 方案
- 优化:双buffer + 预分配
- 容灾:Mysql DB 一主两从,异地机房,半同步方式
- 缺点:如果用segment号段式方案:id是递增,可计算的,不适用于订单ID生成场景,比如竞对在两天中午12点分别下单,通过订单id号相减就能大致计算出公司一天的订单量,这个是不能忍受的。
-
leaf-snowflake方案
-
使用Zookeeper持久顺序节点的特性自动对snowflake节点配置workerID
- 1.启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
- 2.如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
- 3.如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。
-
缓存workerID,减少第三方组件的依赖
-
由于强依赖时钟,对时间的要求比较敏感,在机器工作时NTP同步也会造成秒级别的回退,建议可以直接关闭NTP同步。要么在时钟回拨的时候直接不提供服务直接返回ERROR_CODE,等时钟追上即可。或者做一层重试,然后上报报警系统,更或者是发现有时钟回拨之后自动摘除本身节点并报警
-
-
参考:
https://segmentfault.com/a/1190000040964518
https://blog.csdn.net/bangyanya/article/details/134182630
https://blog.csdn.net/weixin_39075154/article/details/137965936
网友评论