Worker Thread 模式及其实现
Worker Thread 模式可以类比现实世界里车间的工作模式:车间里的工人,有活儿了,大家一起干,没活儿了就聊聊天等着。你可以参考下面的示意图来理解,Worker Thread 模式中Worker Thread 对应到现实世界里,其实指的就是车间里的工人。不过这里需要注意的是,车间里的工人数量往往是确定的。

在Java 语言中,通常使用阻塞队列做任务池,然后创建一个线程池来消费阻塞队列中的任务。
下面的示例代码是用线程池实现的 echo 服务端,相比于 Thread-Per-Message 模式的实现,改动非常少,仅仅是创建了一个最多线程数为 500 的线程池 es,然后通过 es.execute() 方法将请求处理的任务提交给线程池处理。
ExecutorService es = Executors
.newFixedThreadPool(500);
final ServerSocketChannel ssc =
ServerSocketChannel.open().bind(
new InetSocketAddress(8080));
// 处理请求
try {
while (true) {
// 接收请求
SocketChannel sc = ssc.accept();
// 将请求处理任务提交给线程池
es.execute(()->{
try {
// 读 Socket
ByteBuffer rb = ByteBuffer
.allocateDirect(1024);
sc.read(rb);
// 模拟处理请求
Thread.sleep(2000);
// 写 Socket
ByteBuffer wb =
(ByteBuffer)rb.flip();
sc.write(wb);
// 关闭 Socket
sc.close();
}catch(Exception e){
throw new UncheckedIOException(e);
}
});
}
} finally {
ssc.close();
es.shutdown();
}
正确地创建线程池
- 使用创建有界的队列来接收任务。
- 在创建线程池时,清晰地指明拒绝策略,
- 在实际工作中给线程赋予一个业务相关的名字。
综合以上这三点建议,echo 程序中创建线程可以使用下面的示例代码。
ExecutorService es = new ThreadPoolExecutor(
50, 500,
60L, TimeUnit.SECONDS,
// 注意要创建有界队列
new LinkedBlockingQueue<Runnable>(2000),
// 建议根据业务需求实现 ThreadFactory
r->{
return new Thread(r, "echo-"+ r.hashCode());
},
// 建议根据业务需求实现 RejectedExecutionHandler
new ThreadPoolExecutor.CallerRunsPolicy());
避免线程死锁
提交到相同线程池中的任务一定是相互独立的,否则就一定要慎重,一不小心可能导致线程相互等待,发生死锁。
网友评论