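1. Background
There are already plenty of source-code walkthroughs and usage examples for HashedWheelTimer online. These notes are written from my own perspective: a summary of my first read through the code, combined with material found online, kept for later reference.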
2. Class structure
HashedWheelTimer also has three important inner classes:
- HashedWheelBucket
- HashedWheelTimeout
- Worker
2.1 Class relationships
2.2 HashedWheelTimer
2.3 HashedWheelBucket
2.4 HashedWheelTimeout
3. Thread-safe updates with CAS
Two ingredients: (1) the field is declared volatile; (2) it is updated through the atomic class AtomicIntegerFieldUpdater.
- HashedWheelTimer
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;
    private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
    // If the current value is 0 (init), atomically set it to 1 (started)
    WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)
In start(), the worker thread is started only if this CAS succeeds.
- HashedWheelTimeout
    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
    private volatile int state = ST_INIT;

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }
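To make the volatile-plus-updater pattern concrete, here is a minimal sketch that uses only the JDK. StartOnceDemo and its start()/stop() methods are hypothetical stand-ins for illustration, not Netty code; only the state constants and the compareAndSet call follow the snippets above.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical stand-in for HashedWheelTimer; only the state handling is modelled.
final class StartOnceDemo {
    static final int INIT = 0;
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private static final AtomicIntegerFieldUpdater<StartOnceDemo> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StartOnceDemo.class, "state");

    // AtomicIntegerFieldUpdater only accepts a volatile int field.
    private volatile int state = INIT;

    /** Returns true only for the single caller that moves INIT -> STARTED. */
    boolean start() {
        return STATE_UPDATER.compareAndSet(this, INIT, STARTED);
    }

    /** Returns true only for the single caller that moves STARTED -> SHUTDOWN. */
    boolean stop() {
        return STATE_UPDATER.compareAndSet(this, STARTED, SHUTDOWN);
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        StartOnceDemo timer = new StartOnceDemo();
        System.out.println(timer.start()); // true: this call wins the CAS
        System.out.println(timer.start()); // false: state is already STARTED
        System.out.println(timer.stop());  // true
        System.out.println(timer.start()); // false: SHUTDOWN never goes back to STARTED
    }
}
```

Compared with an AtomicInteger field per instance, the static updater keeps only a plain int in each object, which matters when there are many timeout instances.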
4. HashedWheelTimeout source code
- Structure: each timeout is a node in a doubly linked list
    private static final class HashedWheelTimeout implements Timeout {

        // The three states of a timeout: initial, cancelled, expired
        private static final int ST_INIT = 0;
        private static final int ST_CANCELLED = 1;
        private static final int ST_EXPIRED = 2;
        // Used to update the timeout's state with CAS
        private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

        private final HashedWheelTimer timer;
        // The task to run once the timeout expires
        private final TimerTask task;
        private final long deadline;

        @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
        private volatile int state = ST_INIT;

        // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
        // HashedWheelTimeout will be added to the correct HashedWheelBucket.
        // It counts the wheel rotations left before the task runs and is decremented by one
        // each time the worker visits the bucket.
        long remainingRounds;

        // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
        // As only the workerThread will act on it there is no need for synchronization / volatile.
        HashedWheelTimeout next;
        HashedWheelTimeout prev;

        // The bucket to which the timeout was added
        HashedWheelBucket bucket;

        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }

        @Override
        public Timer timer() {
            return timer;
        }

        @Override
        public TimerTask task() {
            return task;
        }

        @Override
        public boolean cancel() {
            // only update the state it will be removed from HashedWheelBucket on next tick.
            if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }
            // If a task should be canceled we put this to another queue which will be processed on each tick.
            // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
            // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
            timer.cancelledTimeouts.add(this);
            return true;
        }

        // Remove this timeout from its bucket; if it never reached a bucket,
        // only the pending-timeouts counter is adjusted.
        void remove() {
            HashedWheelBucket bucket = this.bucket;
            if (bucket != null) {
                bucket.remove(this);
            } else {
                timer.pendingTimeouts.decrementAndGet();
            }
        }

        public boolean compareAndSetState(int expected, int state) {
            return STATE_UPDATER.compareAndSet(this, expected, state);
        }

        public int state() {
            return state;
        }

        @Override
        public boolean isCancelled() {
            return state() == ST_CANCELLED;
        }

        @Override
        public boolean isExpired() {
            return state() == ST_EXPIRED;
        }

        public void expire() {
            // 1. Mark the timeout as expired; bail out if it was already cancelled or expired
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }
            // 2. Run the task
            try {
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

        @Override
        public String toString() {
            // body omitted
        }
    }
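Both cancel() and expire() start with a CAS from ST_INIT, so whichever runs first wins and the other becomes a no-op. The sketch below isolates that race using only the JDK. TimeoutStateDemo is a hypothetical model: it uses AtomicInteger instead of the field updater for brevity, takes a Runnable instead of a TimerTask, and its expire() returns a boolean (the original returns void) so the outcome can be observed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stripped-down model of HashedWheelTimeout's state handling.
final class TimeoutStateDemo {
    static final int ST_INIT = 0;
    static final int ST_CANCELLED = 1;
    static final int ST_EXPIRED = 2;

    // AtomicInteger keeps the sketch short; the original uses a volatile int
    // plus AtomicIntegerFieldUpdater, which offers the same compareAndSet semantics.
    private final AtomicInteger state = new AtomicInteger(ST_INIT);
    private final Runnable task;

    TimeoutStateDemo(Runnable task) {
        this.task = task;
    }

    /** Succeeds only if the timeout has neither been cancelled nor expired yet. */
    boolean cancel() {
        return state.compareAndSet(ST_INIT, ST_CANCELLED);
    }

    /** Runs the task only if this call wins the INIT -> EXPIRED transition. */
    boolean expire() {
        if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
            return false;
        }
        try {
            task.run();
        } catch (Throwable t) {
            // The original logs a warning; this sketch just reports the failure.
            System.err.println("task failed: " + t);
        }
        return true;
    }

    int state() {
        return state.get();
    }

    public static void main(String[] args) {
        TimeoutStateDemo cancelled = new TimeoutStateDemo(() -> System.out.println("never printed"));
        System.out.println(cancelled.cancel()); // true
        System.out.println(cancelled.expire()); // false: the task is not run

        TimeoutStateDemo fired = new TimeoutStateDemo(() -> System.out.println("task ran"));
        System.out.println(fired.expire()); // prints "task ran", then true
        System.out.println(fired.cancel()); // false: too late to cancel
    }
}
```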
5. HashedWheelBucket source code
HashedWheelBucket stores HashedWheelTimeout instances in a structure similar to LinkedList. Its expireTimeouts(long deadline) method expires and runs the timeouts in the bucket that are due.
- Linked list
    private static final class HashedWheelBucket {
        // Used for the linked-list datastructure
        private HashedWheelTimeout head;
        private HashedWheelTimeout tail;

        /**
         * Add {@link HashedWheelTimeout} to this bucket.
         */
        public void addTimeout(HashedWheelTimeout timeout) {
            assert timeout.bucket == null;
            timeout.bucket = this;
            if (head == null) {
                head = tail = timeout;
            } else {
                tail.next = timeout;
                timeout.prev = tail;
                tail = timeout;
            }
        }

        /**
         * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
         */
        // Called by the worker thread when the tick reaches this bucket; deadline and
        // remainingRounds decide which timeouts are due.
        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                // No rounds left: the timeout is due
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    // Not due yet: one round fewer to wait
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

        public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
            HashedWheelTimeout next = timeout.next;
            // remove timeout that was either processed or cancelled by updating the linked-list
            if (timeout.prev != null) {
                timeout.prev.next = next;
            }
            if (timeout.next != null) {
                timeout.next.prev = timeout.prev;
            }

            if (timeout == head) {
                // if timeout is also the tail we need to adjust the entry too
                if (timeout == tail) {
                    tail = null;
                    head = null;
                } else {
                    head = next;
                }
            } else if (timeout == tail) {
                // if the timeout is the tail modify the tail to be the prev node.
                tail = timeout.prev;
            }
            // null out prev, next and bucket to allow for GC.
            timeout.prev = null;
            timeout.next = null;
            timeout.bucket = null;
            timeout.timer.pendingTimeouts.decrementAndGet();
            return next;
        }

        /**
         * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
         */
        public void clearTimeouts(Set<Timeout> set) {
            for (;;) {
                HashedWheelTimeout timeout = pollTimeout();
                if (timeout == null) {
                    return;
                }
                if (timeout.isExpired() || timeout.isCancelled()) {
                    continue;
                }
                set.add(timeout);
            }
        }

        private HashedWheelTimeout pollTimeout() {
            HashedWheelTimeout head = this.head;
            if (head == null) {
                return null;
            }
            HashedWheelTimeout next = head.next;
            if (next == null) {
                tail = this.head = null;
            } else {
                this.head = next;
                next.prev = null;
            }

            // null out prev and next to allow for GC.
            head.next = null;
            head.prev = null;
            head.bucket = null;
            return head;
        }
    }
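The three branches of expireTimeouts() (due, cancelled, not yet due) are easier to follow in a small model. The sketch below is an assumption-laden simplification: BucketDemo and Entry are hypothetical names, java.util.LinkedList replaces the hand-written doubly linked list, a boolean flag replaces the CAS state, and fired tasks are returned by name instead of being run.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Hypothetical model of one wheel bucket; not Netty code.
final class BucketDemo {
    static final class Entry {
        final String name;
        final long deadline;
        long remainingRounds;
        boolean cancelled;

        Entry(String name, long deadline, long remainingRounds) {
            this.name = name;
            this.deadline = deadline;
            this.remainingRounds = remainingRounds;
        }
    }

    private final LinkedList<Entry> entries = new LinkedList<>();

    void add(Entry entry) {
        entries.addLast(entry);
    }

    int size() {
        return entries.size();
    }

    /** One visit of the wheel pointer: returns the names of the entries that fire. */
    List<String> expire(long deadline) {
        List<String> fired = new ArrayList<>();
        Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();
                if (e.deadline > deadline) {
                    throw new IllegalStateException("entry placed into a wrong slot: " + e.name);
                }
                // In the original, expire() is a no-op for a cancelled timeout.
                if (!e.cancelled) {
                    fired.add(e.name);
                }
            } else if (e.cancelled) {
                it.remove();
            } else {
                e.remainingRounds--;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        BucketDemo bucket = new BucketDemo();
        bucket.add(new Entry("a", 100, 0));
        bucket.add(new Entry("b", 900, 1));
        Entry c = new Entry("c", 1700, 2);
        c.cancelled = true;
        bucket.add(c);

        System.out.println(bucket.expire(100)); // [a]
        System.out.println(bucket.size());      // 1 (c was dropped, b waits one more round)
        System.out.println(bucket.expire(900)); // [b]
    }
}
```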
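6. remainingRounds
remainingRounds is the number of full rotations the wheel still has to make before the task runs.
Example: the task is due in 2500ms (deadline = 2500ms), the wheel has 10 slots, tickDuration is 100ms, and the pointer has already advanced three times (tick = 3).
Then 2500 / 100 = 25,
and (25 - 3) / 10 = 2 (integer division).
A value of 2 means the pointer passes the task's slot (25 mod 10 = 5) twice without firing it, at tick 5 and at tick 15, decrementing remainingRounds each time; the task runs on the third pass, at tick 25.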
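The arithmetic above can be checked with a few lines of Java. RemainingRoundsDemo and place() are hypothetical names; the formulas follow transferTimeoutsToBuckets(), except that the slot is computed with % instead of `ticks & mask` so that the 10-slot wheel of the example works. The two agree whenever the wheel length is a power of two, which is what a bit mask requires.

```java
// Hypothetical helper that mirrors the placement arithmetic; times are in milliseconds here.
final class RemainingRoundsDemo {

    /** Returns {remainingRounds, stopIndex} for a timeout with the given deadline. */
    static long[] place(long deadline, long tickDuration, long tick, int wheelLength) {
        long calculated = deadline / tickDuration;          // tick at which the task is due
        long remainingRounds = (calculated - tick) / wheelLength;
        long ticks = Math.max(calculated, tick);            // never schedule into the past
        long stopIndex = ticks % wheelLength;               // equals ticks & mask for power-of-two lengths
        return new long[] {remainingRounds, stopIndex};
    }

    public static void main(String[] args) {
        long[] r = place(2500, 100, 3, 10);
        System.out.println("remainingRounds=" + r[0] + ", stopIndex=" + r[1]);
        // prints: remainingRounds=2, stopIndex=5

        long[] p = place(2500, 100, 3, 16);
        System.out.println("remainingRounds=" + p[0] + ", stopIndex=" + p[1]);
        // prints: remainingRounds=1, stopIndex=9
    }
}
```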
    private void transferTimeoutsToBuckets() {
        // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
        // adds new timeouts in a loop.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            // Skip timeouts that were cancelled before they reached a bucket
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // Was cancelled in the meantime.
                continue;
            }

            long calculated = timeout.deadline / tickDuration;
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
            int stopIndex = (int) (ticks & mask);

            HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);
        }
    }
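7. Worker source code
Worker is the thread body that advances the wheel and runs the tasks.
    // Timeouts that were still pending when the timer stopped; returned from stop()
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
    // Number of ticks the wheel pointer has advanced so far
    private long tick;

    @Override
    public void run() {
        // Initialize the startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
            startTime = 1;
        }

        // Notify the other threads waiting for the initialization at start().
        // A thread that called start() while startTime was still 0 is blocked in await(); wake it up here.
        startTimeInitialized.countDown();

        do {
            /*
             * waitForNextTick() sleeps until the next tick is due, i.e. until tickDuration * (tick + 1)
             * has elapsed since startTime, and returns the elapsed time at that point.
             * If the worker is already late it does not wait and returns the current elapsed time immediately.
             * Example: the third tick was due at 300ms, but earlier tasks blocked for 50ms, so the method is
             * entered at 350ms and returns 350ms right away.
             * The larger value has no other effect; it is simply passed to expireTimeouts() as the deadline.
             */
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                // Bitwise AND with the mask acts as a modulo and yields the bucket index
                int idx = (int) (tick & mask);
                // Timeouts cancelled via HashedWheelTimeout.cancel() are actually removed here
                processCancelledTasks();
                // Pick the bucket the pointer currently points to
                HashedWheelBucket bucket = wheel[idx];
                // Move newly submitted timeouts from the queue into their buckets
                transferTimeoutsToBuckets();
                // Expire and run the timeouts in this bucket that are due
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        // Process the remaining cancelled timeouts
        processCancelledTasks();
    }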
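The source of waitForNextTick() is not shown above, so the following is only a sketch of the sleep-time calculation implied by the comment in run(), under my own assumptions: NextTickDemo and sleepMillis() are hypothetical names, times are in nanoseconds, and the real method may handle more cases such as interruption or platform quirks.

```java
// Hypothetical helper: how long the worker would sleep before the next tick.
final class NextTickDemo {

    /**
     * @param tickDurationNanos length of one tick in nanoseconds
     * @param tick              number of ticks already processed
     * @param elapsedNanos      time elapsed since the worker's startTime
     * @return milliseconds to sleep; 0 means the worker is on time or late and should not wait
     */
    static long sleepMillis(long tickDurationNanos, long tick, long elapsedNanos) {
        long deadline = tickDurationNanos * (tick + 1);
        // Round up to whole milliseconds so the worker never wakes before the deadline.
        long millis = (deadline - elapsedNanos + 999_999) / 1_000_000;
        return Math.max(millis, 0);
    }

    public static void main(String[] args) {
        long tickDuration = 100_000_000L; // 100ms
        // Third tick (tick = 2) is due at 300ms.
        System.out.println(sleepMillis(tickDuration, 2, 250_000_000L)); // 50
        System.out.println(sleepMillis(tickDuration, 2, 299_500_000L)); // 1
        System.out.println(sleepMillis(tickDuration, 2, 350_000_000L)); // 0: already 50ms late
    }
}
```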
8. CountDownLatch
- await()
- countDown()
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
The thread that calls HashedWheelTimer.start() enters the latch's sync queue and waits:
The worker thread wakes it up:
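The handshake between start() and the worker can be reproduced with the JDK alone. In this sketch StartHandshakeDemo is a hypothetical class; the field names startTime and startTimeInitialized follow the snippets above, while the body of start() is my assumption of how the waiting side looks, since the original shows it only as a screenshot.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical model of the start()/worker handshake; not Netty code.
final class StartHandshakeDemo {
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private volatile long startTime; // 0 means "not initialized yet"

    /** Starts the worker and blocks until it has published startTime. */
    long start() {
        Thread worker = new Thread(() -> {
            long now = System.nanoTime();
            startTime = (now == 0) ? 1 : now; // 0 is reserved as the "uninitialized" marker
            startTimeInitialized.countDown();  // releases every thread blocked in await()
        }, "demo-worker");
        worker.start();

        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the worker", e);
            }
        }
        return startTime;
    }

    public static void main(String[] args) {
        long startTime = new StartHandshakeDemo().start();
        System.out.println("worker initialized: " + (startTime != 0));
    }
}
```

A latch with a count of 1 fits here because the event happens exactly once; callers that arrive after countDown() pass through await() without blocking.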
9. MpscQueue
MPSC stands for multi-producer, single-consumer: a lock-free queue that many threads can add to while exactly one thread consumes from it.
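The access pattern looks like this in a JDK-only sketch. Note the substitution: java.util.concurrent.ConcurrentLinkedQueue is used as a stand-in because it is also lock-free, but it is a general multi-consumer queue, not an MPSC implementation, so it does not show the performance benefit. MpscDemo and produceAndDrain() are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo of the multi-producer / single-consumer access pattern.
final class MpscDemo {

    /** Several producer threads add items; only the calling thread polls. Returns the number drained. */
    static int produceAndDrain(int producers, int perProducer) {
        Queue<Integer> timeouts = new ConcurrentLinkedQueue<>(); // stand-in for an MPSC queue
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    timeouts.add(i); // like newTimeout() being called from arbitrary threads
                }
            });
            threads[p].start();
        }

        // Single consumer, like the worker thread polling the queue on every tick.
        int expected = producers * perProducer;
        int drained = 0;
        while (drained < expected) {
            if (timeouts.poll() != null) {
                drained++;
            } else {
                Thread.yield(); // nothing available yet, let the producers run
            }
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return timeouts.isEmpty() ? drained : -1;
    }

    public static void main(String[] args) {
        System.out.println(produceAndDrain(4, 1000)); // 4000
    }
}
```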