美文网首页GraphQL
GraphQL(六):GraphQL源码解读 - 概览

GraphQL(六):GraphQL源码解读 - 概览

作者: Johnny_ | 来源:发表于2019-07-02 22:40 被阅读0次

    基于GraphQl-JAVA-TOOLS 5.5.2

    把GraphQL的完整流程分为三个部分:schema解析、GraphlQLSchema装载和执行query(这里用query代指query、mutation、subscription)。其中的schema解析和GraphlQLSchema装载属于启动阶段,执行query属于运行阶段。

    一、schema解析

    我们在构建 GraphQLSchema 时会通过 SchemaParserBuilder 去build一个 SchemaParser

    /**
        * Build the parser with the supplied schema and dictionary.
        */
    fun build() = SchemaParser(scan(), options, runtimeWiringBuilder.build())
    

    这里的 scan() 方法最终会去执行 parseDocuments() 。GraphQL的schema解析使用了antlr,这是一个语法分析工具,它先根据预先定义的规则进行词法分析,把传入的文本解析成一个个token,然后将这些token构建成一个树。

        public Document parseDocument(String input, String sourceName) {
    
            CharStream charStream;
            if(sourceName == null) {
                charStream = CharStreams.fromString(input);
            } else{
                charStream = CharStreams.fromString(input, sourceName);
            }
    
            // 构建词法分析器
            GraphqlLexer lexer = new GraphqlLexer(charStream);
    
            CommonTokenStream tokens = new CommonTokenStream(lexer);
    
            GraphqlParser parser = new GraphqlParser(tokens);
            parser.removeErrorListeners();
            parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
            parser.setErrorHandler(new BailErrorStrategy());
    
            // 用定义的词法分析规则进行解析,得到tokens
            GraphqlParser.DocumentContext documentContext = parser.document();
    
            GraphqlAntlrToLanguage antlrToLanguage = new GraphqlAntlrToLanguage(tokens);
    
            // 将tokens构建成一棵分析树
            Document doc = antlrToLanguage.createDocument(documentContext);
    
            // 校验tokens
            Token stop = documentContext.getStop();
            List<Token> allTokens = tokens.getTokens();
            if (stop != null && allTokens != null && !allTokens.isEmpty()) {
                Token last = allTokens.get(allTokens.size() - 1);
                //
                // do we have more tokens in the stream than we consumed in the parse?
                // if yes then its invalid.  We make sure its the same channel
                boolean notEOF = last.getType() != Token.EOF;
                boolean lastGreaterThanDocument = last.getTokenIndex() > stop.getTokenIndex();
                boolean sameChannel = last.getChannel() == stop.getChannel();
                if (notEOF && lastGreaterThanDocument && sameChannel) {
                    throw new ParseCancellationException("There are more tokens in the query that have not been consumed");
                }
            }
            return doc;
        }
    
    

    其中词法分析的结果tokens是这样的:

    image

    最终得到的分析树是这样的:


    image

    其中 FieldDefinition 就是schema解析后的内存模型。

    二、GraphlQLSchema装载

    上一步完成了schema的解析,第二步是把分析树按照GraphQL的规则进行组装。组装的操作在上一篇GraphQL(五):GraphQL身份认证中已有介绍 GraphQLObjectType 的组装,在 SchemaParser 中还有其他类型(GraphQLInputObjectType、GraphQLEnumType、GraphQLInterfaceType、GraphQLUnionType等 )的组装,组装逻辑的入口在 parseSchemaObjects()

    fun parseSchemaObjects(): SchemaObjects {
    
        // Create GraphQL objects
        val interfaces = interfaceDefinitions.map { createInterfaceObject(it) }
        val objects = objectDefinitions.map { createObject(it, interfaces) }
        val unions = unionDefinitions.map { createUnionObject(it, objects) }
        val inputObjects = inputObjectDefinitions.map { createInputObject(it) }
        val enums = enumDefinitions.map { createEnumObject(it) }
    
        // Assign type resolver to interfaces now that we know all of the object types
        interfaces.forEach { (it.typeResolver as TypeResolverProxy).typeResolver = InterfaceTypeResolver(dictionary.inverse(), it, objects) }
        unions.forEach { (it.typeResolver as TypeResolverProxy).typeResolver = UnionTypeResolver(dictionary.inverse(), it, objects) }
    
        // Find query type and mutation/subscription type (if mutation/subscription type exists)
        val queryName = rootInfo.getQueryName()
        val mutationName = rootInfo.getMutationName()
        val subscriptionName = rootInfo.getSubscriptionName()
    
        val query = objects.find { it.name == queryName }
                ?: throw SchemaError("Expected a Query object with name '$queryName' but found none!")
        val mutation = objects.find { it.name == mutationName }
                ?: if (rootInfo.isMutationRequired()) throw SchemaError("Expected a Mutation object with name '$mutationName' but found none!") else null
        val subscription = objects.find { it.name == subscriptionName }
                ?: if (rootInfo.isSubscriptionRequired()) throw SchemaError("Expected a Subscription object with name '$subscriptionName' but found none!") else null
    
        return SchemaObjects(query, mutation, subscription, (objects + inputObjects + enums + interfaces + unions).toSet())
    }
    

    其中的 SchemaObjects 就是 GraphQLSchema 构造器的参数,不同类型的组装逻辑大同小异,这里就不啰嗦了。

    启动过程简单图例:


    image

    三、执行query

    query的执行是在运行时,我们通过统一的入口 GraphQL 执行客户端传过来的schema,GraphQL提供了两个类 ExecutionInputExecutionResult 用于包装输入和输出,GraphQL在执行查询时通过 CompletableFuture 来实现异步,其执行的核心代码如下:

    public CompletableFuture<ExecutionResult> executeAsync(ExecutionInput executionInput) {
        try {
            // 创建InstrumentationState对象,这是一个跟踪Instrumentation全生命周期的对象
            InstrumentationState instrumentationState = instrumentation.createState(new InstrumentationCreateStateParameters(this.graphQLSchema, executionInput));
    
            // 对ExecutionInput进行拦截
            InstrumentationExecutionParameters inputInstrumentationParameters = new InstrumentationExecutionParameters(executionInput, this.graphQLSchema, instrumentationState);
            executionInput = instrumentation.instrumentExecutionInput(executionInput, inputInstrumentationParameters);
    
            // 执行前拦截
            InstrumentationExecutionParameters instrumentationParameters = new InstrumentationExecutionParameters(executionInput, this.graphQLSchema, instrumentationState);
            InstrumentationContext<ExecutionResult> executionInstrumentation = instrumentation.beginExecution(instrumentationParameters);
    
            // 执行前拦截
            GraphQLSchema graphQLSchema = instrumentation.instrumentSchema(this.graphQLSchema, instrumentationParameters);
    
            // 对客户端传递的query进行验证并执行
            CompletableFuture<ExecutionResult> executionResult = parseValidateAndExecute(executionInput, graphQLSchema, instrumentationState);
            executionResult = executionResult.whenComplete(executionInstrumentation::onCompleted);
             
            // 对执行结果进行拦截
            executionResult = executionResult.thenCompose(result -> instrumentation.instrumentExecutionResult(result, instrumentationParameters));
            return executionResult;
        } catch (AbortExecutionException abortException) {
            return CompletableFuture.completedFuture(abortException.toExecutionResult());
        }
    }
    

    我们跳过省略掉验证部分直接看最终的执行,执行最终在 Execution 类中完成:

    private CompletableFuture<ExecutionResult> executeOperation(ExecutionContext executionContext, InstrumentationExecutionParameters instrumentationExecutionParameters, Object root, OperationDefinition operationDefinition) {
            // ...
            CompletableFuture<ExecutionResult> result;
            try {
                ExecutionStrategy executionStrategy;
                if (operation == OperationDefinition.Operation.MUTATION) {
                    executionStrategy = mutationStrategy;
                } else if (operation == SUBSCRIPTION) {
                    executionStrategy = subscriptionStrategy;
                } else {
                    executionStrategy = queryStrategy;
                }
                log.debug("Executing '{}' query operation: '{}' using '{}' execution strategy", executionContext.getExecutionId(), operation, executionStrategy.getClass().getName());
                result = executionStrategy.execute(executionContext, parameters);
            } catch (NonNullableFieldWasNullException e) {
                // ...
            }
            // ...
    
            return deferSupport(executionContext, result);
        }
    

    省略掉若干代码后我们关注 executionStrategy.execute(executionContext, parameters); 这句,这里有一个执行策略,这是我们在开发时可能会用到的。除了已废弃的几种执行策略,当前版本提供了两种执行策略: AsyncExecutionStrategyAsyncSerialExecutionStrategy。前者是并行执行属性值获取,后者是串行执行属性值获取。

    AsyncExecutionStrategy 关键代码:

     public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
            // ...
    
            Map<String, List<Field>> fields = parameters.getFields();
            List<String> fieldNames = new ArrayList<>(fields.keySet());
            List<CompletableFuture<FieldValueInfo>> futures = new ArrayList<>();
            List<String> resolvedFields = new ArrayList<>();
            for (String fieldName : fieldNames) {
                List<Field> currentField = fields.get(fieldName);
    
                ExecutionPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
                ExecutionStrategyParameters newParameters = parameters
                        .transform(builder -> builder.field(currentField).path(fieldPath).parent(parameters));
    
                if (isDeferred(executionContext, newParameters, currentField)) {
                    executionStrategyCtx.onDeferredField(currentField);
                    continue;
                }
                resolvedFields.add(fieldName);
                CompletableFuture<FieldValueInfo> future = resolveFieldWithInfo(executionContext, newParameters);
                futures.add(future);
            }
            CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
            executionStrategyCtx.onDispatched(overallResult);
    
            // 并行执行所有future
            Async.each(futures).whenComplete((completeValueInfos, throwable) -> {
                BiConsumer<List<ExecutionResult>, Throwable> handleResultsConsumer = handleResults(executionContext, resolvedFields, overallResult);
                if (throwable != null) {
                    handleResultsConsumer.accept(null, throwable.getCause());
                    return;
                }
                List<CompletableFuture<ExecutionResult>> executionResultFuture = completeValueInfos.stream().map(FieldValueInfo::getFieldValue).collect(Collectors.toList());
                executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
                Async.each(executionResultFuture).whenComplete(handleResultsConsumer);
            }).exceptionally((ex) -> {
                // if there are any issues with combining/handling the field results,
                // complete the future at all costs and bubble up any thrown exception so
                // the execution does not hang.
                overallResult.completeExceptionally(ex);
                return null;
            });
    
            overallResult.whenComplete(executionStrategyCtx::onCompleted);
            return overallResult;
        }
    

    AsyncSerialExecutionStrategy 关键代码:

    public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
        CompletableFuture<List<ExecutionResult>> resultsFuture = Async.eachSequentially(fieldNames, (fieldName, index, prevResults) -> {
            List<Field> currentField = fields.get(fieldName);
            ExecutionPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
            ExecutionStrategyParameters newParameters = parameters
                    .transform(builder -> builder.field(currentField).path(fieldPath));
            return resolveField(executionContext, newParameters);
        });
    
        CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
        executionStrategyCtx.onDispatched(overallResult);
    
        resultsFuture.whenComplete(handleResults(executionContext, fieldNames, overallResult));
        overallResult.whenComplete(executionStrategyCtx::onCompleted);
        return overallResult;
    }
    
    // eachSequentially最终会调用下面的方法排队执行属性值获取
    private static <T, U> void eachSequentiallyImpl(Iterator<T> iterator, CFFactory<T, U> cfFactory, int index, List<U> tmpResult, CompletableFuture<List<U>> overallResult) {
        if (!iterator.hasNext()) {
            overallResult.complete(tmpResult);
            return;
        }
        CompletableFuture<U> cf;
        try {
            cf = cfFactory.apply(iterator.next(), index, tmpResult);
            Assert.assertNotNull(cf, "cfFactory must return a non null value");
        } catch (Exception e) {
            cf = new CompletableFuture<>();
            cf.completeExceptionally(new CompletionException(e));
        }
        cf.whenComplete((cfResult, exception) -> {
            if (exception != null) {
                overallResult.completeExceptionally(exception);
                return;
            }
            // 上一个属性值获取完后再执行下一个属性值的获取
            tmpResult.add(cfResult);
            eachSequentiallyImpl(iterator, cfFactory, index + 1, tmpResult, overallResult);
        });
    }
    

    相关文章

      网友评论

        本文标题:GraphQL(六):GraphQL源码解读 - 概览

        本文链接:https://www.haomeiwen.com/subject/puyxhctx.html