引入目的:解决项目中存在且难以优化的 N+1 慢查询问题。
示例效果:
查询一组共20条数据,每个子查询耗时1秒:使用并行组件返回结果约需1061ms,串行执行约需20057ms。并行处理的速度取决于任务数、线程数以及子查询中最慢的那一个。
package com.github.chenmingang.parseq;
import com.linkedin.parseq.ParTask;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.Tasks;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo comparing a serial "n+1" query against the same query with the
 * per-row sub-queries executed in parallel through a ParSeq engine.
 *
 * <p>Each simulated sub-query sleeps for one second, so the serial variant
 * takes roughly {@code n} seconds while the parallel variant takes roughly
 * one second when the pool has at least {@code n} threads.
 */
public class OnlineSimulation {
    // Alternative: static EngineAgent engine = EngineFactory.defaultEngine();
    static EngineAgent engine = EngineFactory.getEngine(20, 1, 20);

    /**
     * Runs the parallel and the serial variant once each and prints the
     * elapsed wall-clock time of both.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            long l1 = System.currentTimeMillis();
            new OnlineSimulation().queryAsync();
            long l2 = System.currentTimeMillis();
            System.out.println("time1:" + (l2 - l1));
            // sample output: time1:1063

            long l3 = System.currentTimeMillis();
            new OnlineSimulation().query();
            long l4 = System.currentTimeMillis();
            System.out.println("time2:" + (l4 - l3));
            // sample output: time2:20060
        } finally {
            // Always release the pool threads, even if a query throws;
            // otherwise the worker threads could keep the JVM alive.
            engine.shutdown();
        }
    }

    /**
     * Serial n+1 query: loads the list, then runs the sub-query for every
     * element one after another on the calling thread.
     *
     * @return the models with their names populated
     */
    public List<Model> query() {
        // Step 1: the primary query.
        List<Model> modelList = loadModels();
        // Step 2: one sub-query per row, executed sequentially.
        for (Model model : modelList) {
            setName(model);
        }
        return modelList;
    }

    /**
     * Parallel n+1 query: loads the list, then submits one task per element
     * to the engine and blocks until all of them have finished.
     *
     * <p>If a sub-query fails, the error is printed and the list is returned
     * anyway. If the calling thread is interrupted while waiting, the
     * interrupt status is restored and the list is returned as-is (some
     * elements may not have been populated yet).
     *
     * @return the models that were submitted for population
     */
    public List<Model> queryAsync() {
        // Step 1: the primary query.
        List<Model> modelList = loadModels();
        // Step 2: one sub-query task per row, composed into a parallel task.
        List<Task<Model>> tasks = new ArrayList<>();
        for (Model model : modelList) {
            Task<Model> task = engine.task(() -> setName(model));
            tasks.add(task);
        }
        ParTask<Model> parTask = Tasks.par(tasks);
        engine.run(parTask);
        try {
            parTask.await();
            Throwable error = parTask.getError();
            if (error != null) {
                error.printStackTrace();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return modelList;
    }

    /** Simulates the primary query by creating models with ids 1..20. */
    private List<Model> loadModels() {
        List<Model> modelList = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            modelList.add(new Model(i));
        }
        return modelList;
    }

    /**
     * Simulates running a sub-query and merging its result into the model:
     * sets the name to {@code "name-" + id}, then sleeps for one second.
     *
     * @param model the model to populate
     * @return the same {@code model} instance
     */
    private Model setName(Model model) {
        model.setName("name-" + model.getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt; let the owning thread/pool see it.
            Thread.currentThread().interrupt();
        }
        // To simulate a failing sub-query: throw new RuntimeException("");
        return model;
    }

    /** Simple row model with an id and a name filled in by the sub-query. */
    class Model {
        private Integer id;
        private String name;

        public Model(Integer id) {
            this.id = id;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}
根据这种列表式的查询场景对parseq做的封装
package com.github.chenmingang.parseq;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.batching.BatchingSupport;
import java.util.concurrent.*;
/**
 * Factory for {@link EngineAgent} instances backed by a fixed-size thread
 * pool with a bounded work queue and a separate timer scheduler.
 */
public class EngineFactory {
    private static final EngineFactory INSTANCE = new EngineFactory();
    private final EngineAgent defaultEngine;

    /** Builds the shared default engine sized from the available processors. */
    private EngineFactory() {
        int numCores = Runtime.getRuntime().availableProcessors();
        defaultEngine = getEngine(numCores + 1, 1, numCores + 1);
    }

    /**
     * Returns the shared default engine.
     *
     * @return the singleton default {@link EngineAgent}
     */
    public static EngineAgent defaultEngine() {
        return INSTANCE.defaultEngine;
    }

    /**
     * Creates a new engine with its own task pool and timer scheduler.
     *
     * <p>When the work queue is full, the submitting thread blocks until
     * space becomes available instead of the task being discarded. If the
     * pool has been shut down, or the submitting thread is interrupted while
     * waiting, the submission fails with a {@link RejectedExecutionException}
     * so the caller can observe it rather than waiting forever on a task
     * that will never run.
     *
     * @param poolSize     number of threads in the task pool (core == max)
     * @param scheduleSize core size of the timer scheduler
     * @param queueNum     capacity of the bounded work queue
     * @return a new {@link EngineAgent} wrapping the engine and its executors
     */
    public static EngineAgent getEngine(int poolSize, int scheduleSize, int queueNum) {
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(scheduleSize);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
        ThreadPoolExecutor executors = new ThreadPoolExecutor(poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueNum), threadFactory,
                (r, executor) -> {
                    // After shutdown nothing is guaranteed to drain the queue,
                    // so queuing the task would strand it silently.
                    if (executor.isShutdown()) {
                        throw new RejectedExecutionException(
                                "Executor has been shut down; task rejected: " + r);
                    }
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        // Restore the flag and surface the failure; the task
                        // was NOT enqueued.
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException(
                                "Interrupted while waiting for queue capacity", e);
                    }
                    // Re-check: shutdown may have raced with the put above.
                    if (executor.isShutdown() && executor.getQueue().remove(r)) {
                        throw new RejectedExecutionException(
                                "Executor has been shut down; task rejected: " + r);
                    }
                });
        final EngineBuilder builder = new EngineBuilder().setTaskExecutor(executors).setTimerScheduler(scheduler);
        final BatchingSupport batchingSupport = new BatchingSupport();
        builder.setPlanDeactivationListener(batchingSupport);
        return new EngineAgent(builder.build(), executors, scheduler);
    }
}
package com.github.chenmingang.parseq;
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Supplier;
/**
 * Thin wrapper around a ParSeq {@link Engine} together with the executors
 * that back it, offering helpers to turn a blocking {@link Supplier} into an
 * asynchronous {@link Task}.
 */
public class EngineAgent {
    private final Engine engine;
    private final ThreadPoolExecutor executors;
    private final ScheduledExecutorService scheduler;

    /**
     * @param engine    the ParSeq engine used to run tasks
     * @param executors the pool on which blocking suppliers are executed
     * @param scheduler the timer scheduler associated with the engine
     */
    public EngineAgent(Engine engine, ThreadPoolExecutor executors, ScheduledExecutorService scheduler) {
        this.engine = engine;
        this.executors = executors;
        this.scheduler = scheduler;
    }

    /**
     * Executes {@code supplier} on the task pool and exposes its outcome as a
     * promise.
     *
     * @param supplier the (possibly blocking) computation to run
     * @param <T>      the result type
     * @return a promise completed with the supplier's value, or failed with
     *         whatever it threw
     */
    public <T> SettablePromise<T> async(Supplier<T> supplier) {
        final SettablePromise<T> promise = Promises.settable();
        getExecutors().execute(() -> {
            try {
                promise.done(supplier.get());
            } catch (Throwable t) {
                // Catch Throwable, not just Exception: if an Error escaped,
                // the promise would never resolve and any caller awaiting
                // the task would block forever.
                promise.fail(t);
            }
        });
        return promise;
    }

    /**
     * Wraps {@code supplier} in a ParSeq task that, when run, executes the
     * supplier via {@link #async(Supplier)}.
     *
     * @param supplier the (possibly blocking) computation to run
     * @param <T>      the result type
     * @return a task that completes with the supplier's outcome
     */
    public <T> Task<T> task(Supplier<T> supplier) {
        return Task.async(() -> async(supplier));
    }

    /**
     * Submits {@code task} to the underlying engine.
     *
     * @param task the task to run
     */
    public void run(final Task<?> task) {
        engine.run(task);
    }

    /**
     * Shuts down the engine and both executors. The timer scheduler is
     * included so that it does not outlive the engine it was created for.
     */
    public void shutdown() {
        engine.shutdown();
        executors.shutdown();
        scheduler.shutdown();
    }

    /** @return the pool on which blocking suppliers are executed */
    public ThreadPoolExecutor getExecutors() {
        return executors;
    }

    /** @return the timer scheduler associated with the engine */
    public ScheduledExecutorService getScheduler() {
        return scheduler;
    }
}
依赖版本
<dependency>
<groupId>com.linkedin.parseq</groupId>
<artifactId>parseq</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>com.linkedin.parseq</groupId>
<artifactId>parseq-batching</artifactId>
<version>2.6.5</version>
</dependency>
网友评论