延迟功能调度器
/**
 * Process-wide delayed-task scheduler backed by a hierarchical timing wheel.
 *
 * <p>Tasks are placed into the wheel through {@link #addTask(TimerTaskEntry)}. A dedicated
 * clock thread waits on the shared delay queue for buckets whose expiration has been reached,
 * advances the wheel to that point and re-inserts every entry of the bucket; entries that the
 * wheel no longer accepts (expired and not cancelled) are handed to the worker pool.
 */
public class SystemTimer {
    /** Worker pool (one daemon thread) that runs the expired tasks. */
    private final ExecutorService taskExecutor = Executors.newFixedThreadPool(1, runnable -> {
        Thread thread = new Thread(runnable, "executor-pool");
        thread.setDaemon(true);
        thread.setUncaughtExceptionHandler((t, e) -> System.err.println("Uncaught exception in thread '" + t.getName() + "':" + e));
        return thread;
    });

    /** Delay queue holding the buckets that currently carry an expiration time. */
    private final DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();

    /** Lowest-level wheel: 1000 ms per tick, 5 slots, started at the current wall-clock time. */
    private final TimingWheel timingWheel = new TimingWheel(1000L, 5, System.currentTimeMillis(), delayQueue);

    /**
     * Lazily created singleton. Declared volatile so that the unsynchronized first read in
     * {@link #getInstance()} can never observe a partially constructed instance.
     */
    private static volatile SystemTimer INSTANCE;

    /**
     * Returns the shared timer, creating it (and starting its clock thread) on first use.
     *
     * @return the singleton instance
     */
    public static SystemTimer getInstance() {
        SystemTimer current = INSTANCE;
        if (current == null) {
            synchronized (SystemTimer.class) {
                current = INSTANCE;
                if (current == null) {
                    current = new SystemTimer();
                    INSTANCE = current;
                }
            }
        }
        return current;
    }

    /**
     * Starts the clock thread that keeps driving the wheel. The thread is deliberately left
     * non-daemon, as in the original, so a running timer keeps the JVM alive.
     */
    private SystemTimer() {
        new Thread(() -> {
            // Stop once the thread has been interrupted instead of spinning forever.
            while (!Thread.currentThread().isInterrupted()) {
                advanceClock(1000L);
            }
        }).start();
    }

    /**
     * Waits up to {@code timeoutMs} for an expired bucket; if one arrives, advances the wheel
     * to the bucket's expiration and redistributes the bucket's entries through
     * {@link #addTask(TimerTaskEntry)}.
     *
     * @param timeoutMs maximum time to wait for an expired bucket, in milliseconds
     */
    private void advanceClock(Long timeoutMs) {
        try {
            TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (bucket != null) {
                timingWheel.advanceClock(bucket.getExpiration());
                bucket.flush(this::addTask);
            }
        } catch (InterruptedException e) {
            // Restore the flag so the clock loop can observe the interruption and exit.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Best effort: a failure while processing one bucket must not kill the clock thread.
            e.printStackTrace();
        }
    }

    /**
     * Schedules a task. If the wheel rejects the entry because it has already expired, the
     * task is run on the worker pool right away; a cancelled entry is dropped.
     *
     * @param timedTask the entry to schedule
     */
    public void addTask(TimerTaskEntry timedTask) {
        if (!timingWheel.addTask(timedTask)) { // expired or cancelled
            if (!timedTask.isCancle()) { // expired: run immediately
                // execute() rather than submit(): submit() stores a failure in a Future nobody
                // reads, so the worker's uncaught-exception handler would never report it.
                taskExecutor.execute(timedTask.getTask());
            }
        }
    }
}
时间轮
/**
 * One level of a hierarchical timing wheel.
 *
 * <p>A wheel has {@code wheelSize} slots of {@code tickMs} milliseconds each and therefore
 * covers {@code tickMs * wheelSize} milliseconds ahead of its current time. Tasks further away
 * than that are delegated to a lazily created overflow wheel whose tick equals this wheel's
 * whole interval.
 */
public class TimingWheel {
    /** Time span covered by a single slot, in milliseconds. */
    private final long tickMs;
    /** Number of slots in this wheel. */
    private final int wheelSize;
    /** Time span covered by the whole wheel ({@code tickMs * wheelSize}), in milliseconds. */
    private final long interval;
    /** Wheel pointer: current time rounded down to a multiple of {@code tickMs}. */
    private volatile long currentTime;
    private final TimerTaskList[] buckets;
    /** Queue shared by all wheel levels; receives a bucket whenever its expiration changes. */
    private final DelayQueue<TimerTaskList> delayQueue;
    /** Next, coarser wheel level; created on first use. */
    private volatile TimingWheel overflowWheel;

    /**
     * Creates a wheel and pre-allocates all of its slots.
     *
     * @param tickMs      duration of one slot in milliseconds
     * @param wheelSize   number of slots
     * @param currentTime start time in milliseconds; rounded down to a multiple of {@code tickMs}
     * @param delayQueue  queue into which buckets are offered when they get a new expiration
     */
    public TimingWheel(Long tickMs, Integer wheelSize, Long currentTime, DelayQueue<TimerTaskList> delayQueue) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = this.tickMs * this.wheelSize;
        this.buckets = new TimerTaskList[this.wheelSize];
        this.currentTime = currentTime - (currentTime % this.tickMs);
        this.delayQueue = delayQueue;
        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new TimerTaskList(UUID.randomUUID().toString());
        }
    }

    /**
     * Tries to place a task into this wheel or one of its overflow wheels.
     *
     * @param timedTask the entry to place
     * @return {@code true} if the entry was stored in some bucket; {@code false} if it was
     *         cancelled or is due within the current tick, in which case it was not stored
     */
    public boolean addTask(TimerTaskEntry timedTask) {
        Long expirationMs = timedTask.getExpirationMs();
        if (timedTask.isCancle()) { // cancelled
            return false;
        }
        long delayMs = expirationMs - currentTime;
        if (delayMs < tickMs) { // already due
            return false;
        }
        if (delayMs < interval) { // fits into this wheel
            long virtualId = expirationMs / tickMs;
            int bucketIndex = (int) (virtualId % wheelSize);
            TimerTaskList bucket = buckets[bucketIndex];
            bucket.addTask(timedTask);
            // Enqueue the bucket only when its expiration actually changed, i.e. the slot is
            // being used for a new round of the wheel.
            if (bucket.setExpiration(expirationMs - (expirationMs % tickMs))) {
                delayQueue.offer(bucket);
            }
            return true;
        }
        // Too far away for this level. Propagate the overflow wheel's verdict: if the entry was
        // cancelled, or the clock moved on in the meantime, the overflow wheel rejects it
        // without storing it, and reporting success here would silently lose the task.
        return addOverflowWheel().addTask(timedTask);
    }

    /**
     * Moves the wheel pointer forward when {@code expiration} lies at least one tick ahead,
     * and forwards the same call to the overflow wheel if one exists.
     *
     * @param expiration the expiration time of the bucket that just became due, in milliseconds
     */
    public void advanceClock(Long expiration) {
        if (expiration >= currentTime + tickMs) {
            currentTime = expiration - (expiration % tickMs);
            if (overflowWheel != null) {
                overflowWheel.advanceClock(expiration);
            }
        }
    }

    /**
     * Returns the overflow wheel, creating it under this wheel's monitor on first use.
     *
     * @return the next wheel level
     */
    private TimingWheel addOverflowWheel() {
        if (overflowWheel == null) {
            synchronized (this) {
                if (overflowWheel == null) {
                    // Note: the overflow wheel's tick is this wheel's full interval.
                    overflowWheel = new TimingWheel(interval, wheelSize, currentTime, delayQueue);
                }
            }
        }
        return overflowWheel;
    }
}
槽(任务列表)
/**
 * A wheel slot: a circular doubly linked list of task entries that share one expiration time.
 *
 * <p>The list is anchored by a sentinel {@code root} entry whose {@code next}/{@code prev}
 * point to itself while the list is empty. Instances are ordered in a delay queue by their
 * expiration; {@code -1} means no expiration is set.
 */
public class TimerTaskList implements Delayed {
    /** Identifier supplied by the creator. */
    private String id;
    /** Expiration time of this slot in epoch milliseconds, or -1 when unset. */
    private AtomicLong expiration = new AtomicLong(-1L);
    /** Sentinel node; never handed out as a task. */
    private TimerTaskEntry root = new TimerTaskEntry(-1L, null);

    {
        root.next = root;
        root.prev = root;
    }

    /**
     * @param id identifier of this list
     */
    public TimerTaskList(String id) {
        this.id = id;
    }

    /**
     * Appends an entry at the tail of the list and records this list as its bucket.
     *
     * @param timedTask the entry to append
     */
    public void addTask(TimerTaskEntry timedTask) {
        synchronized (this) {
            timedTask.bucket = this;
            TimerTaskEntry tail = root.prev;
            timedTask.next = root;
            timedTask.prev = tail;
            tail.next = timedTask;
            root.prev = timedTask;
        }
    }

    /**
     * Unlinks an entry from this list. Entries that are not currently linked into this very
     * list (including entries that were already removed or never added) are left untouched.
     *
     * @param timedTask the entry to remove
     */
    public void removeTask(TimerTaskEntry timedTask) {
        synchronized (this) {
            // Identity check: null-safe for unlinked entries, and independent of whether two
            // lists happen to have been created with equal ids.
            if (timedTask.bucket == this) {
                timedTask.next.prev = timedTask.prev;
                timedTask.prev.next = timedTask.next;
                timedTask.bucket = null;
                timedTask.next = null;
                timedTask.prev = null;
            }
        }
    }

    /**
     * Empties the list, passing each removed entry to {@code timedTaskFlush}, then clears the
     * expiration. Called when this slot becomes due. The callback runs while this list's
     * monitor is held.
     *
     * @param timedTaskFlush receives every entry after it has been unlinked
     */
    public void flush(Consumer<TimerTaskEntry> timedTaskFlush) {
        synchronized (this) {
            TimerTaskEntry head = root.next;
            while (head != null && head != root) {
                removeTask(head);
                timedTaskFlush.accept(head);
                head = root.next;
            }
            expiration.set(-1);
        }
    }

    /**
     * @return remaining time until the expiration in the given unit, never negative
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
    }

    /**
     * Orders lists by expiration; any other {@link Delayed} implementation compares as equal.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof TimerTaskList) {
            return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
        }
        return 0;
    }

    /**
     * Atomically replaces the expiration.
     *
     * @param expiration new expiration in epoch milliseconds
     * @return {@code true} if the stored value differed from the new one
     */
    public Boolean setExpiration(Long expiration) {
        long updated = expiration;
        return this.expiration.getAndSet(updated) != updated;
    }

    /**
     * @return the current expiration in epoch milliseconds, or -1 when unset
     */
    public Long getExpiration() {
        return expiration.get();
    }
}
任务
/**
 * A schedulable task together with its absolute deadline and its links inside the bucket
 * (doubly linked list) that currently holds it.
 */
public class TimerTaskEntry {
    /** Successor in the owning bucket's list; null while unlinked. */
    protected TimerTaskEntry next;
    /** Predecessor in the owning bucket's list; null while unlinked. */
    protected TimerTaskEntry prev;
    /** Bucket currently holding this entry; null while unlinked. */
    protected TimerTaskList bucket;

    /** Requested delay in milliseconds. */
    private Long delayMs;
    /** Work to run once the deadline is reached. */
    private Runnable task;
    /** Absolute deadline: creation time plus the delay, in epoch milliseconds. */
    private Long expirationMs;
    /** Set to true once the entry has been cancelled. */
    private AtomicBoolean cancel;

    /**
     * Creates an unlinked, non-cancelled entry whose deadline is "now + delayMs".
     *
     * @param delayMs delay in milliseconds, counted from construction
     * @param task    the work to run
     */
    public TimerTaskEntry(Long delayMs, Runnable task) {
        this.delayMs = delayMs;
        this.task = task;
        long deadline = System.currentTimeMillis() + delayMs;
        this.expirationMs = deadline;
        this.cancel = new AtomicBoolean(false);
        this.bucket = null;
        this.prev = null;
        this.next = null;
    }

    /**
     * @return true if {@link #cancel()} has been called on this entry
     */
    public boolean isCancle() {
        return cancel.get();
    }

    /**
     * Marks the entry as cancelled.
     *
     * @return true only for the call that performed the transition from active to cancelled
     */
    public boolean cancel() {
        boolean wasCancelled = cancel.getAndSet(true);
        return !wasCancelled;
    }

    /**
     * @return the work associated with this entry
     */
    public Runnable getTask() {
        return task;
    }

    /**
     * @return the absolute deadline in epoch milliseconds
     */
    public Long getExpirationMs() {
        return expirationMs;
    }
}
测试
/**
 * Manual demo: schedules eight tasks with delays from 1 s to 20 s, waits 20 s, then schedules
 * a ninth task (1.5 s delay) from a separate thread. Each task prints its label followed by
 * the time at which it ran.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        SystemTimer timer = SystemTimer.getInstance();

        // Build every entry first, then submit them in the same order.
        TimerTaskEntry[] initialTasks = {
            printingTask(1000L, "任务执行1->"),
            printingTask(2000L, "任务执行2->"),
            printingTask(3000L, "任务执行3->"),
            printingTask(4000L, "任务执行4->"),
            printingTask(5000L, "任务执行5->"),
            printingTask(6000L, "任务执行6->"),
            printingTask(10000L, "任务执行7->"),
            printingTask(20000L, "任务执行8->"),
        };
        for (TimerTaskEntry entry : initialTasks) {
            timer.addTask(entry);
        }

        Thread.sleep(20000);

        // Late submission from another thread, after the first batch has had time to run.
        Thread lateSubmitter = new Thread(() -> {
            TimerTaskEntry lateTask = printingTask(1500L, "任务执行9->");
            timer.addTask(lateTask);
        });
        lateSubmitter.start();
    }

    /** Creates an entry that prints {@code label} followed by the current time when it runs. */
    private static TimerTaskEntry printingTask(Long delayMs, String label) {
        return new TimerTaskEntry(delayMs, () -> System.out.println(label + System.currentTimeMillis()));
    }
}
网友评论