The N+1 Problem
First, let's look at what the N+1 problem is. Suppose we have the following model:
[Image: model diagram]
Then we run a query like this:
school(schoolId:"school1"){
    teachers{
        teacherName
    }
}
It returns a result like this:
"data": {
    "school": {
        "teachers": [
            {
                "teacherName": "老师11"
            },
            {
                "teacherName": "老师12"
            },
            {
                "teacherName": "老师13"
            }
        ]
    }
}
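From what we have seen so far, GraphQL executes this query as follows:
- Look up the school's list of teacherIds by schoolId
- Iterate over the teacherId list and fetch each Teacher object
- Read the teacherName property of each Teacher object
It is easy to see that iterating over the teacherId list and querying each Teacher object one at a time is very wasteful. The name "N+1" refers to the N Teacher queries plus the 1 query for the teacherId list.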
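To make the cost concrete, here is a minimal sketch that counts queries for the naive approach and for a batched one. CountingStore and its methods are hypothetical stand-ins for a real data source, not part of GraphQL or of this article's project.

```kotlin
data class Teacher(val teacherId: String, val teacherName: String)

// Hypothetical in-memory store that counts how many "queries" it serves.
class CountingStore(private val teachers: Map<String, Teacher>) {
    var queryCount = 0
        private set

    // One query for the teacherId list of a school (the "+1").
    fun teacherIdsOfSchool(schoolId: String): List<String> {
        queryCount++
        return teachers.keys.toList()
    }

    // One query per teacher (the "N").
    fun teacherById(id: String): Teacher? {
        queryCount++
        return teachers[id]
    }

    // One query for many teachers at once.
    fun teachersByIds(ids: List<String>): List<Teacher?> {
        queryCount++
        return ids.map { teachers[it] }
    }
}

fun newStore() = CountingStore(
    listOf("t11" to "老师11", "t12" to "老师12", "t13" to "老师13")
        .associate { (id, name) -> id to Teacher(id, name) }
)

// Naive resolution: 1 query for the ids, then 1 query per id.
fun naiveQueryCount(): Int {
    val store = newStore()
    store.teacherIdsOfSchool("school1").map { store.teacherById(it) }
    return store.queryCount
}

// Batched resolution: 1 query for the ids, then 1 query for all teachers.
fun batchedQueryCount(): Int {
    val store = newStore()
    store.teachersByIds(store.teacherIdsOfSchool("school1"))
    return store.queryCount
}

fun main() {
    println("naive: ${naiveQueryCount()} queries")     // naive: 4 queries
    println("batched: ${batchedQueryCount()} queries") // batched: 2 queries
}
```

With three teachers the naive path costs 1 + 3 queries, and the gap grows linearly with the size of the list.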
Java-DataLoader
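DataLoader reduces the number of queries by merging multiple lookups into one. The example above needs three separate queries for Teacher objects; DataLoader automatically merges them into a single batch query. Beyond that, DataLoader also merges lookups that come from different levels of the query. For example, change the query above to: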
## Query the names of all teachers in the school and the names of the teachers in each class
school(schoolId:"school1"){
    classList{
        teachers{
            teacherName
        }
    }
    teachers{
        teacherName
    }
}
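Even though the teachers fields sit at different levels, DataLoader merges the lookups for Teacher objects into a single batch query and filters out duplicate teacherIds, so no Teacher is fetched more than once.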
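Integrating Java-DataLoader with GraphQL
This section uses the Teacher lookup to show how to integrate Java-DataLoader (GraphQL-JAVA pulls in Java-DataLoader by default, so no extra dependency is needed).
1. Implement TeacherDataLoader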
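import java.util.concurrent.CompletableFuture
import org.dataloader.BatchLoader

class TeacherDataLoader : BatchLoader<String, Teacher> {
    override fun load(keys: List<String>): CompletableFuture<List<Teacher>> {
        return CompletableFuture.supplyAsync {
            // Batch-query the Teacher list by ID; the result must contain
            // one entry per key, in the same order as keys
            DataStore.getTeachersByTeacherIds(keys)
        }
    }
}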
2. Register the DataLoader
val executionInputBuilder = ExecutionInput.Builder()
// Other inputs omitted
// ...
val dataLoaderRegistry = DataLoaderRegistry()
// Register under the same key that the resolver uses to look the loader up
dataLoaderRegistry.register(DataLoaderConstants.TEACHER_DATA_LOADER, DataLoader.newDataLoader(TeacherDataLoader()))
executionInputBuilder.dataLoaderRegistry(dataLoaderRegistry)
val executionResult = graphQL.execute(executionInputBuilder.build())
3. Rewrite the Resolver
Change every fetch of Teacher so that it goes through the DataLoader layer (here we modify teachers in SchoolResolver as an example):
class SchoolResolver : GraphQLResolver<School> {
    fun teachers(school: School, env: DataFetchingEnvironment): CompletableFuture<List<Teacher>> {
        val teacherIds = DataStore.schoolStore.first { it.schoolId == school.schoolId }.teachers
        val dataLoader = env.getDataLoader<String, Teacher>(DataLoaderConstants.TEACHER_DATA_LOADER)
        return dataLoader.loadMany(teacherIds)
    }
}
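With these three steps the DataLoader integration is complete. Next, let's look at how DataLoader works.
How DataLoader Works
DataLoader relies mainly on Java's CompletableFuture: it collects asynchronous load requests, processes them in one batch, and then writes each result back to the request it belongs to.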
[Image: DataLoader workflow diagram]
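The collect, batch, and write-back cycle can be sketched in a few lines. This is a deliberately simplified, single-threaded illustration under my own assumptions; MiniLoader and its members are hypothetical names, and the real library additionally handles synchronization, errors, options, and statistics.

```kotlin
import java.util.concurrent.CompletableFuture

// Simplified sketch of the idea; not the library's implementation.
class MiniLoader<K, V>(private val batchFn: (List<K>) -> List<V>) {
    private val cache = LinkedHashMap<K, CompletableFuture<V>>()
    private val queue = ArrayList<Pair<K, CompletableFuture<V>>>()
    var batchCalls = 0
        private set

    // Returns a pending future; a repeated key gets the same future back,
    // so it is queued (and later fetched) only once.
    fun load(key: K): CompletableFuture<V> =
        cache.getOrPut(key) {
            CompletableFuture<V>().also { queue.add(key to it) }
        }

    // Runs the batch function once for all queued keys and completes
    // each future with the value at the same index.
    fun dispatch() {
        if (queue.isEmpty()) return
        val pending = queue.toList()
        queue.clear()
        batchCalls++
        val values = batchFn(pending.map { it.first })
        check(values.size == pending.size) { "batch function must return one value per key" }
        pending.forEachIndexed { i, (_, future) -> future.complete(values[i]) }
    }
}

fun main() {
    val seenBatches = mutableListOf<List<String>>()
    val loader = MiniLoader<String, String> { keys ->
        seenBatches.add(keys)
        keys.map { "name-of-$it" }
    }
    val a = loader.load("t11")
    val b = loader.load("t12")
    val c = loader.load("t11") // duplicate key: same future as a
    println(a.isDone)          // false, nothing has been fetched yet
    loader.dispatch()
    println(a.get())           // name-of-t11
    println(b.get())           // name-of-t12
    println(a === c)           // true
    println(seenBatches)       // [[t11, t12]]
}
```

Three load calls result in one batch call with two distinct keys, which is the behavior the excerpts from the library source implement in a more robust way.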
Below is a walkthrough of some key parts of the source code:
// Merging keys and handling the cache
CompletableFuture<V> load(K key, Object loadContext) {
    synchronized (dataLoader) {
        Object cacheKey = getCacheKey(nonNull(key));
        stats.incrementLoadCount();
        boolean batchingEnabled = loaderOptions.batchingEnabled();
        boolean cachingEnabled = loaderOptions.cachingEnabled();
        // Caching is enabled by default; a repeated key is served straight from the cache
        if (cachingEnabled) {
            if (futureCache.containsKey(cacheKey)) {
                stats.incrementCacheHitCount();
                return futureCache.get(cacheKey);
            }
        }
        CompletableFuture<V> future = new CompletableFuture<>();
        if (batchingEnabled) {
            // Queue the key together with its future; after the merged batch query runs, the result is written back to the future
            loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext));
        } else {
            stats.incrementBatchLoadCountBy(1);
            // immediate execution of batch function
            future = invokeLoaderImmediately(key, loadContext);
        }
        if (cachingEnabled) {
            futureCache.set(cacheKey, future);
        }
        return future;
    }
}
// Invoke the batch loader we wrote
private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object> callContexts, List<CompletableFuture<V>> queuedFutures) {
    stats.incrementBatchLoadCountBy(keys.size());
    // Call our TeacherDataLoader
    CompletionStage<List<V>> batchLoad = invokeLoader(keys, callContexts);
    return batchLoad
            .toCompletableFuture()
            .thenApply(values -> {
                // Keys and results must line up: one key per future, one future per result
                assertResultSize(keys, values);
                for (int idx = 0; idx < queuedFutures.size(); idx++) {
                    Object value = values.get(idx);
                    CompletableFuture<V> future = queuedFutures.get(idx);
                    if (value instanceof Throwable) {
                        stats.incrementLoadErrorCount();
                        future.completeExceptionally((Throwable) value);
                        // we don't clear the cached view of this entry to avoid
                        // frequently loading the same error
                    } else if (value instanceof Try) {
                        // we allow the batch loader to return a Try so we can better represent a computation
                        // that might have worked or not.
                        Try<V> tryValue = (Try<V>) value;
                        if (tryValue.isSuccess()) {
                            future.complete(tryValue.get());
                        } else {
                            stats.incrementLoadErrorCount();
                            future.completeExceptionally(tryValue.getThrowable());
                        }
                    } else {
                        // Write the result back to the cached future
                        V val = (V) value;
                        future.complete(val);
                    }
                }
                return values;
            }).exceptionally(ex -> {
                stats.incrementBatchLoadExceptionCount();
                for (int idx = 0; idx < queuedFutures.size(); idx++) {
                    K key = keys.get(idx);
                    CompletableFuture<V> future = queuedFutures.get(idx);
                    future.completeExceptionally(ex);
                    // clear any cached view of this key because they all failed
                    dataLoader.clear(key);
                }
                return emptyList();
            });
}
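Because results are matched to futures by index, the batch function has to return exactly one value per key, in key order. A backing store may return rows in a different order or skip missing ones, so a small alignment step can help. The sketch below is one possible way to do that; alignToKeys is a hypothetical helper, not part of Java-DataLoader, and it assumes a missing row should be represented as null.

```kotlin
data class Teacher(val teacherId: String, val teacherName: String)

// Reorders rows so that result[i] belongs to keys[i]; a missing row becomes null.
fun <K, V> alignToKeys(keys: List<K>, rows: List<V>, keyOf: (V) -> K): List<V?> {
    val byKey = rows.associateBy(keyOf)
    return keys.map { byKey[it] }
}

fun main() {
    val keys = listOf("t11", "t12", "t13")
    // Pretend the store returned rows out of order and without "t12".
    val rows = listOf(Teacher("t13", "老师13"), Teacher("t11", "老师11"))
    val aligned = alignToKeys(keys, rows) { it.teacherId }
    println(aligned.map { it?.teacherName }) // [老师11, null, 老师13]
}
```

A batch loader could apply such a step to the store's result before returning it, so that the size check and the index-based write-back both hold.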
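In addition, Java-DataLoader provides a Statistics object that records what happens while a DataLoader runs, such as the number of cache hits, the number of objects loaded, and the number of errors. By default these statistics are not included in the execution result; they have to be switched on through DataLoaderDispatcherInstrumentation: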
val options = DataLoaderDispatcherInstrumentationOptions
.newOptions().includeStatistics(true)
val dispatcherInstrumentation = DataLoaderDispatcherInstrumentation(options)
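There is one catch: a GraphQL instance accepts only one instrumentation when it is built. Does that mean we can only ever write a single instrumentation? Fortunately, GraphQL provides ChainedInstrumentation, which uses the composite pattern so that we can combine multiple instrumentations.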
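As a rough sketch of that wiring, the function below chains the statistics-enabled dispatcher instrumentation with a second one. It assumes a graphql-java version that ships the classes used in this article; buildGraphQL is a hypothetical helper name, and TracingInstrumentation is picked only as an example of a second instrumentation.

```kotlin
import graphql.GraphQL
import graphql.execution.instrumentation.ChainedInstrumentation
import graphql.execution.instrumentation.Instrumentation
import graphql.execution.instrumentation.dataloader.DataLoaderDispatcherInstrumentation
import graphql.execution.instrumentation.dataloader.DataLoaderDispatcherInstrumentationOptions
import graphql.execution.instrumentation.tracing.TracingInstrumentation
import graphql.schema.GraphQLSchema

// Hypothetical helper: builds a GraphQL instance with two instrumentations chained.
fun buildGraphQL(schema: GraphQLSchema): GraphQL {
    val options = DataLoaderDispatcherInstrumentationOptions
        .newOptions().includeStatistics(true)
    val instrumentations: List<Instrumentation> = listOf(
        DataLoaderDispatcherInstrumentation(options),
        TracingInstrumentation() // any other instrumentation; chosen only for illustration
    )
    // The builder still receives a single instrumentation: the chain itself.
    return GraphQL.newGraphQL(schema)
        .instrumentation(ChainedInstrumentation(instrumentations))
        .build()
}
```

The builder's one-instrumentation limit stays in place; the chain simply forwards each callback to every instrumentation in the list.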