1. Concept
/**
* Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no
* task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to
* this executor; use a dedicated executor.
*/
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
}
GlobalEventExecutor is a single-threaded event executor with its own task queue. It is suited to short, fragmented tasks.
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor {
    // Singleton instance
    public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
    // Task queue maintained by the executor
    final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
    // Tail of the quietPeriodTask initializer (the no-op task checked by TaskRunner);
    // the opening lines of this declaration are missing from the excerpt
    ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
    // Thread factory
    final ThreadFactory threadFactory =
            new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
    // Runner that executes tasks in a loop
    private final TaskRunner taskRunner = new TaskRunner();
    // Flag recording whether the worker thread has been started
    private final AtomicBoolean started = new AtomicBoolean();
    // The worker thread currently running
    volatile Thread thread;
}
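2. The inEventLoop method
inEventLoop checks whether the calling code is running on the executor's own worker thread. When a task is added from any other thread, there are two cases:
- On the first submission no worker thread exists yet, so a new one is created.
- On later submissions the executor checks whether a worker thread is still running; if not (the previous one has already finished and exited), a new thread is created.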
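The check and the first-submission case can be sketched with plain JDK classes. This is a minimal sketch, not Netty code: `LazyThreadExecutor`, `runTasks` and `demo` are hypothetical names, and the worker here never exits when idle, so the restart case is deliberately left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for GlobalEventExecutor, reduced to the inEventLoop check.
final class LazyThreadExecutor {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    // True only when the caller is the worker thread itself.
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        taskQueue.add(task);
        // Only callers outside the worker thread may need to start it.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runTasks, "lazy-worker");
            t.setDaemon(true);
            thread = t; // publish before start() so tasks observe inEventLoop() == true
            t.start();
        }
    }

    // Simplification: the worker loops until interrupted instead of exiting when idle.
    private void runTasks() {
        for (;;) {
            try {
                taskQueue.take().run();
            } catch (InterruptedException e) {
                return;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    // Returns {inEventLoop as seen by the caller, inEventLoop as seen inside a task}.
    static boolean[] demo() {
        LazyThreadExecutor executor = new LazyThreadExecutor();
        boolean[] seen = new boolean[2];
        CountDownLatch done = new CountDownLatch(1);
        seen[0] = executor.inEventLoop();
        executor.execute(() -> {
            seen[1] = executor.inEventLoop();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Calling `LazyThreadExecutor.demo()` from any ordinary thread should give `false` for the caller and `true` inside the task, which matches the pattern in the test output of section 3.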
3. Test code
3.1
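import java.util.concurrent.Callable;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;

// Wrapper class (arbitrary name) and main method added so the snippet compiles on its own.
public class SubmitWithoutDelay {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Future<Integer> future = GlobalEventExecutor.INSTANCE.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("eventExecutor threadId:" + Thread.currentThread().getId()
                            + " inEventLoop:" + GlobalEventExecutor.INSTANCE.inEventLoop());
                    return 1;
                }
            });
            System.out.println("threadId:" + Thread.currentThread().getId()
                    + " inEventLoop:" + GlobalEventExecutor.INSTANCE.inEventLoop());
        }
    }
}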
Output:
All tasks run on the same executor thread, and inEventLoop is true only inside that thread.
threadId:1 inEventLoop:false
eventExecutor threadId:12 inEventLoop:true
threadId:1 inEventLoop:false
threadId:1 inEventLoop:false
threadId:1 inEventLoop:false
threadId:1 inEventLoop:false
eventExecutor threadId:12 inEventLoop:true
eventExecutor threadId:12 inEventLoop:true
eventExecutor threadId:12 inEventLoop:true
eventExecutor threadId:12 inEventLoop:true
3.2
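When a task is added once per second, every task runs on a newly created thread, because the worker thread exits automatically once it has finished its work and its queue has stayed empty for the 1-second quiet period.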
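import java.util.concurrent.Callable;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;

// Wrapper class (arbitrary name) and main method added so the snippet compiles on its own.
public class SubmitWithDelay {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            Future<Integer> future = GlobalEventExecutor.INSTANCE.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("eventExecutor threadId:" + Thread.currentThread().getId()
                            + " inEventLoop:" + GlobalEventExecutor.INSTANCE.inEventLoop());
                    return 1;
                }
            });
            Thread.sleep(1000);
            System.out.println("threadId:" + Thread.currentThread().getId()
                    + " inEventLoop:" + GlobalEventExecutor.INSTANCE.inEventLoop());
        }
    }
}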
Output:
eventExecutor threadId:12 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:13 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:14 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:15 inEventLoop:true
threadId:1 inEventLoop:false
eventExecutor threadId:16 inEventLoop:true
threadId:1 inEventLoop:false
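4. Source code analysis
Thread start-up logic: a new thread is started only when the caller is not the executor thread and no worker thread is currently running (the started flag is false).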
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    addTask(task);
    if (!inEventLoop()) {
        startThread();
    }
}

private void startThread() {
    if (started.compareAndSet(false, true)) {
        Thread t = threadFactory.newThread(taskRunner);
        // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
        // an assert error.
        // See https://github.com/netty/netty/issues/4357
        thread = t;
        t.start();
    }
}
final class TaskRunner implements Runnable {
    @Override
    public void run() {
        for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from the global event executor: ", t);
                }
                if (task != quietPeriodTask) {
                    continue;
                }
            }
            Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
            // Terminate if there is no task in the queue (except the noop task).
            if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                // Mark the current thread as stopped.
                // The following CAS must always success and must be uncontended,
                // because only one thread should be running at the same time.
                boolean stopped = started.compareAndSet(true, false);
                assert stopped;
                // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
                if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
                    // A) No new task was added and thus there's nothing to handle
                    //    -> safe to terminate because there's nothing left to do
                    // B) A new thread started and handled all the new tasks.
                    //    -> safe to terminate the new thread will take care the rest
                    break;
                }
                // There are pending tasks added again.
                if (!started.compareAndSet(false, true)) {
                    // startThread() started a new thread and set 'started' to true.
                    // -> terminate this thread so that the new thread reads from taskQueue exclusively.
                    break;
                }
                // New tasks were added, but this worker was faster to set 'started' to true.
                // i.e. a new worker thread was not started by startThread().
                // -> keep this thread alive to handle the newly added entries.
            }
        }
    }
}
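To make the stop-then-recheck sequence in TaskRunner easier to follow, here is a reduced model that uses only the JDK. It is a sketch under assumptions, not Netty's implementation: `QuietPeriodExecutor`, the 50 ms quiet period and `demo` are invented for illustration, a timed `poll` stands in for the scheduled quietPeriodTask, and scheduled tasks are ignored entirely.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the worker start/stop handshake.
final class QuietPeriodExecutor {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private final long quietPeriodMillis;
    private volatile Thread thread;

    QuietPeriodExecutor(long quietPeriodMillis) {
        this.quietPeriodMillis = quietPeriodMillis;
    }

    void execute(Runnable task) {
        taskQueue.add(task);
        if (Thread.currentThread() != thread && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "worker-" + threadsStarted.incrementAndGet());
            t.setDaemon(true);
            thread = t; // set before start(), as in startThread()
            t.start();
        }
    }

    private void run() {
        for (;;) {
            Runnable task;
            try {
                task = taskQueue.poll(quietPeriodMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                task = null; // treat an interrupt like an expired quiet period
            }
            if (task != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    t.printStackTrace();
                }
                continue;
            }
            // Quiet period elapsed with nothing to do: mark this worker as stopped first.
            started.set(false);
            // Re-check, because execute() may have queued a task in the meantime.
            if (taskQueue.isEmpty()) {
                break; // nothing pending, or a newer worker already drained it
            }
            if (!started.compareAndSet(false, true)) {
                break; // execute() started a new worker; let it own the queue
            }
            // This worker won the race, so it keeps running and handles the new task.
        }
    }

    // Runs one task, waits for the idle worker to exit, then runs another.
    // Returns how many worker threads were created along the way.
    static int demo() {
        QuietPeriodExecutor executor = new QuietPeriodExecutor(50);
        CountDownLatch second = new CountDownLatch(1);
        try {
            executor.execute(() -> { });
            executor.thread.join(); // the first worker exits once it has been idle
            executor.execute(second::countDown);
            second.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.threadsStarted.get();
    }
}
```

In this model `QuietPeriodExecutor.demo()` creates two worker threads, the same effect that test 3.2 shows with increasing thread ids. Clearing the flag before the second emptiness check is what keeps a task queued during shutdown from being stranded.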
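5. DefaultPromise implementation
In essence, DefaultPromise uses the EventExecutor's execute method to put the notification onto the executor's task queue, so listener callbacks also run on the EventExecutor's thread. If the caller is already on that thread and the listener stack depth is below MAX_LISTENER_STACK_DEPTH, the listeners are notified directly instead.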
private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

private static void safeExecute(EventExecutor executor, Runnable task) {
    try {
        executor.execute(task);
    } catch (Throwable t) {
        rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
    }
}
Callback call stack
![](https://img.haomeiwen.com/i1297060/62741f80de47f152.png)
Test code:
Promise<Integer> promise = GlobalEventExecutor.INSTANCE.newPromise();
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        System.out.println("threadId:" + Thread.currentThread().getId());
        System.out.println("operationComplete");
    }
});
System.out.println("threadId:" + Thread.currentThread().getId());
promise.trySuccess(2);
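The same idea, handing listener notification to the executor so that callbacks run on its thread, can be shown without Netty. The following is only a rough sketch: `MiniPromise`, its methods and the thread name `promise-executor` are hypothetical, a JDK single-thread executor stands in for the EventExecutor, and the direct in-event-loop path with its stack-depth guard is omitted, so every notification goes through `execute`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical miniature of the notify-on-executor idea; not Netty's DefaultPromise.
final class MiniPromise<V> {
    private final ExecutorService executor;
    private final List<Consumer<V>> listeners = new ArrayList<>();
    private V result;
    private boolean done;

    MiniPromise(ExecutorService executor) {
        this.executor = executor;
    }

    synchronized void addListener(Consumer<V> listener) {
        if (done) {
            V value = result;
            executor.execute(() -> listener.accept(value)); // already complete: notify at once
        } else {
            listeners.add(listener);
        }
    }

    // Completes the promise once; listeners are notified on the executor thread.
    synchronized boolean trySuccess(V value) {
        if (done) {
            return false;
        }
        done = true;
        result = value;
        List<Consumer<V>> toNotify = new ArrayList<>(listeners);
        listeners.clear();
        executor.execute(() -> toNotify.forEach(l -> l.accept(value)));
        return true;
    }

    // Returns {name of the completing thread, name of the thread that ran the listener}.
    static String[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "promise-executor");
            t.setDaemon(true);
            return t;
        });
        MiniPromise<Integer> promise = new MiniPromise<>(executor);
        String[] threads = new String[2];
        CountDownLatch notified = new CountDownLatch(1);
        promise.addListener(value -> {
            threads[1] = Thread.currentThread().getName();
            notified.countDown();
        });
        threads[0] = Thread.currentThread().getName();
        promise.trySuccess(2);
        try {
            notified.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return threads;
    }
}
```

In `MiniPromise.demo()` the listener reports the executor's thread name rather than the caller's, which is the behaviour the Netty test above is meant to demonstrate with thread ids.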