uploader 简介
- 提供一个批处理的工具,hugegraph为大数据而生,根据其官网的测试吞吐量来看,多节点多线程下能够达到5w个节点/s 的插入量。
- uploader 按照batch的方式插入数据,单个batch 包含的节点数量可以指定,默认为500/batch, 也可以指定并发的线程数。
- 假设1一亿个节点, 单个batch 500, 10个线程并发, uploader的做法就是 遍历 节点文件,每500个 构建一个batch,丢入 线程池。使用 信号量作为流量控制,也就是说同时只有10个线程同时工作。
线程池
uploader 使用 java concurrent 包的 executionservice
this.batchService = ExecutorUtil.newFixedThreadPool(options.numThreads,
BATCH_WORKER);
this.singleService = ExecutorUtil.newFixedThreadPool(options.numThreads,
SINGLE_WORKER);
使用java8 的CompetionFuture 来走异步编程,喜欢做回调并且喜欢异步风格的可以学习一下这个类。
CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
LOG.warn("Batch insert {} error, try single insert", type, e);
this.submitInSingle(struct, batch);
return null;
}).whenComplete((r, e) -> this.batchSemaphore.release());
如何做的流量控制
- 接着上面的例子,对于每个 executionpool,使用一个 10个信号的信号量。
- 向 pool里提交任务的时候,需要获得 一个信号量,如果没有就等待,这就保证了只有最多10个任务在pool里面。
- 流量控制来避免unbounded pool,因为unbounded可能导致oom,这个异步编程的best practice 不推荐使用无界队列。
- 当一个task 结束时候,在 whenComplete的回调里面,释放信号量。
public <GE extends GraphElement> void submitBatch(ElementStruct struct,
List<GE> batch) {
ElemType type = struct.type();
try {
*** this.batchSemaphore.acquire(); ***
} catch (InterruptedException e) {
throw new LoadException("Interrupted while waiting to submit %s " +
"batch in batch mode", e, type);
}
InsertTask<GE> task = new BatchInsertTask<>(this.context, struct,
batch);
CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
LOG.warn("Batch insert {} error, try single insert", type, e);
this.submitInSingle(struct, batch);
return null;
*** }).whenComplete((r, e) -> this.batchSemaphore.release()); ***
}
如何处理批处理的异常和监控
- uploader 有两种task,一个是batch task,一个是singleTask。
- batch task 一次会提交500个点给服务端,但是出现一个失败,就all failed
- 如果batch 失败,就会降级到单个节点提交, 一个一个节点的提交,不过这种方式会降低吞吐量。
对于监控的话,hugegraph 使用了 apache的 StopWatch,这个工具类可以值得我们借鉴,以后再也不要用 long current = System.currentMIllians()
Stopwatch 更加面向对象。
网友评论