美文网首页
八 hugegraph 源代码 uploader

八 hugegraph 源代码 uploader

作者: NazgulSun | 来源:发表于2019-07-21 20:41 被阅读0次

uploader 简介

  • 提供一个批处理的工具,hugegraph为大数据而生,根据其官网的测试吞吐量来看,多节点多线程下能够达到5w个节点/s 的插入量。
  • uploader 按照batch的方式插入数据,单个batch 包含的节点数量可以指定,默认为500/batch, 也可以指定并发的线程数。
  • 假设1一亿个节点, 单个batch 500, 10个线程并发, uploader的做法就是 遍历 节点文件,每500个 构建一个batch,丢入 线程池。使用 信号量作为流量控制,也就是说同时只有10个线程同时工作。

线程池

uploader 使用 java concurrent 包的 executionservice

        this.batchService = ExecutorUtil.newFixedThreadPool(options.numThreads,
                                                            BATCH_WORKER);
        this.singleService = ExecutorUtil.newFixedThreadPool(options.numThreads,
                                                             SINGLE_WORKER);

使用java8 的CompetionFuture 来走异步编程,喜欢做回调并且喜欢异步风格的可以学习一下这个类。

        CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
            LOG.warn("Batch insert {} error, try single insert", type, e);
            this.submitInSingle(struct, batch);
            return null;
        }).whenComplete((r, e) -> this.batchSemaphore.release());

如何做的流量控制

  • 接着上面的例子,对于每个 executionpool,使用一个 10个信号的信号量。
  • 向 pool里提交任务的时候,需要获得 一个信号量,如果没有就等待,这就保证了只有最多10个任务在pool里面。
  • 流量控制来避免unbounded pool,因为unbounded可能导致oom,这个异步编程的best practice 不推荐使用无界队列。
  • 当一个task 结束时候,在 whenComplete的回调里面,释放信号量。
   public <GE extends GraphElement> void submitBatch(ElementStruct struct,
                                                      List<GE> batch) {
        ElemType type = struct.type();
        try {
            *** this.batchSemaphore.acquire(); ***
        } catch (InterruptedException e) {
            throw new LoadException("Interrupted while waiting to submit %s " +
                                    "batch in batch mode", e, type);
        }

        InsertTask<GE> task = new BatchInsertTask<>(this.context, struct,
                                                    batch);
        CompletableFuture.runAsync(task, this.batchService).exceptionally(e -> {
            LOG.warn("Batch insert {} error, try single insert", type, e);
            this.submitInSingle(struct, batch);
            return null;
***        }).whenComplete((r, e) -> this.batchSemaphore.release());  ***
    }

如何处理批处理的异常和监控

  • uploader 有两种task,一个是batch task,一个是singleTask。
  • batch task 一次会提交500个点给服务端,但是出现一个失败,就all failed
  • 如果batch 失败,就会降级到单个节点提交, 一个一个节点的提交,不过这种方式会降低吞吐量。

对于监控的话,hugegraph 使用了 apache的 StopWatch,这个工具类可以值得我们借鉴,以后再也不要用 long current = System.currentMIllians()
Stopwatch 更加面向对象。

相关文章

网友评论

      本文标题:八 hugegraph 源代码 uploader

      本文链接:https://www.haomeiwen.com/subject/kevvlctx.html