HashedWheelTimer算法
序
George Varghese 和 Tony Lauck 1996 年的论文:Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility提出了一种定时轮的方式来管理和维护大量的Timer调度算法.Linux 内核中的定时器采用的就是这个方案。
原理
一个Hash Wheel Timer是一个环形结构,可以想象成时钟,分为很多格子,一个格子代表一段时间(越短Timer精度越高),并用一个List保存在该格子上到期的所有任务,同时一个指针随着时间流逝一格一格转动,并执行对应List中所有到期的任务。任务通过取模决定应该放入哪个格子。
环形结构可以根据超时时间的 hash 值(这个 hash 值实际上就是ticks & mask)将 task 分布到不同的槽位中, 当 tick 到那个槽位时, 只需要遍历那个槽位的 task 即可知道哪些任务会超时(而使用线性结构, 你每次 tick 都需要遍历所有 task), 所以, 我们任务量大的时候, 相应的增加 wheel 的 ticksPerWheel 值, 可以减少 tick 时遍历任务的个数.
结构图
netty-hasedwheeltimer以上图为例,假设一个格子是1秒,则整个wheel能表示的时间段为8s,假如当前指针指向2,此时需要调度一个3s后执行的任务,显然应该加入到(2+3=5)的方格中,指针再走3次就可以执行了;如果任务要在10s后执行,应该等指针走完一个round零2格再执行,因此应放入4,同时将round(1)保存到任务中。检查到期任务时应当只执行round为0的,格子上其他任务的round应减1。
效率
- 添加任务:O(1)
- 删除/取消任务:O(1)
- 过期/执行任务:最差情况为O(n)->也就是当HashMap里面的元素全部hash冲突,退化为一条链表的情况。平均O(1)
槽位越多,每个槽位上的链表就越短,这里需要权衡时间与空间。
netty3.10的实现
相关参数
- tickDuration: 每 tick 一次的时间间隔, 每 tick 一次就会到达下一个槽位
- ticksPerWheel: 轮中的 slot 数,hash算法计算目标槽位
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}).
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
HashedWheelBucket定义
/**
* Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
* removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
* extra object creation is needed.
*/
private static final class HashedWheelBucket {
// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
/**
* Add {@link HashedWheelTimeout} to this bucket.
*/
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
boolean remove = false;
if (timeout.remainingRounds <= 0) {
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
remove = true;
} else if (timeout.isCancelled()) {
remove = true;
} else {
timeout.remainingRounds --;
}
// store reference to next as we may null out timeout.next in the remove block.
HashedWheelTimeout next = timeout.next;
if (remove) {
remove(timeout);
}
timeout = next;
}
}
public void remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
}
/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
public void clearTimeouts(Set<Timeout> set) {
for (;;) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}
// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
return head;
}
}
- HashedWheelTimeout
private static final class HashedWheelTimeout implements Timeout {
private static final int ST_INIT = 0;
private static final int ST_IN_BUCKET = 1;
private static final int ST_CANCELLED = 2;
private static final int ST_EXPIRED = 3;
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
private final HashedWheelTimer timer;
private final TimerTask task;
private final long deadline;
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
private volatile int state = ST_INIT;
// remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
// HashedWheelTimeout will be added to the correct HashedWheelBucket.
long remainingRounds;
// This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
// As only the workerThread will act on it there is no need for synchronization / volatile.
HashedWheelTimeout next;
HashedWheelTimeout prev;
// The bucket to which the timeout was added
HashedWheelBucket bucket;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
public Timer getTimer() {
return timer;
}
public TimerTask getTask() {
return task;
}
public void cancel() {
int state = state();
if (state >= ST_CANCELLED) {
// fail fast if the task was cancelled or expired before.
return;
}
if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) {
// Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket.
// In this case we can just return here as it will be discarded by the WorkerThread when handling
// the adding of HashedWheelTimeout to the HashedWheelBuckets.
return;
}
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) {
return;
}
// Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick
// and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get
// GC'ed once the user has no reference to it anymore.
timer.timeouts.add(this);
}
public void remove() {
if (bucket != null) {
bucket.remove(this);
}
}
public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}
public int state() {
return state;
}
public boolean isCancelled() {
return state == ST_CANCELLED;
}
public boolean isExpired() {
return state > ST_IN_BUCKET;
}
public HashedWheelTimeout value() {
return this;
}
public void expire() {
if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) {
assert state() != ST_INIT;
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
StringBuilder buf = new StringBuilder(192);
buf.append(getClass().getSimpleName());
buf.append('(');
buf.append("deadline: ");
if (remaining > 0) {
buf.append(remaining);
buf.append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining);
buf.append(" ns ago");
} else {
buf.append("now");
}
if (isCancelled()) {
buf.append(", cancelled");
}
buf.append(", task: ");
buf.append(getTask());
return buf.append(')').toString();
}
}
HashedWheelBucket创建
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
timeouts队列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
Worker
HashedWheelTimer的核心,主要处理tick的转动、过期任务。
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
transferTimeoutsToBuckets();
HashedWheelBucket bucket =
wheel[(int) (tick & mask)];
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
unprocessedTimeouts.add(timeout);
}
}
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
// Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout
// in the queue
timeout.remove();
continue;
}
long calculated = timeout.deadline / tickDuration;
long remainingRounds = (calculated - tick) / wheel.length;
timeout.remainingRounds = remainingRounds;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (;;) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (DetectionUtil.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
- 定位槽位
HashedWheelBucket[] wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
HashedWheelBucket bucket = wheel[(int) (tick & mask)];
比如有16个槽,则mask为15,假设当前tick=30,则槽位=14
- 更新该槽位任务的remainingRounds
每走一个tick都要更新该tick对应的槽位下面的任务的remainingRounds或者执行到期的任务
bucket.expireTimeouts(deadline);
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
boolean remove = false;
if (timeout.remainingRounds <= 0) {
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
remove = true;
} else if (timeout.isCancelled()) {
remove = true;
} else {
timeout.remainingRounds --;
}
// store reference to next as we may null out timeout.next in the remove block.
HashedWheelTimeout next = timeout.next;
if (remove) {
remove(timeout);
}
timeout = next;
}
}
执行到期任务
public void expire() {
if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) {
assert state() != ST_INIT;
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
注意,这里是同步执行,会阻塞整个timer的,需要异步。
- transfer
每走一个tick的时候,要把task从queue中取出来,放到槽位。
long calculated = timeout.deadline / tickDuration;
long remainingRounds = (calculated - tick) / wheel.length;
timeout.remainingRounds = remainingRounds;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
使用实例
/**
* tickDuration: 每 tick 一次的时间间隔, 每 tick 一次就会到达下一个槽位
* ticksPerWheel: 轮中的 slot 数
*/
@Test
public void testHashedWheelTimer() throws InterruptedException {
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(1000/**tickDuration**/, TimeUnit.MILLISECONDS, 16 /**ticksPerWheel**/);
System.out.println(LocalTime.now()+" submitted");
Timeout timeout = hashedWheelTimer.newTimeout((t) -> {
new Thread(){
@Override
public void run() {
System.out.println(new Date() + " executed");
System.out.println(hashedWheelTimer);
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date() + " FINISH");
}
}.start();
}, 5, TimeUnit.SECONDS);
hashedWheelTimer.newTimeout((t) -> {
new Thread(){
@Override
public void run() {
System.out.println(new Date() + " TASK2 executed");
System.out.println(hashedWheelTimer);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date() + " TASK2 FINISH");
}
}.start();
}, 15, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(500);
}
网友评论