生产者 - 消费者模式
生产者 - 消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务 队列中,而消费者线程从任务队列中获取任务并执行。
生产者 - 消费者模式示意图
生产者 - 消费者模式的优点
- 从架构设计的角度来看,就是解耦。解耦对于 大型系统的设计非常重要,而解耦的一个关键就是组件之间的依赖关系和通信方式必须受限。在生产者 - 消费者模式中,生产者和消费者没有任何依赖关系,它们彼此之间的通信只能通过任务队列,所以生产者 - 消费者模式是一个不错的解耦方案。
- 支持异步,并且能够平衡生产者和消费者的速度差异,恰好能支持你用适量的线程。在生产者 - 消费者模式中,生产者线程只需要将任务添加到任务队列而无需等待任务被消费者线程执行完,也就是说任务的生产和消费是异步的。
- 支持批量执行以提升性能。
在监控系统动态采集的案例中,如果被监控系统有很多,相比每一条回传数据都直接 INSERT 到数据库,更好的方案是批量执行 SQL,那如何实现呢?这就要用到生产者 - 消费者模式了。
在下面的示例代码中,我们创建了 5 个消费者线程负责批量执行 SQL,这 5 个消费者线程以 while(true){} 循环方式批量地获取任务并批量地执行。需要注意的是,从任务队列中获取批量任务的方法 pollTasks() 中,首先是以阻塞方式获取任务队列中的一条任务,而后则是以非阻塞的方式获取任务;之所以首先采用阻塞方式,是因为如果任务队列中没有任务,这样的方式能够避免无谓的循环。
// 任务队列
BlockingQueue<Task> bq=new
LinkedBlockingQueue<>(2000);
// 启动 5 个消费者线程
// 执行批量任务
void start() {
ExecutorService es=xecutors
.newFixedThreadPool(5);
for (int i=0; i<5; i++) {
es.execute(()->{
try {
while (true) {
// 获取批量任务
List<Task> ts=pollTasks();
// 执行批量任务
execTasks(ts);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
// 从任务队列中获取批量任务
List<Task> pollTasks()
throws InterruptedException{
List<Task> ts=new LinkedList<>();
// 阻塞式获取一条任务
Task t = bq.take();
while (t != null) {
ts.add(t);
// 非阻塞式获取一条任务
t = bq.poll();
}
return ts;
}
// 批量执行任务
execTasks(List<Task> ts) {
// 省略具体代码无数
}
支持分阶段提交以提升性能
利用生产者 - 消费者模式还可以轻松地支持一种分阶段提交的应用场景。下面是一个异步刷盘日志组件的设计方案:
class Logger {
// 任务队列
final BlockingQueue<LogMsg> bq
= new BlockingQueue<>();
//flush 批量
static final int batchSize=500;
// 只需要一个线程写日志
ExecutorService es =
Executors.newFixedThreadPool(1);
// 启动写日志线程
void start(){
File file=File.createTempFile(
"foo", ".log");
final FileWriter writer=
new FileWriter(file);
this.es.execute(()->{
try {
// 未刷盘日志数量
int curIdx = 0;
long preFT=System.currentTimeMillis();
while (true) {
LogMsg log = bq.poll(
5, TimeUnit.SECONDS);
// 写日志
if (log != null) {
writer.write(log.toString());
++curIdx;
}
// 如果不存在未刷盘数据,则无需刷盘
if (curIdx <= 0) {
continue;
}
// 根据规则刷盘
if (log!=null && log.level==LEVEL.ERROR ||
curIdx == batchSize ||
System.currentTimeMillis()-preFT>5000){
writer.flush();
curIdx = 0;
preFT=System.currentTimeMillis();
}
}
}catch(Exception e){
e.printStackTrace();
} finally {
try {
writer.flush();
writer.close();
}catch(IOException e){
e.printStackTrace();
}
}
});
}
// 写 INFO 级别日志
void info(String msg) {
bq.put(new LogMsg(
LEVEL.INFO, msg));
}
// 写 ERROR 级别日志
void error(String msg) {
bq.put(new LogMsg(
LEVEL.ERROR, msg));
}
}
// 日志级别
enum LEVEL {
INFO, ERROR
}
class LogMsg {
LEVEL level;
String msg;
// 省略构造函数实现
LogMsg(LEVEL lvl, String msg){}
// 省略 toString() 实现
String toString(){}
}
网友评论