最近看了一些关于多线程的介绍,让我感受最深的是有关多线程异步和同步共同使用已达到优化代码的目的,很值得去学习这种思想。
想看流程图:
Untitled Diagram (1).png
现有这种需求,从题库中根据每个学生的学生id生成相对应的一份试卷,试卷上的题目是在题库中根据学生的情况生成的。生成的题目还涉及到图片的下载和最终pdf文档的生成和上传,生成的过程中可能有些题目可能还会更新。
CompletionService 先了解一下
//线程池
private static ExecutorService dockMakeService = Executors.newFixedThreadPool(Consts.THREAD_COUNT_BASE * 2);
// CompletionService 是将线程池Executor和阻塞队列BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更加加单,
//能够让异步任务的执行结果有序化。先执行完的任务放到阻塞队列中。
private static CompletionService docCompletionService = new ExecutorCompletionService(dockMakeService);
1 先要解决的问题,处理文档的异步化。
package com.chen.thread.vo;
import java.util.List;
//处理生成文档的实体类
public class PendingDocVo {
private final String docName;
private final List<Integer> problemVoList;
public PendingDocVo(String docName, List<Integer> problemVoList) {
this.docName = docName;
this.problemVoList = problemVoList;
}
public String getDocName() {
return docName;
}
public List<Integer> getProblemVoList() {
return problemVoList;
}
}
public static class MakeDocTask implements Callable<String> {
private PendingDocVo pendingDocVo;
public MakeDocTask(PendingDocVo pendingDocVo) {
this.pendingDocVo = pendingDocVo;
}
@Override
public String call() throws Exception {
System.out.println("开始处理文档:" + pendingDocVo.getDocName() + "............");
long start = System.currentTimeMillis();
// 每份试卷的题目都会有相同的部分,这个时候就需要考虑如何给处理题目加缓存了?
return localName;
}
}
2 处理题目异步化同时通过缓存来避免 同样的线程执行了同样的任务。 可以使用java的构造函数的重载来完成。
package com.chen.thread.vo;
import java.util.concurrent.Future;
/**
* 并发处理题目时 需要题目返回的结果
*
*/
public class MultiProblemVo {
//题目处理后的任务
private final String problemText;
private final Future<ProblemCacheVo> problemFuture;
//当多个线程在处理题目时,如果题目已经被处理完了就直接使用,放到这构造函数式string类型的对象中
public MultiProblemVo(String problemText) {
this.problemText = problemText;
this.problemFuture=null;
}
//当多个线程在处理题目时,如果题目没有被处理,就放到构造函数是Future的的对象中。
public MultiProblemVo(Future<ProblemCacheVo> problemFuture) {
this.problemFuture = problemFuture;
this.problemText=null;
}
public String getProblemText() {
return problemText;
}
public Future<ProblemCacheVo> getProblemFuture() {
return problemFuture;
}
}
3 处理题目的线程
//存放正在处理的题目的缓存
private static ConcurrentHashMap<Integer, Future<ProblemCacheVo>> processingCachVo
= new ConcurrentHashMap<>();
//处理题目的任务 将数据库的文本题目
private static class ProblemTask implements Callable<ProblemCacheVo> {
private ProblemDBVo problemDBVo;
private Integer problemId;
public ProblemTask(ProblemDBVo problemDBVo, Integer problemId) {
this.problemDBVo = problemDBVo;
this.problemId = problemId;
}
@Override
public ProblemCacheVo call() throws Exception {
try {
ProblemCacheVo problemCacheVo = new ProblemCacheVo();
//业务代码 比如每个题目处理的时间 为500ms
problemCacheVo.setProcessedContent(BaseProblemService.makeProblem(problemId, problemDBVo.getContent()));
problemCacheVo.setProblemSha(problemDBVo.getSha());
problemCach.put(problemId, problemCacheVo);
return problemCacheVo;
} finally {
//题目处理完成之后需要从正在处理题目的缓存任务中remove掉
processingCachVo.remove(problemId);
}
}
}
4 遍历每个文档中的所有题目 判断是否需要启用线程来执行
public static MultiProblemVo makeProblemService(Integer problemId) {
//题目id 是否有题目缓存
ProblemCacheVo problemCacheVo = problemCach.get(problemId);
if (problemCacheVo == null) {
// 没有题目时需要重启线程执行该任务
System.out.println("题目【" + problemId + "】不存在需要新起任务");
return new MultiProblemVo(getProblemFuture(problemId));
} else {
//存在题目时,还需要判断题目题目是否已经更新
String problemSha = ProblemBank.gerProblemSha(problemId);
//没有更新就直接用
if (problemCacheVo.getProblemSha().equals(problemSha)) {
System.out.println("题目【" + problemId + "】在缓存中已存在并且没有被修改可以直接使用");
return new MultiProblemVo(problemCacheVo.getProcessedContent());
} else {
//更新的题目需要重新创建线程
System.out.println("题目【" + problemId + "】在缓存中存在,但是被修改过,需要重新生成新的任务");
return new MultiProblemVo(getProblemFuture(problemId));
}
}
}
//返回题目的工作任务(虽然在处理时是FutureTask,但是最终返回的结果是是一个future)
public static Future<ProblemCacheVo> getProblemFuture(Integer problemId) {
Future<ProblemCacheVo> problemFuture = processingCachVo.get(problemId);
if (problemFuture == null) {
ProblemDBVo problemDBVo = ProblemBank.getProblem(problemId);
// 题目交给线程去执行
ProblemTask problemTask = new Problem-Task(problemDBVo, problemId);
//将处理题目的线程也能够封装成FutureTask
FutureTask<ProblemCacheVo> ft = new FutureTask<>(problemTask);
//从处理缓存个的线程中判断是否存在该线程任务
problemFuture = processingCachVo.putIfAbsent(problemId, ft);
if (problemFuture == null) {
//表示没有别的线程正在处理当前的题目
problemFuture = ft;
makeProbleExec.execute(ft);
System.out.println("题目【" + problemId + "】计算任务启动,请稍等");
} else {
System.out.println("其他线程启动了题目【" + problemId + "】的计算任务,该任务不必开启");
}
}
return problemFuture;
}
网友评论