美文网首页GraphQL AI-大数据
第8课 GraphQL集成DataLoader

第8课 GraphQL集成DataLoader

作者: 笔名辉哥 | 来源:发表于2021-09-08 16:03 被阅读0次

    N+1问题

    首先来看看什么是N+1问题,假设我们有如下模型:

    image

    然后我们有这样的查询:

      school(schoolId:"school1"){
        teachers
        {
          teacherName
        }
      }
    
    

    会得到类似下面的结果:

      "data": {
        "school": {
          "teachers": [
            {
              "teacherName": "老师11"
            },
            {
              "teacherName": "老师12"
            },
            {
              "teacherName": "老师13"
            }
          ]
        }
      }
    
    

    根据我们之前的经验,GraphQL会这样执行查询逻辑:

    1. 根据schoolId查到学校里的teacherId列表
    2. 遍历TeacherId列表,查到每个Teacher对象
    3. 获取Teacher对象的teacherName属性

    很容易发现,遍历teacherId列表取查询每个Teacher对象是极不经济的,而N+1指的就是N次Teacher查询+1次teacherId列表的查询。

    Java-DataLoader

    DataLoader通过将多次查询合并成一次来减少查询次数。比如上面的例子,需要执行三次对Teacher对象的查询,DataLoader会自动将三次查询合并成一次批量查询。除此之外,就算是不同层级的查询,DataLaoder也会自动进行合并,比如将上面的查询改成:

    ## 查询全校老师名字和每个班级里的老师名字
    school(schoolId:"school1"){
      classList{
        teachers{
          teacherName
        }
      }
      teachers
      {
        teacherName
      }
    }
    
    

    虽然是不同层级,DataLoader也会将针对Teacher对象的查询合并成一次批量查询,同时会过滤掉重复的TeacherId,保证最佳的查询性能。

    GraphQL集成Java-DataLoader

    这里以优化Teacher对象的查询演示如何集成Java-DataLoader(GraphQL-JAVA默认引入了Java-DataLoader,不需要额外引入)。

    1. 实现TeacherDataLoader

    class TeacherDataLoader : BatchLoader<String, Teacher> {
        override fun load(keys: List<String>): CompletableFuture<List<Teacher>> {
            return CompletableFuture.supplyAsync {
                // 这里是根据ID批量查询Teacher列表
                DataStore.getTeachersByTeacherIds(keys)
            }
        }
    }
    
    

    2. 注入DataLoader

    val executionInputBuilder = ExecutionInput.Builder()
    
    // 省略其他内容的注入
    // ...
    
    val dataLoaderRegister = DataLoaderRegistry()
    dataLoaderRegister.register("teacherBatchLoader", DataLoader.newDataLoader(TeacherDataLoader))
    executionInputBuilder.dataLoaderRegistry(buildDataLoaderRegistry())
    
    val executionResult = graphQL.execute(executionInput)
    
    

    3. 改写Resolver

    将所有对Teacher的Fetch修改为通过DataLoader中间层取获取数据(这里以需改SchoolResolver中的teachers为例):

    class SchoolResolver : GraphQLResolver<School> {
        fun teachers(school: School, env: DataFetchingEnvironment): CompletableFuture<List<Teacher>> {
            val teacherIds = DataStore.schoolStore.first { it.schoolId == school.schoolId }.teachers
            val dataLoader = env.getDataLoader<String, Teacher>(DataLoaderConstants.TEACHER_DATA_LOADER)
            return dataLoader.loadMany(teacherIds)
        }
    }
    
    

    通过以上三步就完成了DataLoader的集成,接下来分析DataLoader的工作原理。

    DataLoader原理

    DataLoader主要利用了Java的CompletableFuture异步任务收集再批量处理,最后将结果写回对应任务。

    image

    以下是部分重点源码解读:

    // key的合并和缓存处理
    CompletableFuture<V> load(K key, Object loadContext) {
            synchronized (dataLoader) {
                Object cacheKey = getCacheKey(nonNull(key));
                stats.incrementLoadCount();
    
                boolean batchingEnabled = loaderOptions.batchingEnabled();
                boolean cachingEnabled = loaderOptions.cachingEnabled();
    
                // cache是默认开启的,同样的key直接拿缓存
                if (cachingEnabled) {
                    if (futureCache.containsKey(cacheKey)) {
                        stats.incrementCacheHitCount();
                        return futureCache.get(cacheKey);
                    }
                }
    
                CompletableFuture<V> future = new CompletableFuture<>();
                if (batchingEnabled) {
                    //把key和future对应收集起来,合并key批量查询后写回future
                    loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext));
                } else {
                    stats.incrementBatchLoadCountBy(1);
                    // immediate execution of batch function
                    future = invokeLoaderImmediately(key, loadContext);
                }
                if (cachingEnabled) {
                    futureCache.set(cacheKey, future);
                }
                return future;
            }
        }
    
    
    // 调用我们写的DataLoader
    private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object> callContexts, List<CompletableFuture<V>> queuedFutures) {
            stats.incrementBatchLoadCountBy(keys.size());
            // 调用我们写的TeacherDataLoader
            CompletionStage<List<V>> batchLoad = invokeLoader(keys, callContexts);
            return batchLoad
                    .toCompletableFuture()
                    .thenApply(values -> {
                        // keys和结果一定要对应,一个key对应一个future,一个future对应一个结果
                        assertResultSize(keys, values);
    
                        for (int idx = 0; idx < queuedFutures.size(); idx++) {
                            Object value = values.get(idx);
                            CompletableFuture<V> future = queuedFutures.get(idx);
                            if (value instanceof Throwable) {
                                stats.incrementLoadErrorCount();
                                future.completeExceptionally((Throwable) value);
                                // we don't clear the cached view of this entry to avoid
                                // frequently loading the same error
                            } else if (value instanceof Try) {
                                // we allow the batch loader to return a Try so we can better represent a computation
                                // that might have worked or not.
                                Try<V> tryValue = (Try<V>) value;
                                if (tryValue.isSuccess()) {
                                    future.complete(tryValue.get());
                                } else {
                                    stats.incrementLoadErrorCount();
                                    future.completeExceptionally(tryValue.getThrowable());
                                }
                            } else {
                                // 把结果写回缓存中的future
                                V val = (V) value;
                                future.complete(val);
                            }
                        }
                        return values;
                    }).exceptionally(ex -> {
                        stats.incrementBatchLoadExceptionCount();
                        for (int idx = 0; idx < queuedFutures.size(); idx++) {
                            K key = keys.get(idx);
                            CompletableFuture<V> future = queuedFutures.get(idx);
                            future.completeExceptionally(ex);
                            // clear any cached view of this key because they all failed
                            dataLoader.clear(key);
                        }
                        return emptyList();
                    });
        }
    
    

    除此之外Java-DataLoader还做了一个Statistics用于收集DataLoader执行过程中的状态,比如缓存命中多少次,已经load了多少个对象,有多少次error等。默认情况下是不会执行数据收集的,需要通过DataLoaderDispatcherInstrumentation进行注入:

    val options = DataLoaderDispatcherInstrumentationOptions
            .newOptions().includeStatistics(true)
    val dispatcherInstrumentation = DataLoaderDispatcherInstrumentation(options)
    
    

    但是有一个问题,在构建GraphQL时只支持一个instrumentation,那么是不是我们仅只能写一个instrumentation呢?好在GraphQL用组合模式提供了一个ChainedInstrumentation,我们得以组合多个instrumentation。

    作者:Johnny_
    链接:https://www.jianshu.com/p/196472a0f813
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    相关文章

      网友评论

        本文标题:第8课 GraphQL集成DataLoader

        本文链接:https://www.haomeiwen.com/subject/bagywltx.html