KSQL Source Code Analysis 01: How Does a SQL Query Run?


Author: larluo_罗浩 | Published 2018-08-12 16:15

    Have you ever wanted to dig into what actually happens inside KSQL when you submit a SQL query from the command line?

    The walkthrough splits into two parts: the client-side call path and the server-side call path.

    1. Client-side call path

    When we launch KSQL, the ksql command is executed:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat bin/ksql
    #!/bin/bash
    
    exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.Ksql "$@"
    

    The script is straightforward: it just runs Ksql.main(args). Let's look at the Ksql class:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-cli/src/main/java/io/confluent/ksql/Ksql.java
      public static void main(String[] args) throws IOException {
        final Options options = args.length == 0 ? Options.parse("http://localhost:8088")
                                                 : Options.parse(args);
        if (options == null) {
          System.exit(-1);
        }
    
        try {
    
          final Properties properties = loadProperties(options.getConfigFile());
          final KsqlRestClient restClient = new KsqlRestClient(options.getServer(), properties);
    
          options.getUserNameAndPassword().ifPresent(
              creds -> restClient.setupAuthenticationCredentials(creds.left, creds.right)
          );
    
          final KsqlVersionCheckerAgent versionChecker = new KsqlVersionCheckerAgent();
          versionChecker.start(KsqlModuleType.CLI, properties);
    
          try (Cli cli = new Cli(options.getStreamedQueryRowLimit(),
                                       options.getStreamedQueryTimeoutMs(),
                                       restClient,
                                       new JLineTerminal(options.getOutputFormat(), restClient))
          ) {
            cli.runInteractively();
          }
        } catch (final Exception e) {
          final String msg = ErrorMessageUtil.buildErrorMessage(e);
          LOGGER.error(msg);
          System.err.println(msg);
          System.exit(-1);
        }
      }
    

    Ksql.main optionally loads a properties file, parses the command-line options, builds a KsqlRestClient, and then hands control to Cli.runInteractively.

    Next, the Cli class:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
      public void runInteractively() {
        displayWelcomeMessage();
        validateClient(terminal.writer(), restClient);
        boolean eof = false;
        while (!eof) {
          try {
            handleLine(readLine());
          } catch (EndOfFileException exception) {
            // EOF is fine, just terminate the REPL
            terminal.writer().println("Exiting KSQL.");
            eof = true;
          } catch (Exception exception) {
            LOGGER.error("", exception);
            terminal.writer().println(ErrorMessageUtil.buildErrorMessage(exception));
          }
          terminal.flush();
        }
      }
    
      public void handleLine(String line) throws Exception {
        String trimmedLine = Optional.ofNullable(line).orElse("").trim();
    
        if (trimmedLine.isEmpty()) {
          return;
        }
    
        String[] commandArgs = trimmedLine.split("\\s+", 2);
        CliSpecificCommand cliSpecificCommand =
            terminal.getCliSpecificCommands().get(commandArgs[0].toLowerCase());
        if (cliSpecificCommand != null) {
          cliSpecificCommand.execute(commandArgs.length > 1 ? commandArgs[1] : "");
        } else {
          handleStatements(line);
        }
      }
    
      private void handleStatements(String line)
          throws InterruptedException, IOException, ExecutionException {
        StringBuilder consecutiveStatements = new StringBuilder();
        for (SqlBaseParser.SingleStatementContext statementContext :
            new KsqlParser().getStatements(line)) {
          String statementText = KsqlEngine.getStatementString(statementContext);
          if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
              || statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
            consecutiveStatements = printOrDisplayQueryResults(
                consecutiveStatements,
                statementContext,
                statementText
            );
    
          } else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
            listProperties(statementText);
    
          } else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
            setProperty(statementContext);
    
          } else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
            consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
          } else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
            runScript(statementContext, statementText);
          } else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
            registerTopic(consecutiveStatements, statementContext, statementText);
          } else {
            consecutiveStatements.append(statementText);
          }
        }
        if (consecutiveStatements.length() != 0) {
          printKsqlResponse(
              restClient.makeKsqlRequest(consecutiveStatements.toString())
          );
        }
      }
      private StringBuilder printOrDisplayQueryResults(
          StringBuilder consecutiveStatements,
          SqlBaseParser.SingleStatementContext statementContext,
          String statementText
      ) throws InterruptedException, IOException, ExecutionException {
        if (consecutiveStatements.length() != 0) {
          printKsqlResponse(
              restClient.makeKsqlRequest(consecutiveStatements.toString())
          );
          consecutiveStatements = new StringBuilder();
        }
        if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
          handleStreamedQuery(statementText);
        } else {
          handlePrintedTopic(statementText);
        }
        return consecutiveStatements;
      }
      private void printKsqlResponse(RestResponse<KsqlEntityList> response) throws IOException {
        if (response.isSuccessful()) {
          KsqlEntityList ksqlEntities = response.getResponse();
          boolean noErrorFromServer = true;
          for (KsqlEntity entity : ksqlEntities) {
            if (entity instanceof CommandStatusEntity
                && (
                ((CommandStatusEntity) entity).getCommandStatus().getStatus()
                    == CommandStatus.Status.ERROR)
            ) {
              String fullMessage = ((CommandStatusEntity) entity).getCommandStatus().getMessage();
              terminal.printError(fullMessage.split("\n")[0], fullMessage);
              noErrorFromServer = false;
            }
          }
          if (noErrorFromServer) {
            terminal.printKsqlEntityList(response.getResponse());
          }
        } else {
          terminal.printErrorMessage(response.getErrorMessage());
        }
      }
    
    
    

    What does Cli.runInteractively do?
    It reads input through Cli.readLine and hands each line to Cli.handleLine for processing.
    Cli.handleLine checks whether the line starts with one of the commands in Console.getCliSpecificCommands;
    if not, it falls through to Cli.handleStatements.
    So which commands are CLI-specific?

      public LinkedHashMap<String, CliSpecificCommand> getCliSpecificCommands() {
        return cliSpecificCommands;
      }
    
      public void registerCliSpecificCommand(final CliSpecificCommand cliSpecificCommand) {
        cliSpecificCommands.put(cliSpecificCommand.getName(), cliSpecificCommand);
      }
    
      private void registerDefaultCommands() {
        registerCliSpecificCommand(new Help());
    
        registerCliSpecificCommand(new Clear());
    
        registerCliSpecificCommand(new Output());
    
        registerCliSpecificCommand(new History());
    
        registerCliSpecificCommand(new Version());
    
        registerCliSpecificCommand(new Exit());
      }
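
    Each of these is a small command object keyed by its name. As a rough sketch of the shape such a command takes (illustration only — the Echo class is hypothetical, and CliSpecificCommand is assumed to declare at least the getName()/execute() methods that handleLine calls above):

      import java.io.PrintWriter;

      // Hypothetical command, for illustration. CliSpecificCommand's exact contract
      // (e.g. help text) is assumed; only the methods used by handleLine are shown.
      public class Echo implements CliSpecificCommand {
        private final PrintWriter writer;

        public Echo(final PrintWriter writer) {
          this.writer = writer;
        }

        @Override
        public String getName() {
          return "echo";  // handleLine matches the lower-cased first token against this
        }

        @Override
        public void execute(final String commandArgs) {
          writer.println(commandArgs);  // commandArgs is everything after the command name
        }
      }

    Registered via registerCliSpecificCommand(new Echo(terminal.writer())), typing "echo hello" in the REPL would print "hello" locally instead of sending a request to the server.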
    

    Now we reach the handleStatements logic:
    handleStatements parses the line with KsqlParser.getStatements(line) and branches on each statement's type.

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-parser/src/main/java/io/confluent/ksql/parser/KsqlParser.java
    import org.antlr.v4.runtime.ANTLRInputStream;
      public List<SqlBaseParser.SingleStatementContext> getStatements(String sql) {
        try {
          ParserRuleContext tree = getParseTree(sql);
          SqlBaseParser.StatementsContext statementsContext = (SqlBaseParser.StatementsContext) tree;
          return statementsContext.singleStatement();
        } catch (Exception e) {
          throw new ParseFailedException(e.getMessage(), e);
        }
      }
      private ParserRuleContext getParseTree(String sql) {
    
        SqlBaseLexer
            sqlBaseLexer =
            new SqlBaseLexer(new CaseInsensitiveStream(new ANTLRInputStream(sql)));
        CommonTokenStream tokenStream = new CommonTokenStream(sqlBaseLexer);
        SqlBaseParser sqlBaseParser = new SqlBaseParser(tokenStream);
    
        sqlBaseLexer.removeErrorListeners();
        sqlBaseLexer.addErrorListener(ERROR_LISTENER);
    
        sqlBaseParser.removeErrorListeners();
        sqlBaseParser.addErrorListener(ERROR_LISTENER);
    
        Function<SqlBaseParser, ParserRuleContext> parseFunction = SqlBaseParser::statements;
        ParserRuleContext tree;
        try {
          // first, try parsing with potentially faster SLL mode
          sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
          tree = parseFunction.apply(sqlBaseParser);
        } catch (ParseCancellationException ex) {
          // if we fail, parse with LL mode
          tokenStream.reset(); // rewind input stream
          sqlBaseParser.reset();
    
          sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.LL);
          tree = parseFunction.apply(sqlBaseParser);
        }
    
        return tree;
      }
    

    KsqlParser.getStatements then drives the ANTLR-generated SqlBaseParser::statements rule, first trying the faster SLL prediction mode and falling back to full LL on failure:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 
    grammar SqlBase;
    
    tokens {
        DELIMITER
    }
    
    statements
        : (singleStatement)* EOF
        ;
    
    singleStatement
        : statement ';'
        ;
    singleExpression
        : expression EOF
        ;
    
    statement
        : query                                                                 #querystatement
        | (LIST | SHOW) PROPERTIES                                              #listProperties
        | (LIST | SHOW) TOPICS                                                  #listTopics
        | (LIST | SHOW) REGISTERED TOPICS                                       #listRegisteredTopics
        | (LIST | SHOW) STREAMS EXTENDED?                                   #listStreams
        | (LIST | SHOW) TABLES EXTENDED?                                    #listTables
        | (LIST | SHOW) FUNCTIONS                                            #listFunctions
        | DESCRIBE EXTENDED? (qualifiedName | TOPIC qualifiedName)              #showColumns
        | DESCRIBE FUNCTION qualifiedName                                       #describeFunction
        | PRINT (qualifiedName | STRING) (FROM BEGINNING)? ((INTERVAL | SAMPLE) number)?   #printTopic
        | (LIST | SHOW) QUERIES EXTENDED?                                   #listQueries
        | TERMINATE QUERY? qualifiedName                                        #terminateQuery
        | SET STRING EQ STRING                                                  #setProperty
        | UNSET STRING                                                          #unsetProperty
        | LOAD expression                                                       #loadProperties
        | REGISTER TOPIC (IF NOT EXISTS)? qualifiedName
                (WITH tableProperties)?                                         #registerTopic
        | CREATE STREAM (IF NOT EXISTS)? qualifiedName
                    ('(' tableElement (',' tableElement)* ')')?
                    (WITH tableProperties)?                                     #createStream
        | CREATE STREAM (IF NOT EXISTS)? qualifiedName
                (WITH tableProperties)? AS query
                                           (PARTITION BY identifier)?           #createStreamAs
        | CREATE TABLE (IF NOT EXISTS)? qualifiedName
                        ('(' tableElement (',' tableElement)* ')')?
                        (WITH tableProperties)?                                 #createTable
        | CREATE TABLE (IF NOT EXISTS)? qualifiedName
                (WITH tableProperties)? AS query                                #createTableAs
        | INSERT INTO qualifiedName query (PARTITION BY identifier)?            #insertInto
        | DROP TOPIC (IF EXISTS)? qualifiedName                                 #dropTopic
        | DROP STREAM (IF EXISTS)? qualifiedName (DELETE TOPIC)?                  #dropStream
        | DROP TABLE (IF EXISTS)? qualifiedName  (DELETE TOPIC)?                  #dropTable
        | EXPLAIN ANALYZE?
                ('(' explainOption (',' explainOption)* ')')? (statement | qualifiedName)         #explain
        | EXPORT CATALOG TO STRING                                              #exportCatalog
        | RUN SCRIPT STRING                                                     #runScript
        ;
    query
        :  queryNoWith
        ;
    queryNoWith:
          queryTerm
          (LIMIT limit=(INTEGER_VALUE | ALL))?
        ;
    queryTerm
        : queryPrimary                                                             #queryTermDefault
        ;
    
    queryPrimary
        : querySpecification                   #queryPrimaryDefault
        | TABLE qualifiedName                  #table
        | VALUES expression (',' expression)*  #inlineTable
        | '(' queryNoWith  ')'                 #subquery
        ;
    querySpecification
        : SELECT STREAM? selectItem (',' selectItem)*
          (INTO into=relationPrimary)?
          (FROM from=relation (',' relation)*)?
          (WINDOW  windowExpression)?
          (WHERE where=booleanExpression)?
          (GROUP BY groupBy)?
          (HAVING having=booleanExpression)?
        ;
    
    

    Matched against the SqlBase.g4 grammar, our SELECT statement falls under the #querystatement label.
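    To see the classification concretely, here is a minimal sketch that drives the parser directly (assuming the ksql-parser module of this KSQL version is on the classpath; TEST_STREAM is a hypothetical stream):

      import io.confluent.ksql.parser.KsqlParser;
      import io.confluent.ksql.parser.SqlBaseParser;

      public class ClassifyStatement {
        public static void main(String[] args) {
          // getStatements returns one SingleStatementContext per ';'-terminated statement
          for (SqlBaseParser.SingleStatementContext ctx :
              new KsqlParser().getStatements("SELECT * FROM TEST_STREAM;")) {
            // the #querystatement label in SqlBase.g4 becomes the QuerystatementContext class
            System.out.println(
                ctx.statement() instanceof SqlBaseParser.QuerystatementContext);  // true
          }
        }
      }

    With the statement classified, we are back in Cli.handleStatements: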

      private void handleStatements(String line)
          throws InterruptedException, IOException, ExecutionException {
        StringBuilder consecutiveStatements = new StringBuilder();
        for (SqlBaseParser.SingleStatementContext statementContext :
            new KsqlParser().getStatements(line)) {
          String statementText = KsqlEngine.getStatementString(statementContext);
          if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
              || statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
            consecutiveStatements = printOrDisplayQueryResults(
                consecutiveStatements,
                statementContext,
                statementText
            );
    
          } else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
            listProperties(statementText);
    
          } else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
            setProperty(statementContext);
    
          } else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
            consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
          } else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
            runScript(statementContext, statementText);
          } else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
            registerTopic(consecutiveStatements, statementContext, statementText);
          } else {
            consecutiveStatements.append(statementText);
          }
        }
        if (consecutiveStatements.length() != 0) {
          printKsqlResponse(
              restClient.makeKsqlRequest(consecutiveStatements.toString())
          );
        }
      }
    
    

    Since the statement here is a SqlBaseParser.QuerystatementContext, Cli.printOrDisplayQueryResults is invoked.

      private StringBuilder printOrDisplayQueryResults(
          StringBuilder consecutiveStatements,
          SqlBaseParser.SingleStatementContext statementContext,
          String statementText
      ) throws InterruptedException, IOException, ExecutionException {
        if (consecutiveStatements.length() != 0) {
          printKsqlResponse(
              restClient.makeKsqlRequest(consecutiveStatements.toString())
          );
          consecutiveStatements = new StringBuilder();
        }
        if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
          handleStreamedQuery(statementText);
        } else {
          handlePrintedTopic(statementText);
        }
        return consecutiveStatements;
      }
      private void handleStreamedQuery(String query)
          throws InterruptedException, ExecutionException, IOException {
        RestResponse<KsqlRestClient.QueryStream> queryResponse =
            restClient.makeQueryRequest(query);
    
        LOGGER.debug("Handling streamed query");
    
        if (queryResponse.isSuccessful()) {
          try (KsqlRestClient.QueryStream queryStream = queryResponse.getResponse()) {
            Future<?> queryStreamFuture = queryStreamExecutorService.submit(new Runnable() {
              @Override
              public void run() {
                for (long rowsRead = 0; keepReading(rowsRead) && queryStream.hasNext(); rowsRead++) {
                  try {
                    StreamedRow row = queryStream.next();
                    terminal.printStreamedRow(row);
                    if (row.getFinalMessage() != null || row.getErrorMessage() != null) {
                      break;
                    }
                  } catch (IOException exception) {
                    throw new RuntimeException(exception);
                  }
                }
              }
            });
    
            terminal.handle(Terminal.Signal.INT, signal -> {
              terminal.handle(Terminal.Signal.INT, Terminal.SignalHandler.SIG_IGN);
              queryStreamFuture.cancel(true);
            });
    
            try {
              if (streamedQueryTimeoutMs == null) {
                queryStreamFuture.get();
                Thread.sleep(1000); // TODO: Make things work without this
              } else {
                try {
                  queryStreamFuture.get(streamedQueryTimeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException exception) {
                  queryStreamFuture.cancel(true);
                }
              }
            } catch (CancellationException exception) {
              // It's fine
            }
          } finally {
            terminal.writer().println("Query terminated");
            terminal.flush();
          }
        } else {
          terminal.printErrorMessage(queryResponse.getErrorMessage());
        }
      }
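
    Note the pattern in handleStreamedQuery: rows are drained on a worker thread, while the caller bounds the otherwise endless stream with a timeout or a Ctrl-C handler that cancels the Future. The same pattern in isolation (a self-contained sketch, not KSQL code):

      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;

      public class StreamDrainDemo {
        public static void main(String[] args) throws Exception {
          ExecutorService pool = Executors.newSingleThreadExecutor();
          BlockingQueue<String> rows = new LinkedBlockingQueue<>();
          Future<?> reader = pool.submit(() -> {
            try {
              while (!Thread.currentThread().isInterrupted()) {
                System.out.println(rows.take());  // blocks, like queryStream.next()
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();  // cancel(true) lands here
            }
          });
          rows.put("row-1");
          try {
            reader.get(500, TimeUnit.MILLISECONDS);  // cf. streamedQueryTimeoutMs
          } catch (TimeoutException e) {
            reader.cancel(true);  // cf. the SIGINT handler above
          }
          pool.shutdownNow();
        }
      }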
    
    

    Cli.handleStreamedQuery issues the request to the server through KsqlRestClient.makeQueryRequest.

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java
      public RestResponse<QueryStream> makeQueryRequest(String ksql) {
        KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties);
        Response response = makePostRequest("query", jsonRequest);
        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
          return RestResponse.successful(new QueryStream(response));
        } else {
          return RestResponse.erroneous(response.readEntity(KsqlErrorMessage.class));
        }
      }
      private Response makePostRequest(String path, Object jsonEntity) {
        try {
          return client.target(serverAddress)
              .path(path)
              .request(MediaType.APPLICATION_JSON_TYPE)
              .post(Entity.json(jsonEntity));
        } catch (Exception exception) {
          throw new KsqlRestClientException("Error issuing POST to KSQL server", exception);
        }
      }
    
    

    Under the hood this uses the standard JAX-RS client, javax.ws.rs.client.Client, to call the server's REST endpoint. An equivalent request looks like this:

    curl -X "POST" "http://localhost:8088/query" \
         -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
         -d $'{
      "ksql": "SELECT * FROM TEST_STREAM;",
      "streamsProperties": {}
    }'
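
    The same request can be hand-rolled with plain JAX-RS, mirroring makePostRequest above (a sketch; any JAX-RS implementation such as Jersey will do):

      import javax.ws.rs.client.Client;
      import javax.ws.rs.client.ClientBuilder;
      import javax.ws.rs.client.Entity;
      import javax.ws.rs.core.MediaType;
      import javax.ws.rs.core.Response;

      public class QueryPost {
        public static void main(String[] args) {
          Client client = ClientBuilder.newClient();
          // Same JSON body as the curl example above. On success the entity is a
          // long-lived chunked stream of rows, so it must be read incrementally.
          String body = "{\"ksql\": \"SELECT * FROM TEST_STREAM;\", \"streamsProperties\": {}}";
          Response response = client.target("http://localhost:8088")
              .path("query")
              .request(MediaType.APPLICATION_JSON_TYPE)
              .post(Entity.json(body));
          System.out.println("HTTP " + response.getStatus());
          client.close();
        }
      }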
    



    2. Server-side call path

    When the KSQL server starts, the ksql-server-start script is invoked:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./bin/ksql-server-start
    exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.rest.server.KsqlServerMain $EXTRA_ARGS "$@"
    

    Again very direct: it runs KsqlServerMain.main(args):

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java
      public static void main(final String[] args) {
        try {
          final ServerOptions serverOptions = ServerOptions.parse(args);
          if (serverOptions == null) {
            return;
          }
    
          final Properties properties = serverOptions.loadProperties(System::getProperties);
          final String installDir = properties.getProperty("ksql.server.install.dir");
          final Optional<String> queriesFile = serverOptions.getQueriesFile(properties);
          final Executable executable = createExecutable(properties, queriesFile, installDir);
          new KsqlServerMain(executable).tryStartApp();
        } catch (final Exception e) {
          log.error("Failed to start KSQL", e);
          System.exit(-1);
        }
      }
      @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
      private static Executable createExecutable(
          final Properties properties,
          final Optional<String> queriesFile,
          final String installDir
      ) throws Exception {
        if (queriesFile.isPresent()) {
          return StandaloneExecutor.create(properties, queriesFile.get(), installDir);
        }
    
        if (!properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
          properties.put(StreamsConfig.APPLICATION_ID_CONFIG, KSQL_REST_SERVER_DEFAULT_APP_ID);
        }
        final KsqlRestConfig restConfig = new KsqlRestConfig(properties);
        return KsqlRestApplication.buildApplication(
            restConfig,
            new KsqlVersionCheckerAgent()
        );
      }
      void tryStartApp() throws Exception {
        try {
          log.info("Starting server");
          executable.start();
          log.info("Server up and running");
          executable.join();
        } finally {
          log.info("Server shutting down");
          executable.stop();
        }
      }
    

    KsqlServerMain calls KsqlRestApplication.buildApplication to assemble the REST server, then tryStartApp to start it.

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
      public static KsqlRestApplication buildApplication(
          KsqlRestConfig restConfig,
          VersionCheckerAgent versionCheckerAgent
      )
          throws Exception {
    
        final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);
    
        final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
    
        KsqlEngine ksqlEngine = new KsqlEngine(ksqlConfig);
        KafkaTopicClient topicClient = ksqlEngine.getTopicClient();
        UdfLoader.newInstance(ksqlConfig, ksqlEngine.getMetaStore(), ksqlInstallDir).load();
    
        String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
        String commandTopic =
            restConfig.getCommandTopic(ksqlServiceId);
        ensureCommandTopic(restConfig, topicClient, commandTopic);
    
        Map<String, Expression> commandTopicProperties = new HashMap<>();
        commandTopicProperties.put(
            DdlConfig.VALUE_FORMAT_PROPERTY,
            new StringLiteral("json")
        );
        commandTopicProperties.put(
            DdlConfig.KAFKA_TOPIC_NAME_PROPERTY,
            new StringLiteral(commandTopic)
        );
    
        ksqlEngine.getDdlCommandExec().execute(new RegisterTopicCommand(new RegisterTopic(
            QualifiedName.of(COMMANDS_KSQL_TOPIC_NAME),
            false,
            commandTopicProperties
        )), false);
    
        ksqlEngine.getDdlCommandExec().execute(new CreateStreamCommand(
            "statementText",
            new CreateStream(
                QualifiedName.of(COMMANDS_STREAM_NAME),
                Collections.singletonList(new TableElement(
                    "STATEMENT",
                    new PrimitiveType(Type.KsqlType.STRING)
                )),
                false,
                Collections.singletonMap(
                    DdlConfig.TOPIC_NAME_PROPERTY,
                    new StringLiteral(COMMANDS_KSQL_TOPIC_NAME)
                )
            ),
            ksqlEngine.getTopicClient(),
            true
        ), false);
    
        Map<String, Object> commandConsumerProperties = restConfig.getCommandConsumerProperties();
        KafkaConsumer<CommandId, Command> commandConsumer = new KafkaConsumer<>(
            commandConsumerProperties,
            getJsonDeserializer(CommandId.class, true),
            getJsonDeserializer(Command.class, false)
        );
    
        KafkaProducer<CommandId, Command> commandProducer = new KafkaProducer<>(
            restConfig.getCommandProducerProperties(),
            getJsonSerializer(true),
            getJsonSerializer(false)
        );
    
        CommandStore commandStore = new CommandStore(
            commandTopic,
            commandConsumer,
            commandProducer,
            new CommandIdAssigner(ksqlEngine.getMetaStore())
        );
    
        StatementParser statementParser = new StatementParser(ksqlEngine);
    
        StatementExecutor statementExecutor = new StatementExecutor(
            ksqlConfig,
            ksqlEngine,
            statementParser
        );
    
        CommandRunner commandRunner = new CommandRunner(
            statementExecutor,
            commandStore
        );
    
        RootDocument rootDocument = new RootDocument();
    
        StatusResource statusResource = new StatusResource(statementExecutor);
        StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
            ksqlConfig,
            ksqlEngine,
            statementParser,
            restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)
        );
        KsqlResource ksqlResource = new KsqlResource(
            ksqlConfig,
            ksqlEngine,
            commandStore,
            statementExecutor,
            restConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)
        );
    
        commandRunner.processPriorCommands();
    
        return new KsqlRestApplication(
            ksqlEngine,
            ksqlConfig,
            restConfig,
            commandRunner,
            rootDocument,
            statusResource,
            streamedQueryResource,
            ksqlResource,
            versionCheckerAgent
        );
      }
      @Override
      public void start() throws Exception {
        super.start();
        commandRunnerThread.start();
        Properties metricsProperties = new Properties();
        metricsProperties.putAll(getConfiguration().getOriginals());
        if (versionCheckerAgent != null) {
          versionCheckerAgent.start(KsqlModuleType.SERVER, metricsProperties);
        }
    
        displayWelcomeMessage();
      }
    
      @Override
      public void setupResources(Configurable<?> config, KsqlRestConfig appConfig) {
        config.register(rootDocument);
        config.register(new ServerInfoResource(serverInfo));
        config.register(statusResource);
        config.register(ksqlResource);
        config.register(streamedQueryResource);
        config.register(new KsqlExceptionMapper());
      }
    
    
    

    While KsqlRestApplication.buildApplication runs, it also replays the command topic to restore prior state; we will not go into that here and will cover it in detail later.
    buildApplication then registers the related REST API resources.

    Next, KsqlRestApplication.start brings up the REST service.
    KsqlRestApplication.start first calls Application.start in its parent class, which lives in a separate project:
    https://github.com/confluentinc/rest-utils.git

    [larluo@larluo-nixos:~/work/git/my-repo/rest-utils]$ cat ./core/src/main/java/io/confluent/rest/Application.java
      /**
       * Register resources or additional Providers, ExceptionMappers, and other JAX-RS components with
       * the Jersey application. This, combined with your Configuration class, is where you can
       * customize the behavior of the application.
       */
      public abstract void setupResources(Configurable<?> config, T appConfig);
    
      public void start() throws Exception {
        if (server == null) {
          createServer();
        }
        server.start();
      }
      public Server createServer() throws RestConfigException, ServletException {
        // The configuration for the JAX-RS REST service
        ResourceConfig resourceConfig = new ResourceConfig();
    
        Map<String, String> configuredTags = getConfiguration().getMap(RestConfig.METRICS_TAGS_CONFIG);
    
        Map<String, String> combinedMetricsTags = new HashMap<>(getMetricsTags());
        combinedMetricsTags.putAll(configuredTags);
    
        configureBaseApplication(resourceConfig, combinedMetricsTags);
        setupResources(resourceConfig, getConfiguration());
    
        // Configure the servlet container
        ServletContainer servletContainer = new ServletContainer(resourceConfig);
        final FilterHolder servletHolder = new FilterHolder(servletContainer);
    
        server = new Server() {
          @Override
          protected void doStop() throws Exception {
            super.doStop();
            Application.this.metrics.close();
            Application.this.onShutdown();
            Application.this.shutdownLatch.countDown();
          }
        };
    
        MBeanContainer mbContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
        server.addEventListener(mbContainer);
        server.addBean(mbContainer);
    
        MetricsListener metricsListener = new MetricsListener(metrics, "jetty", combinedMetricsTags);
    
        List<URI> listeners = parseListeners(config.getList(RestConfig.LISTENERS_CONFIG),
                config.getInt(RestConfig.PORT_CONFIG), Arrays.asList("http", "https"), "http");
        for (URI listener : listeners) {
          log.info("Adding listener: " + listener.toString());
          NetworkTrafficServerConnector connector;
          if (listener.getScheme().equals("http")) {
            connector = new NetworkTrafficServerConnector(server);
          } else {
            SslContextFactory sslContextFactory = new SslContextFactory();
            if (!config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG).isEmpty()) {
              sslContextFactory.setKeyStorePath(
                  config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG)
              );
              sslContextFactory.setKeyStorePassword(
                  config.getPassword(RestConfig.SSL_KEYSTORE_PASSWORD_CONFIG).value()
              );
              sslContextFactory.setKeyManagerPassword(
                  config.getPassword(RestConfig.SSL_KEY_PASSWORD_CONFIG).value()
              );
              sslContextFactory.setKeyStoreType(
                  config.getString(RestConfig.SSL_KEYSTORE_TYPE_CONFIG)
              );
    
              if (!config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG).isEmpty()) {
                sslContextFactory.setKeyManagerFactoryAlgorithm(
                        config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG));
              }
            }
    
            sslContextFactory.setNeedClientAuth(config.getBoolean(RestConfig.SSL_CLIENT_AUTH_CONFIG));
    
            List<String> enabledProtocols = config.getList(RestConfig.SSL_ENABLED_PROTOCOLS_CONFIG);
            if (!enabledProtocols.isEmpty()) {
              sslContextFactory.setIncludeProtocols(enabledProtocols.toArray(new String[0]));
            }
    
            List<String> cipherSuites = config.getList(RestConfig.SSL_CIPHER_SUITES_CONFIG);
            if (!cipherSuites.isEmpty()) {
              sslContextFactory.setIncludeCipherSuites(cipherSuites.toArray(new String[0]));
            }
    
            if (!config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG).isEmpty()) {
              sslContextFactory.setEndpointIdentificationAlgorithm(
                      config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
            }
    
            if (!config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG).isEmpty()) {
              sslContextFactory.setTrustStorePath(
                  config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG)
              );
              sslContextFactory.setTrustStorePassword(
                  config.getPassword(RestConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG).value()
              );
              sslContextFactory.setTrustStoreType(
                  config.getString(RestConfig.SSL_TRUSTSTORE_TYPE_CONFIG)
              );
    
              if (!config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG).isEmpty()) {
                sslContextFactory.setTrustManagerFactoryAlgorithm(
                        config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG)
                );
              }
            }
    
            sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROTOCOL_CONFIG));
            if (!config.getString(RestConfig.SSL_PROVIDER_CONFIG).isEmpty()) {
              sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROVIDER_CONFIG));
            }
    
            connector = new NetworkTrafficServerConnector(server, sslContextFactory);
          }
    
          connector.addNetworkTrafficListener(metricsListener);
          connector.setPort(listener.getPort());
          connector.setHost(listener.getHost());
          server.addConnector(connector);
        }
    
        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
        context.setContextPath("/");
    
    
        ServletHolder defaultHolder = new ServletHolder("default", DefaultServlet.class);
        defaultHolder.setInitParameter("dirAllowed", "false");
    
        ResourceCollection staticResources = getStaticResources();
        if (staticResources != null) {
          context.setBaseResource(staticResources);
        }
    
        configureSecurityHandler(context);
    
        List<String> unsecurePaths = config.getList(RestConfig.AUTHENTICATION_SKIP_PATHS);
        setUnsecurePathConstraints(context, unsecurePaths);
    
        String allowedOrigins = getConfiguration().getString(
            RestConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG
        );
        if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
          FilterHolder filterHolder = new FilterHolder(CrossOriginFilter.class);
          filterHolder.setName("cross-origin");
          filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
          String allowedMethods = getConfiguration().getString(
              RestConfig.ACCESS_CONTROL_ALLOW_METHODS
          );
          if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) {
            filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
          }
          context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
        }
        configurePreResourceHandling(context);
        context.addFilter(servletHolder, "/*", null);
        configurePostResourceHandling(context);
        context.addServlet(defaultHolder, "/*");
    
        RequestLogHandler requestLogHandler = new RequestLogHandler();
        requestLogHandler.setRequestLog(requestLog);
    
        HandlerCollection handlers = new HandlerCollection();
        handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
    
        /* Needed for graceful shutdown as per `setStopTimeout` documentation */
        StatisticsHandler statsHandler = new StatisticsHandler();
        statsHandler.setHandler(handlers);
    
        final ServletContextHandler webSocketServletContext =
            new ServletContextHandler(ServletContextHandler.SESSIONS);
        webSocketServletContext.setContextPath(
            config.getString(RestConfig.WEBSOCKET_PATH_PREFIX_CONFIG)
        );
        final ContextHandlerCollection contexts = new ContextHandlerCollection();
        contexts.setHandlers(new Handler[] {
            statsHandler,
            webSocketServletContext
        });
    
        server.setHandler(wrapWithGzipHandler(contexts));
    
        ServerContainer container =
            WebSocketServerContainerInitializer.configureContext(webSocketServletContext);
        registerWebSocketEndpoints(container);
        int gracefulShutdownMs = getConfiguration().getInt(RestConfig.SHUTDOWN_GRACEFUL_MS_CONFIG);
        if (gracefulShutdownMs > 0) {
          server.setStopTimeout(gracefulShutdownMs);
        }
        server.setStopAtShutdown(true);
    
        return server;
      }
    

    During Application.start(), the subclass hook KsqlRestApplication.setupResources is invoked to register the resource paths.
    For query requests, the matching resource is StreamedQueryResource.
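
    The routing comes from standard JAX-RS annotations: StreamedQueryResource carries a class-level @Path("/query") annotation (not shown in the excerpt below), so Jersey dispatches POST /query to its streamQuery method. A minimal sketch of this registration pattern (EchoResource is hypothetical, for illustration only):

      import javax.ws.rs.Consumes;
      import javax.ws.rs.POST;
      import javax.ws.rs.Path;
      import javax.ws.rs.core.MediaType;
      import javax.ws.rs.core.Response;
      import org.glassfish.jersey.server.ResourceConfig;

      @Path("/echo")  // hypothetical; StreamedQueryResource uses @Path("/query")
      class EchoResource {
        @POST
        @Consumes(MediaType.APPLICATION_JSON)
        public Response echo(String body) {
          return Response.ok().entity(body).build();  // echo the JSON body back
        }
      }

      class Bootstrap {
        static ResourceConfig configure() {
          ResourceConfig config = new ResourceConfig();
          config.register(new EchoResource());  // cf. config.register(streamedQueryResource)
          return config;
        }
      }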

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
      @POST
      @Consumes(MediaType.APPLICATION_JSON)
      public Response streamQuery(KsqlRequest request) throws Exception {
        String ksql = request.getKsql();
        Statement statement;
        if (ksql == null) {
          return Errors.badRequest("\"ksql\" field must be given");
        }
        Map<String, Object> clientLocalProperties =
            Optional.ofNullable(request.getStreamsProperties()).orElse(Collections.emptyMap());
        try {
          statement = statementParser.parseSingleStatement(ksql);
        } catch (IllegalArgumentException | KsqlException e) {
          return Errors.badRequest(e);
        }
    
        if (statement instanceof Query) {
          QueryStreamWriter queryStreamWriter;
          try {
            queryStreamWriter = new QueryStreamWriter(
                ksqlConfig,
                ksqlEngine,
                disconnectCheckInterval,
                ksql,
                clientLocalProperties,
                objectMapper);
          } catch (KsqlException e) {
            return Errors.badRequest(e);
          }
          log.info("Streaming query '{}'", ksql);
          return Response.ok().entity(queryStreamWriter).build();
    
        } else if (statement instanceof PrintTopic) {
          TopicStreamWriter topicStreamWriter = getTopicStreamWriter(
              clientLocalProperties,
              (PrintTopic) statement
          );
          return Response.ok().entity(topicStreamWriter).build();
        }
        return Errors.badRequest(String.format(
            "Statement type `%s' not supported for this resource",
            statement.getClass().getName()));
      }
    

    StreamedQueryResource.streamQuery sees that the statement is a Query and builds a QueryStreamWriter to handle it.

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
      QueryStreamWriter(
          final KsqlConfig ksqlConfig,
          final KsqlEngine ksqlEngine,
          final long disconnectCheckInterval,
          final String queryString,
          final Map<String, Object> overriddenProperties,
          final ObjectMapper objectMapper
      ) throws Exception {
        QueryMetadata queryMetadata =
            ksqlEngine.buildMultipleQueries(
                queryString, ksqlConfig, overriddenProperties).get(0);
        this.objectMapper = objectMapper;
        if (!(queryMetadata instanceof QueuedQueryMetadata)) {
          throw new Exception(String.format(
              "Unexpected metadata type: expected QueuedQueryMetadata, found %s instead",
              queryMetadata.getClass()
          ));
        }
    
        this.disconnectCheckInterval = disconnectCheckInterval;
        this.queryMetadata = ((QueuedQueryMetadata) queryMetadata);
        this.queryMetadata.setLimitHandler(new LimitHandler());
        this.queryMetadata.getKafkaStreams().setUncaughtExceptionHandler(new StreamsExceptionHandler());
        this.ksqlEngine = ksqlEngine;
    
        queryMetadata.getKafkaStreams().start();
      }
    

    The QueryStreamWriter constructor calls KsqlEngine.buildMultipleQueries to compile the statement into a Kafka Streams topology, then starts the Kafka Streams application to execute the query.
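
    To make "build a Kafka Streams topology" tangible, here is roughly the kind of program the physical plan assembles, written by hand. This is a sketch under assumptions: the topic name test_stream and the pass-through logic are hypothetical, and KSQL constructs its topology programmatically from the plan rather than like this:

      import java.util.Properties;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.KStream;

      public class HandRolledQuery {
        public static void main(String[] args) {
          // Consume the source topic, apply a predicate, emit rows downstream:
          // the essence of a filtering SELECT over a stream.
          StreamsBuilder builder = new StreamsBuilder();
          KStream<String, String> source = builder.stream("test_stream");
          source.filter((key, value) -> value != null)
                .foreach((key, value) -> System.out.println(key + " -> " + value));

          Properties props = new Properties();
          props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hand-rolled-query");
          props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
          props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

          KafkaStreams streams = new KafkaStreams(builder.build(), props);
          streams.start();  // mirrors queryMetadata.getKafkaStreams().start() above
          Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
      }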

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
      public List<QueryMetadata> buildMultipleQueries(
          final String queriesString,
          final KsqlConfig ksqlConfig,
          final Map<String, Object> overriddenProperties) {
        for (String property : overriddenProperties.keySet()) {
          if (IMMUTABLE_PROPERTIES.contains(property)) {
            throw new IllegalArgumentException(
                String.format("Cannot override property '%s'", property)
            );
          }
        }
    
        // Multiple queries submitted as the same time should success or fail as a whole,
        // Thus we use tempMetaStore to store newly created tables, streams or topics.
        // MetaStore tempMetaStore = new MetaStoreImpl(metaStore);
        MetaStore tempMetaStore = metaStore.clone();
    
        // Build query AST from the query string
        List<Pair<String, Statement>> queries = parseQueries(
            queriesString,
            tempMetaStore
        );
    
        return planQueries(queries, ksqlConfig, overriddenProperties, tempMetaStore);
      }
      List<Pair<String, Statement>> parseQueries(
          final String queriesString,
          final MetaStore tempMetaStore
      ) {
        try {
          MetaStore tempMetaStoreForParser = tempMetaStore.clone();
          // Parse and AST creation
          KsqlParser ksqlParser = new KsqlParser();
    
          List<SqlBaseParser.SingleStatementContext> parsedStatements
              = ksqlParser.getStatements(queriesString);
          List<Pair<String, Statement>> queryList = new ArrayList<>();
    
          for (SqlBaseParser.SingleStatementContext singleStatementContext : parsedStatements) {
            Pair<Statement, DataSourceExtractor> statementInfo = ksqlParser.prepareStatement(
                singleStatementContext,
                tempMetaStoreForParser
            );
            Statement statement = statementInfo.getLeft();
            if (StatementRewriteForStruct.requiresRewrite(statement)) {
              statement = new StatementRewriteForStruct(
                  statement,
                  statementInfo.getRight())
                  .rewriteForStruct();
            }
            Pair<String, Statement> queryPair =
                buildSingleQueryAst(
                    statement,
                    getStatementString(singleStatementContext),
                    tempMetaStore,
                    tempMetaStoreForParser
                );
            if (queryPair != null) {
              queryList.add(queryPair);
            }
          }
          return queryList;
        } catch (Exception e) {
          throw new ParseFailedException("Exception while processing statements :" + e.getMessage(), e);
        }
      }
    private Pair<String, Statement> buildSingleQueryAst(
          final Statement statement,
          final String statementString,
          final MetaStore tempMetaStore,
          final MetaStore tempMetaStoreForParser
      ) {
    
        log.info("Building AST for {}.", statementString);
    
        if (statement instanceof Query) {
          return new Pair<>(statementString, statement);
        } else if (statement instanceof CreateAsSelect) {
          CreateAsSelect createAsSelect = (CreateAsSelect) statement;
          QuerySpecification querySpecification =
              (QuerySpecification) createAsSelect.getQuery().getQueryBody();
          Query query = addInto(
              createAsSelect.getQuery(),
              querySpecification,
              createAsSelect.getName().getSuffix(),
              createAsSelect.getProperties(),
              createAsSelect.getPartitionByColumn(),
              true
          );
          tempMetaStoreForParser.putSource(
              queryEngine.getResultDatasource(
                  querySpecification.getSelect(),
                  createAsSelect.getName().getSuffix()
              ).cloneWithTimeKeyColumns());
          return new Pair<>(statementString, query);
        } else if (statement instanceof InsertInto) {
          InsertInto insertInto = (InsertInto) statement;
          if (tempMetaStoreForParser.getSource(insertInto.getTarget().getSuffix()) == null) {
            throw new KsqlException(String.format("Sink, %s, does not exist for the INSERT INTO "
                                                  + "statement.", insertInto.getTarget().getSuffix()));
          }
    
          if (tempMetaStoreForParser.getSource(insertInto.getTarget().getSuffix()).getDataSourceType()
              != DataSource.DataSourceType.KSTREAM) {
            throw new KsqlException(String.format("INSERT INTO can only be used to insert into a "
                                                  + "stream. %s is a table.",
                                                  insertInto.getTarget().getSuffix()));
          }
    
          QuerySpecification querySpecification =
              (QuerySpecification) insertInto.getQuery().getQueryBody();
    
          Query query = addInto(
              insertInto.getQuery(),
              querySpecification,
              insertInto.getTarget().getSuffix(),
              new HashMap<>(),
              insertInto.getPartitionByColumn(),
              false
          );
    
          return new Pair<>(statementString, query);
        } else  if (statement instanceof DdlStatement) {
          return buildSingleDdlStatement(statement,
                                         statementString,
                                         tempMetaStore,
                                         tempMetaStoreForParser);
        }
    
        return null;
      }
      private List<QueryMetadata> planQueries(
          final List<Pair<String, Statement>> statementList,
          final KsqlConfig ksqlConfig,
          final Map<String, Object> overriddenProperties,
          final MetaStore tempMetaStore
      ) {
        // Logical plan creation from the ASTs
        List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
            tempMetaStore,
            statementList,
            ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties)
        );
    
        // Physical plan creation from logical plans.
        List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
            logicalPlans,
            statementList,
            ksqlConfig,
            overriddenProperties,
            clientSupplier,
            true
        );
    
        for (QueryMetadata queryMetadata : runningQueries) {
          if (queryMetadata instanceof PersistentQueryMetadata) {
            livePersistentQueries.add(queryMetadata);
            PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
            persistentQueries.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
            metaStore.updateForPersistentQuery(persistentQueryMetadata.getQueryId().getId(),
                                               persistentQueryMetadata.getSourceNames(),
                                               persistentQueryMetadata.getSinkNames());
          }
          allLiveQueries.add(queryMetadata);
        }
    
        return runningQueries;
      }
    
      public QueryMetadata getQueryExecutionPlan(final Query query, final KsqlConfig ksqlConfig) {
    
        // Logical plan creation from the ASTs
        List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
            metaStore,
            Collections.singletonList(new Pair<>("", query)),
            ksqlConfig);
    
        // Physical plan creation from logical plans.
        List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
            logicalPlans,
            Collections.singletonList(new Pair<>("", query)),
            ksqlConfig,
            Collections.emptyMap(),
            clientSupplier,
            false
        );
        return runningQueries.get(0);
      }
    
    
    

    KsqlEngine.buildMultipleQueries first calls KsqlEngine.parseQueries to build the AST, then hands the AST to KsqlEngine.planQueries to produce the Kafka Streams application.

    First, KsqlEngine.parseQueries:

      List<Pair<String, Statement>> parseQueries(
          final String queriesString,
          final MetaStore tempMetaStore
      ) {
        try {
          MetaStore tempMetaStoreForParser = tempMetaStore.clone();
          // Parse and AST creation
          KsqlParser ksqlParser = new KsqlParser();
    
          List<SqlBaseParser.SingleStatementContext> parsedStatements
              = ksqlParser.getStatements(queriesString);
          List<Pair<String, Statement>> queryList = new ArrayList<>();
    
          for (SqlBaseParser.SingleStatementContext singleStatementContext : parsedStatements) {
            Pair<Statement, DataSourceExtractor> statementInfo = ksqlParser.prepareStatement(
                singleStatementContext,
                tempMetaStoreForParser
            );
            Statement statement = statementInfo.getLeft();
            if (StatementRewriteForStruct.requiresRewrite(statement)) {
              statement = new StatementRewriteForStruct(
                  statement,
                  statementInfo.getRight())
                  .rewriteForStruct();
            }
            Pair<String, Statement> queryPair =
                buildSingleQueryAst(
                    statement,
                    getStatementString(singleStatementContext),
                    tempMetaStore,
                    tempMetaStoreForParser
                );
            if (queryPair != null) {
              queryList.add(queryPair);
            }
          }
          return queryList;
        } catch (Exception e) {
          throw new ParseFailedException("Exception while processing statements :" + e.getMessage(), e);
        }
      }
    

    As on the client side, this calls KsqlParser.getStatements to parse the statements:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-parser/src/main/java/io/confluent/ksql/parser/KsqlParser.java
    import org.antlr.v4.runtime.ANTLRInputStream;
      public List<SqlBaseParser.SingleStatementContext> getStatements(String sql) {
        try {
          ParserRuleContext tree = getParseTree(sql);
          SqlBaseParser.StatementsContext statementsContext = (SqlBaseParser.StatementsContext) tree;
          return statementsContext.singleStatement();
        } catch (Exception e) {
          throw new ParseFailedException(e.getMessage(), e);
        }
      }
      private ParserRuleContext getParseTree(String sql) {
    
        SqlBaseLexer
            sqlBaseLexer =
            new SqlBaseLexer(new CaseInsensitiveStream(new ANTLRInputStream(sql)));
        CommonTokenStream tokenStream = new CommonTokenStream(sqlBaseLexer);
        SqlBaseParser sqlBaseParser = new SqlBaseParser(tokenStream);
    
        sqlBaseLexer.removeErrorListeners();
        sqlBaseLexer.addErrorListener(ERROR_LISTENER);
    
        sqlBaseParser.removeErrorListeners();
        sqlBaseParser.addErrorListener(ERROR_LISTENER);
    
        Function<SqlBaseParser, ParserRuleContext> parseFunction = SqlBaseParser::statements;
        ParserRuleContext tree;
        try {
          // first, try parsing with potentially faster SLL mode
          sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
          tree = parseFunction.apply(sqlBaseParser);
        } catch (ParseCancellationException ex) {
          // if we fail, parse with LL mode
          tokenStream.reset(); // rewind input stream
          sqlBaseParser.reset();
    
          sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.LL);
          tree = parseFunction.apply(sqlBaseParser);
        }
    
        return tree;
      }
    

    KsqlParser.getStatements again drives the ANTLR-generated SqlBaseParser::statements rule:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 
    grammar SqlBase;
    
    tokens {
        DELIMITER
    }
    
    statements
        : (singleStatement)* EOF
        ;
    
    singleStatement
        : statement ';'
        ;
    singleExpression
        : expression EOF
        ;
    
    statement
        : query                                                                 #querystatement
        | (LIST | SHOW) PROPERTIES                                              #listProperties
        | (LIST | SHOW) TOPICS                                                  #listTopics
        | (LIST | SHOW) REGISTERED TOPICS                                       #listRegisteredTopics
        | (LIST | SHOW) STREAMS EXTENDED?                                   #listStreams
        | (LIST | SHOW) TABLES EXTENDED?                                    #listTables
        | (LIST | SHOW) FUNCTIONS                                            #listFunctions
        | DESCRIBE EXTENDED? (qualifiedName | TOPIC qualifiedName)              #showColumns
        | DESCRIBE FUNCTION qualifiedName                                       #describeFunction
        | PRINT (qualifiedName | STRING) (FROM BEGINNING)? ((INTERVAL | SAMPLE) number)?   #printTopic
        | (LIST | SHOW) QUERIES EXTENDED?                                   #listQueries
        | TERMINATE QUERY? qualifiedName                                        #terminateQuery
        | SET STRING EQ STRING                                                  #setProperty
        | UNSET STRING                                                          #unsetProperty
        | LOAD expression                                                       #loadProperties
        | REGISTER TOPIC (IF NOT EXISTS)? qualifiedName
                (WITH tableProperties)?                                         #registerTopic
        | CREATE STREAM (IF NOT EXISTS)? qualifiedName
                    ('(' tableElement (',' tableElement)* ')')?
                    (WITH tableProperties)?                                     #createStream
        | CREATE STREAM (IF NOT EXISTS)? qualifiedName
                (WITH tableProperties)? AS query
                                           (PARTITION BY identifier)?           #createStreamAs
        | CREATE TABLE (IF NOT EXISTS)? qualifiedName
                        ('(' tableElement (',' tableElement)* ')')?
                        (WITH tableProperties)?                                 #createTable
        | CREATE TABLE (IF NOT EXISTS)? qualifiedName
                (WITH tableProperties)? AS query                                #createTableAs
        | INSERT INTO qualifiedName query (PARTITION BY identifier)?            #insertInto
        | DROP TOPIC (IF EXISTS)? qualifiedName                                 #dropTopic
        | DROP STREAM (IF EXISTS)? qualifiedName (DELETE TOPIC)?                  #dropStream
        | DROP TABLE (IF EXISTS)? qualifiedName  (DELETE TOPIC)?                  #dropTable
        | EXPLAIN ANALYZE?
                ('(' explainOption (',' explainOption)* ')')? (statement | qualifiedName)         #explain
        | EXPORT CATALOG TO STRING                                              #exportCatalog
        | RUN SCRIPT STRING                                                     #runScript
        ;
    query
        :  queryNoWith
        ;
    queryNoWith:
          queryTerm
          (LIMIT limit=(INTEGER_VALUE | ALL))?
        ;
    queryTerm
        : queryPrimary                                                             #queryTermDefault
        ;
    
    queryPrimary
        : querySpecification                   #queryPrimaryDefault
        | TABLE qualifiedName                  #table
        | VALUES expression (',' expression)*  #inlineTable
        | '(' queryNoWith  ')'                 #subquery
        ;
    querySpecification
        : SELECT STREAM? selectItem (',' selectItem)*
          (INTO into=relationPrimary)?
          (FROM from=relation (',' relation)*)?
          (WINDOW  windowExpression)?
          (WHERE where=booleanExpression)?
          (GROUP BY groupBy)?
          (HAVING having=booleanExpression)?
        ;
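
    To get a feel for the grammar, here is a minimal, hypothetical sketch that drives the ANTLR-generated parser by hand (assuming this grammar lives in SqlBase.g4, so ANTLR emits SqlBaseLexer/SqlBaseParser; KSQL's real entry point, KsqlParser, layers error handling and AST building on top, none of which is shown):

    import org.antlr.v4.runtime.CharStreams;
    import org.antlr.v4.runtime.CommonTokenStream;

    public class ParseSketch {
      public static void main(String[] args) {
        // Keywords uppercased so the sketch does not depend on how the
        // lexer handles keyword case.
        final String sql = "SELECT USERID, PAGEID FROM PAGEVIEWS";

        final SqlBaseLexer lexer = new SqlBaseLexer(CharStreams.fromString(sql));
        final SqlBaseParser parser = new SqlBaseParser(new CommonTokenStream(lexer));

        // ANTLR generates one method per grammar rule; calling statement()
        // applies the 'statement' rule above and returns its parse tree,
        // which an AST builder would then visit to produce Statement objects.
        System.out.println(parser.statement().toStringTree(parser));
      }
    }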
    
    

    After parsing succeeds, each statement is turned into a Pair<String, Statement> by KsqlEngine.buildSingleQueryAst: the raw SQL text on the left, its parsed AST node on the right.

    Next, let's look at planQueries:

      private List<QueryMetadata> planQueries(
          final List<Pair<String, Statement>> statementList,
          final KsqlConfig ksqlConfig,
          final Map<String, Object> overriddenProperties,
          final MetaStore tempMetaStore
      ) {
        // Logical plan creation from the ASTs
        List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
            tempMetaStore,
            statementList,
            ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties)
        );
    
        // Physical plan creation from logical plans.
        List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
            logicalPlans,
            statementList,
            ksqlConfig,
            overriddenProperties,
            clientSupplier,
            true
        );
    
        for (QueryMetadata queryMetadata : runningQueries) {
          if (queryMetadata instanceof PersistentQueryMetadata) {
            livePersistentQueries.add(queryMetadata);
            PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
            persistentQueries.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
            metaStore.updateForPersistentQuery(persistentQueryMetadata.getQueryId().getId(),
                                               persistentQueryMetadata.getSourceNames(),
                                               persistentQueryMetadata.getSinkNames());
          }
          allLiveQueries.add(queryMetadata);
        }
    
        return runningQueries;
      }
    
    

    Here KsqlEngine.planQueries first builds logical plans via QueryEngine.buildLogicalPlans, then turns them into physical plans via QueryEngine.buildPhysicalPlans; it is the physical plan that produces the Kafka Streams DSL topology. Note the loop at the end: persistent queries (those that write to a sink, such as CREATE STREAM ... AS SELECT) are additionally registered in the metastore via updateForPersistentQuery, while bare queries are only tracked in allLiveQueries.
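
    To see what the end product of this pipeline looks like, here is a hand-written illustration (not KSQL's actual generated code; the topic names, broker address, and crude JSON string matching are all placeholders) of roughly what a persistent query such as CREATE STREAM home_views AS SELECT * FROM pageviews WHERE pageid = 'home'; compiles down to in the Kafka Streams DSL:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class HandWrittenAnalogue {
      public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // FROM pageviews: a source node reading the source topic.
        KStream<String, String> pageviews = builder.stream("pageviews");

        pageviews
            // WHERE pageid = 'home': a filter node in the plan (KSQL
            // deserializes rows properly; string matching is a stand-in).
            .filter((key, value) -> value.contains("\"pageid\":\"home\""))
            // CREATE STREAM ... AS: a sink node writing the result topic.
            .to("HOME_VIEWS");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hand-written-analogue");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
      }
    }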

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
      List<Pair<String, PlanNode>> buildLogicalPlans(
          final MetaStore metaStore,
          final List<Pair<String, Statement>> statementList,
          final KsqlConfig config) {
    
        List<Pair<String, PlanNode>> logicalPlansList = new ArrayList<>();
        // TODO: the purpose of tempMetaStore here
        MetaStore tempMetaStore = metaStore.clone();
    
        for (Pair<String, Statement> statementQueryPair : statementList) {
          if (statementQueryPair.getRight() instanceof Query) {
            PlanNode logicalPlan = buildQueryLogicalPlan(
                statementQueryPair.getLeft(),
                (Query) statementQueryPair.getRight(),
                tempMetaStore, config
            );
            logicalPlansList.add(new Pair<>(statementQueryPair.getLeft(), logicalPlan));
          } else {
            logicalPlansList.add(new Pair<>(statementQueryPair.getLeft(), null));
          }
    
          log.info("Build logical plan for {}.", statementQueryPair.getLeft());
        }
        return logicalPlansList;
      }
      private PlanNode buildQueryLogicalPlan(
          final String sqlExpression,
          final Query query,
          final MetaStore tempMetaStore,
          final KsqlConfig config) {
        final QueryAnalyzer queryAnalyzer = new QueryAnalyzer(
            tempMetaStore,
            ksqlEngine.getFunctionRegistry(),
            config
        );
        final Analysis analysis = queryAnalyzer.analyze(sqlExpression, query);
        final AggregateAnalysis aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
        final PlanNode logicalPlan
            = new LogicalPlanner(analysis, aggAnalysis, ksqlEngine.getFunctionRegistry()).buildPlan();
        if (logicalPlan instanceof KsqlStructuredDataOutputNode) {
          KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
              (KsqlStructuredDataOutputNode) logicalPlan;
    
          StructuredDataSource
              structuredDataSource =
              new KsqlStream(
                  sqlExpression,
                  ksqlStructuredDataOutputNode.getId().toString(),
                  ksqlStructuredDataOutputNode.getSchema(),
                  ksqlStructuredDataOutputNode.getKeyField(),
                  ksqlStructuredDataOutputNode.getTimestampExtractionPolicy(),
                  ksqlStructuredDataOutputNode.getKsqlTopic()
              );
          if (analysis.isDoCreateInto()) {
            tempMetaStore.putTopic(ksqlStructuredDataOutputNode.getKsqlTopic());
            tempMetaStore.putSource(structuredDataSource.cloneWithTimeKeyColumns());
          }
        }
        return logicalPlan;
      }
      List<QueryMetadata> buildPhysicalPlans(
          final List<Pair<String, PlanNode>> logicalPlans,
          final List<Pair<String, Statement>> statementList,
          final KsqlConfig ksqlConfig,
          final Map<String, Object> overriddenProperties,
          final KafkaClientSupplier clientSupplier,
          final boolean updateMetastore
      ) {
    
        List<QueryMetadata> physicalPlans = new ArrayList<>();
    
        for (int i = 0; i < logicalPlans.size(); i++) {
    
          Pair<String, PlanNode> statementPlanPair = logicalPlans.get(i);
          if (statementPlanPair.getRight() == null) {
            Statement statement = statementList.get(i).getRight();
            if (!(statement instanceof DdlStatement)) {
              throw new KsqlException("expecting a statement implementing DDLStatement but got: "
                                      + statement.getClass());
            }
            handleDdlStatement(
                statementPlanPair.getLeft(),
                (DdlStatement) statement
            );
          } else {
            buildQueryPhysicalPlan(
                physicalPlans, statementPlanPair, ksqlConfig,
                overriddenProperties, clientSupplier, updateMetastore
            );
          }
    
        }
        return physicalPlans;
      }
      private void buildQueryPhysicalPlan(
          final List<QueryMetadata> physicalPlans,
          final Pair<String, PlanNode> statementPlanPair,
          final KsqlConfig ksqlConfig,
          final Map<String, Object> overriddenProperties,
          final KafkaClientSupplier clientSupplier,
          final boolean updateMetastore
      ) {
    
        final StreamsBuilder builder = new StreamsBuilder();
    
        // Build a physical plan, in this case a Kafka Streams DSL
        final PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(
            builder,
            ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties),
            ksqlEngine.getTopicClient(),
            ksqlEngine.getFunctionRegistry(),
            overriddenProperties,
            updateMetastore,
            ksqlEngine.getMetaStore(),
            ksqlEngine.getSchemaRegistryClient(),
            ksqlEngine.getQueryIdGenerator(),
            new KafkaStreamsBuilderImpl(clientSupplier)
        );
        physicalPlans.add(physicalPlanBuilder.buildPhysicalPlan(statementPlanPair));
      }
    
    

    QueryEngine.buildLogicalPlans is implemented mainly by QueryAnalyzer, whose AstVisitor pass over the AST produces an Analysis; we won't go deeper into it here, a later post will cover it in detail...
    QueryEngine.buildPhysicalPlans delegates the real work to PhysicalPlanBuilder.buildPhysicalPlan.
    Note that it is QueryEngine.buildQueryPhysicalPlan (above) that constructs the KafkaStreamsBuilderImpl object for later use.
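
    Putting the pieces together, the planning call chain inside the engine is:

      KsqlEngine.planQueries
        -> QueryEngine.buildLogicalPlans
             -> QueryAnalyzer -> Analysis -> LogicalPlanner -> PlanNode
        -> QueryEngine.buildPhysicalPlans
             -> PhysicalPlanBuilder.buildPhysicalPlan -> KafkaStreams (via KafkaStreamsBuilderImpl)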

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
      public QueryMetadata buildPhysicalPlan(final Pair<String, PlanNode> statementPlanPair) {
        final SchemaKStream resultStream = statementPlanPair
            .getRight()
            .buildStream(
                builder,
                ksqlConfig,
                kafkaTopicClient,
                functionRegistry,
                overriddenStreamsProperties,
                schemaRegistryClient
            );
        final OutputNode outputNode = resultStream.outputNode();
        boolean isBareQuery = outputNode instanceof KsqlBareOutputNode;
    
        // Check to make sure the logical and physical plans match up;
        // important to do this BEFORE actually starting up
        // the corresponding Kafka Streams job
        if (isBareQuery && !(resultStream instanceof QueuedSchemaKStream)) {
          throw new KsqlException(String.format(
              "Mismatch between logical and physical output; "
              + "expected a QueuedSchemaKStream based on logical "
              + "KsqlBareOutputNode, found a %s instead",
              resultStream.getClass().getCanonicalName()
          ));
        }
        String serviceId = getServiceId();
        String persistanceQueryPrefix =
            ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG);
        String transientQueryPrefix =
            ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);
    
        if (isBareQuery) {
          return buildPlanForBareQuery(
              (QueuedSchemaKStream) resultStream,
              (KsqlBareOutputNode) outputNode,
              serviceId,
              transientQueryPrefix,
              statementPlanPair.getLeft()
          );
    
        } else if (outputNode instanceof KsqlStructuredDataOutputNode) {
    
          KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
              (KsqlStructuredDataOutputNode) outputNode;
          ksqlStructuredDataOutputNode = ksqlStructuredDataOutputNode.cloneWithDoCreateInto(
              ((KsqlStructuredDataOutputNode) statementPlanPair.getRight()).isDoCreateInto()
          );
          return buildPlanForStructuredOutputNode(
              statementPlanPair.getLeft(),
              resultStream,
              ksqlStructuredDataOutputNode,
              serviceId,
              persistanceQueryPrefix,
              statementPlanPair.getLeft());
    
    
        } else {
          throw new KsqlException(
              "Sink data source of type: "
              + outputNode.getClass()
              + " is not supported.");
        }
      }
      private QueryMetadata buildPlanForBareQuery(
          final QueuedSchemaKStream schemaKStream,
          final KsqlBareOutputNode bareOutputNode,
          final String serviceId,
          final String transientQueryPrefix,
          final String statement
      ) {
    
        final String applicationId = addTimeSuffix(getBareQueryApplicationId(
            serviceId,
            transientQueryPrefix
        ));
    
        KafkaStreams streams = buildStreams(
            bareOutputNode,
            builder,
            applicationId,
            ksqlConfig,
            overriddenStreamsProperties
        );
    
        SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);
    
        return new QueuedQueryMetadata(
            statement,
            streams,
            bareOutputNode,
            schemaKStream.getExecutionPlan(""),
            schemaKStream.getQueue(),
            (sourceSchemaKstream instanceof SchemaKTable)
                ? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM,
            applicationId,
            kafkaTopicClient,
            builder.build(),
            overriddenStreamsProperties
        );
      }
    
      private KafkaStreams buildStreams(
          final OutputNode outputNode,
          final StreamsBuilder builder,
          final String applicationId,
          final KsqlConfig ksqlConfig,
          final Map<String, Object> overriddenProperties
      ) {
        final Map<String, Object> newStreamsProperties
            = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
        newStreamsProperties.putAll(overriddenProperties);
        newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    
        updateListProperty(
            newStreamsProperties,
            StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
            ConsumerCollector.class.getCanonicalName()
        );
        updateListProperty(
            newStreamsProperties,
            StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
            ProducerCollector.class.getCanonicalName()
        );
        return kafkaStreamsBuilder.buildKafkaStreams(builder, new StreamsConfig(newStreamsProperties));
      }
    
    

    For a bare query (one whose results are streamed straight back to the client rather than written to a sink topic), PhysicalPlanBuilder.buildPhysicalPlan dispatches to PhysicalPlanBuilder.buildPlanForBareQuery, which in turn calls KafkaStreamsBuilderImpl.buildKafkaStreams to create the KafkaStreams instance:

    [larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/physical/KafkaStreamsBuilderImpl.java
      public KafkaStreams buildKafkaStreams(StreamsBuilder builder, StreamsConfig conf) {
        return new KafkaStreams(builder.build(), conf, clientSupplier);
      }
    

    Finally, the QueryStreamWriter constructor starts the Kafka Streams topology built above, and the real-time query is up and running.
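
    The mechanics are a simple producer/consumer handoff: the running topology pushes each result row into the queue exposed by QueuedSchemaKStream (the getQueue() seen earlier), and the server thread drains it, streaming rows back to the HTTP client. A minimal, self-contained sketch of that pattern in plain Java (stand-ins only, not KSQL's actual classes):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class RowQueueSketch {
      public static void main(String[] args) throws InterruptedException {
        // Stand-in for the row queue handed to QueuedQueryMetadata.
        final BlockingQueue<String> rowQueue = new LinkedBlockingQueue<>();

        // Stand-in for the running Kafka Streams topology, which pushes
        // every result row into the queue as it is produced.
        final Thread streamsJob = new Thread(() -> {
          for (int i = 0; i < 5; i++) {
            rowQueue.offer("row-" + i);
          }
        });
        streamsJob.start();

        // Stand-in for QueryStreamWriter: poll the queue and write each row
        // out (here to stdout; in KSQL, to the chunked HTTP response) until
        // the client disconnects (here, until five rows have arrived).
        int received = 0;
        while (received < 5) {
          final String row = rowQueue.poll(100, TimeUnit.MILLISECONDS);
          if (row != null) {
            System.out.println(row);
            received++;
          }
        }
      }
    }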
