MultithreadEventLoopGroup
-
MultithreadEventLoopGroup
是之前提到的NioEventLoopGroup
的父类,是一个抽象类。在上节中提到它有一套计算线程数量的逻辑。 - 它继承自
MultithreadEventExecutorGroup
,而且真正的构造函数逻辑就在它的父类中。
MultithreadEventExecutorGroup
构造函数如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
......
//创建一个‘线程任务执行器’,传入一个线程工厂作为参数
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
......
}
第5行创建了一个线程任务执行器 ThreadPerTaskExecutor
,传入一个线程工厂对象作为参数。
线程工厂接口--ThreadFactory
ThreadFactory
是一个接口,只定义了一个newThread
方法如下:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
线程工厂使得线程的创建
和线程的执行逻辑
得到解耦,线程工厂可以设置线程的优先级、名字、是否后台线程、所属的ThreadGroup
等信息。例如常用的工具类Executors
的内部类DefaultThreadFactory
就是接口ThreadFactory
的一个实现类:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//创建新线程,设置是否后台运行,所属的ThreadGroup,优先级
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
执行器接口--Executor
Executor
接口是一个函数式接口,定义了一个execute
方法:
public interface Executor {
void execute(Runnable command);
}
上面的ThreadPerTaskExecutor
就是Executor
接口的实现类:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
它持有一个线程工厂ThreadFactory
对象,使用代理模式
,执行时使用线程工厂创建了一个新的线程,并启动。
Executor
接口提供了一种执行任务的新方式,不是直接新建线程并运行new Thread(Runnable).start()
。创建Executor
接口的实现类,将Runnable
对象传入执行器中执行。
Executor executor = new XXXExeutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
具体例如:
-
直接执行
Runnable
对象class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }}
-
ThreadPerTaskExecutor
的例子是新建了一个线程执行任务。 -
将任务按照一定的顺序执行,如
io.grpc.internal.SerializingExecutor
, 它将任务首先添加到队列中,然后按序执行。public final class SerializingExecutor implements Executor, Runnable { private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<Runnable>(); public SerializingExecutor(Executor executor) { Preconditions.checkNotNull(executor, "'executor' must not be null."); this.executor = executor; } //将任务添加到队列中,按序执行 @Override public void execute(Runnable r) { runQueue.add(checkNotNull(r, "'r' must not be null.")); schedule(r); } }
网友评论