Have you ever wanted to understand exactly what happens inside KSQL when you submit a SQL query from the command line?
The flow splits into two parts: the client-side call path and the server-side call path.
1. The client side
When we launch KSQL, the ksql command is what runs:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat bin/ksql
#!/bin/bash
exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.Ksql "$@"
This script is simple: it just invokes Ksql.main(args). So let's look at the Ksql class next:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-cli/src/main/java/io/confluent/ksql/Ksql.java
public static void main(String[] args) throws IOException {
final Options options = args.length == 0 ? Options.parse("http://localhost:8088")
: Options.parse(args);
if (options == null) {
System.exit(-1);
}
try {
final Properties properties = loadProperties(options.getConfigFile());
final KsqlRestClient restClient = new KsqlRestClient(options.getServer(), properties);
options.getUserNameAndPassword().ifPresent(
creds -> restClient.setupAuthenticationCredentials(creds.left, creds.right)
);
final KsqlVersionCheckerAgent versionChecker = new KsqlVersionCheckerAgent();
versionChecker.start(KsqlModuleType.CLI, properties);
try (Cli cli = new Cli(options.getStreamedQueryRowLimit(),
options.getStreamedQueryTimeoutMs(),
restClient,
new JLineTerminal(options.getOutputFormat(), restClient))
) {
cli.runInteractively();
}
} catch (final Exception e) {
final String msg = ErrorMessageUtil.buildErrorMessage(e);
LOGGER.error(msg);
System.err.println(msg);
System.exit(-1);
}
}
After optionally loading a properties file and parsing command-line options (with no arguments, Options.parse defaults the server address to http://localhost:8088), main builds a KsqlRestClient and calls Cli.runInteractively.
Next, the Cli class:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
public void runInteractively() {
displayWelcomeMessage();
validateClient(terminal.writer(), restClient);
boolean eof = false;
while (!eof) {
try {
handleLine(readLine());
} catch (EndOfFileException exception) {
// EOF is fine, just terminate the REPL
terminal.writer().println("Exiting KSQL.");
eof = true;
} catch (Exception exception) {
LOGGER.error("", exception);
terminal.writer().println(ErrorMessageUtil.buildErrorMessage(exception));
}
terminal.flush();
}
}
public void handleLine(String line) throws Exception {
String trimmedLine = Optional.ofNullable(line).orElse("").trim();
if (trimmedLine.isEmpty()) {
return;
}
String[] commandArgs = trimmedLine.split("\\s+", 2);
CliSpecificCommand cliSpecificCommand =
terminal.getCliSpecificCommands().get(commandArgs[0].toLowerCase());
if (cliSpecificCommand != null) {
cliSpecificCommand.execute(commandArgs.length > 1 ? commandArgs[1] : "");
} else {
handleStatements(line);
}
}
private void handleStatements(String line)
throws InterruptedException, IOException, ExecutionException {
StringBuilder consecutiveStatements = new StringBuilder();
for (SqlBaseParser.SingleStatementContext statementContext :
new KsqlParser().getStatements(line)) {
String statementText = KsqlEngine.getStatementString(statementContext);
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
|| statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
consecutiveStatements = printOrDisplayQueryResults(
consecutiveStatements,
statementContext,
statementText
);
} else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
listProperties(statementText);
} else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
setProperty(statementContext);
} else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
} else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
runScript(statementContext, statementText);
} else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
registerTopic(consecutiveStatements, statementContext, statementText);
} else {
consecutiveStatements.append(statementText);
}
}
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
}
}
private StringBuilder printOrDisplayQueryResults(
StringBuilder consecutiveStatements,
SqlBaseParser.SingleStatementContext statementContext,
String statementText
) throws InterruptedException, IOException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
consecutiveStatements = new StringBuilder();
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
handleStreamedQuery(statementText);
} else {
handlePrintedTopic(statementText);
}
return consecutiveStatements;
}
private void printKsqlResponse(RestResponse<KsqlEntityList> response) throws IOException {
if (response.isSuccessful()) {
KsqlEntityList ksqlEntities = response.getResponse();
boolean noErrorFromServer = true;
for (KsqlEntity entity : ksqlEntities) {
if (entity instanceof CommandStatusEntity
&& (
((CommandStatusEntity) entity).getCommandStatus().getStatus()
== CommandStatus.Status.ERROR)
) {
String fullMessage = ((CommandStatusEntity) entity).getCommandStatus().getMessage();
terminal.printError(fullMessage.split("\n")[0], fullMessage);
noErrorFromServer = false;
}
}
if (noErrorFromServer) {
terminal.printKsqlEntityList(response.getResponse());
}
} else {
terminal.printErrorMessage(response.getErrorMessage());
}
}
So what does Cli.runInteractively do?
It reads input via Cli.readLine and hands each line to Cli.handleLine for processing.
Cli.handleLine checks whether the first word of the line names one of the commands from Console.getCliSpecificCommands;
if it does not, the line falls through to Cli.handleStatements.
So which commands are CLI-specific?
public LinkedHashMap<String, CliSpecificCommand> getCliSpecificCommands() {
return cliSpecificCommands;
}
public void registerCliSpecificCommand(final CliSpecificCommand cliSpecificCommand) {
cliSpecificCommands.put(cliSpecificCommand.getName(), cliSpecificCommand);
}
private void registerDefaultCommands() {
registerCliSpecificCommand(new Help());
registerCliSpecificCommand(new Clear());
registerCliSpecificCommand(new Output());
registerCliSpecificCommand(new History());
registerCliSpecificCommand(new Version());
registerCliSpecificCommand(new Exit());
}
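To make the dispatch concrete, here is a small self-contained sketch (hypothetical types, not the real KSQL classes; the interface shape is inferred from the getName/execute calls above):
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: a command registry keyed by the lower-cased first word, with a
// fallback to statement handling, mirroring Cli.handleLine above.
public class DispatchSketch {
    interface CliSpecificCommand {
        String getName();
        void execute(String args);
    }

    private final Map<String, CliSpecificCommand> commands = new LinkedHashMap<>();

    DispatchSketch() {
        commands.put("exit", new CliSpecificCommand() {
            public String getName() { return "exit"; }
            public void execute(String args) { System.out.println("Exiting KSQL."); }
        });
    }

    void handleLine(String line) {
        String trimmed = line == null ? "" : line.trim();
        if (trimmed.isEmpty()) {
            return;
        }
        String[] parts = trimmed.split("\\s+", 2);
        CliSpecificCommand command = commands.get(parts[0].toLowerCase());
        if (command != null) {
            command.execute(parts.length > 1 ? parts[1] : "");
        } else {
            System.out.println("-> would go to handleStatements: " + trimmed);
        }
    }

    public static void main(String[] args) {
        DispatchSketch sketch = new DispatchSketch();
        sketch.handleLine("exit");                       // CLI-specific command
        sketch.handleLine("SELECT * FROM TEST_STREAM;"); // falls through
    }
}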
That brings us to the handleStatements logic:
handleStatements parses the line with KsqlParser.getStatements(line) and dispatches on the resulting statement type.
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-parser/src/main/java/io/confluent/ksql/parser/KsqlParser.java
import org.antlr.v4.runtime.ANTLRInputStream;
public List<SqlBaseParser.SingleStatementContext> getStatements(String sql) {
try {
ParserRuleContext tree = getParseTree(sql);
SqlBaseParser.StatementsContext statementsContext = (SqlBaseParser.StatementsContext) tree;
return statementsContext.singleStatement();
} catch (Exception e) {
throw new ParseFailedException(e.getMessage(), e);
}
}
private ParserRuleContext getParseTree(String sql) {
SqlBaseLexer
sqlBaseLexer =
new SqlBaseLexer(new CaseInsensitiveStream(new ANTLRInputStream(sql)));
CommonTokenStream tokenStream = new CommonTokenStream(sqlBaseLexer);
SqlBaseParser sqlBaseParser = new SqlBaseParser(tokenStream);
sqlBaseLexer.removeErrorListeners();
sqlBaseLexer.addErrorListener(ERROR_LISTENER);
sqlBaseParser.removeErrorListeners();
sqlBaseParser.addErrorListener(ERROR_LISTENER);
Function<SqlBaseParser, ParserRuleContext> parseFunction = SqlBaseParser::statements;
ParserRuleContext tree;
try {
// first, try parsing with potentially faster SLL mode
sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
tree = parseFunction.apply(sqlBaseParser);
} catch (ParseCancellationException ex) {
// if we fail, parse with LL mode
tokenStream.reset(); // rewind input stream
sqlBaseParser.reset();
sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.LL);
tree = parseFunction.apply(sqlBaseParser);
}
return tree;
}
KsqlParser.getStatements then hands off to the ANTLR-generated SqlBaseParser::statements rule, defined in the SqlBase.g4 grammar:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4
grammar SqlBase;
tokens {
DELIMITER
}
statements
: (singleStatement)* EOF
;
singleStatement
: statement ';'
;
singleExpression
: expression EOF
;
statement
: query #querystatement
| (LIST | SHOW) PROPERTIES #listProperties
| (LIST | SHOW) TOPICS #listTopics
| (LIST | SHOW) REGISTERED TOPICS #listRegisteredTopics
| (LIST | SHOW) STREAMS EXTENDED? #listStreams
| (LIST | SHOW) TABLES EXTENDED? #listTables
| (LIST | SHOW) FUNCTIONS #listFunctions
| DESCRIBE EXTENDED? (qualifiedName | TOPIC qualifiedName) #showColumns
| DESCRIBE FUNCTION qualifiedName #describeFunction
| PRINT (qualifiedName | STRING) (FROM BEGINNING)? ((INTERVAL | SAMPLE) number)? #printTopic
| (LIST | SHOW) QUERIES EXTENDED? #listQueries
| TERMINATE QUERY? qualifiedName #terminateQuery
| SET STRING EQ STRING #setProperty
| UNSET STRING #unsetProperty
| LOAD expression #loadProperties
| REGISTER TOPIC (IF NOT EXISTS)? qualifiedName
(WITH tableProperties)? #registerTopic
| CREATE STREAM (IF NOT EXISTS)? qualifiedName
('(' tableElement (',' tableElement)* ')')?
(WITH tableProperties)? #createStream
| CREATE STREAM (IF NOT EXISTS)? qualifiedName
(WITH tableProperties)? AS query
(PARTITION BY identifier)? #createStreamAs
| CREATE TABLE (IF NOT EXISTS)? qualifiedName
('(' tableElement (',' tableElement)* ')')?
(WITH tableProperties)? #createTable
| CREATE TABLE (IF NOT EXISTS)? qualifiedName
(WITH tableProperties)? AS query #createTableAs
| INSERT INTO qualifiedName query (PARTITION BY identifier)? #insertInto
| DROP TOPIC (IF EXISTS)? qualifiedName #dropTopic
| DROP STREAM (IF EXISTS)? qualifiedName (DELETE TOPIC)? #dropStream
| DROP TABLE (IF EXISTS)? qualifiedName (DELETE TOPIC)? #dropTable
| EXPLAIN ANALYZE?
('(' explainOption (',' explainOption)* ')')? (statement | qualifiedName) #explain
| EXPORT CATALOG TO STRING #exportCatalog
| RUN SCRIPT STRING #runScript
;
query
: queryNoWith
;
queryNoWith:
queryTerm
(LIMIT limit=(INTEGER_VALUE | ALL))?
;
queryTerm
: queryPrimary #queryTermDefault
;
queryPrimary
: querySpecification #queryPrimaryDefault
| TABLE qualifiedName #table
| VALUES expression (',' expression)* #inlineTable
| '(' queryNoWith ')' #subquery
;
querySpecification
: SELECT STREAM? selectItem (',' selectItem)*
(INTO into=relationPrimary)?
(FROM from=relation (',' relation)*)?
(WINDOW windowExpression)?
(WHERE where=booleanExpression)?
(GROUP BY groupBy)?
(HAVING having=booleanExpression)?
;
Matching against the SqlBase.g4 grammar, a plain SELECT lands on the #querystatement alternative of the statement rule.
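This is easy to verify with a few lines against the ksql-parser module; a sketch that only uses calls already quoted above (it assumes the ksql-parser artifact is on the classpath):
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;

// Print which grammar alternative each statement matched. For a plain SELECT
// this prints "QuerystatementContext", i.e. the #querystatement label.
public class ParseTreeProbe {
    public static void main(String[] args) {
        for (SqlBaseParser.SingleStatementContext ctx :
                new KsqlParser().getStatements("SELECT * FROM TEST_STREAM;")) {
            System.out.println(ctx.statement().getClass().getSimpleName());
        }
    }
}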
Now back to Cli.handleStatements:
private void handleStatements(String line)
throws InterruptedException, IOException, ExecutionException {
StringBuilder consecutiveStatements = new StringBuilder();
for (SqlBaseParser.SingleStatementContext statementContext :
new KsqlParser().getStatements(line)) {
String statementText = KsqlEngine.getStatementString(statementContext);
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
|| statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
consecutiveStatements = printOrDisplayQueryResults(
consecutiveStatements,
statementContext,
statementText
);
} else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
listProperties(statementText);
} else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
setProperty(statementContext);
} else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
} else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
runScript(statementContext, statementText);
} else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
registerTopic(consecutiveStatements, statementContext, statementText);
} else {
consecutiveStatements.append(statementText);
}
}
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
}
}
Since the parsed context here is a SqlBaseParser.QuerystatementContext, Cli.printOrDisplayQueryResults is called:
private StringBuilder printOrDisplayQueryResults(
StringBuilder consecutiveStatements,
SqlBaseParser.SingleStatementContext statementContext,
String statementText
) throws InterruptedException, IOException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
);
consecutiveStatements = new StringBuilder();
}
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
handleStreamedQuery(statementText);
} else {
handlePrintedTopic(statementText);
}
return consecutiveStatements;
}
private void handleStreamedQuery(String query)
throws InterruptedException, ExecutionException, IOException {
RestResponse<KsqlRestClient.QueryStream> queryResponse =
restClient.makeQueryRequest(query);
LOGGER.debug("Handling streamed query");
if (queryResponse.isSuccessful()) {
try (KsqlRestClient.QueryStream queryStream = queryResponse.getResponse()) {
Future<?> queryStreamFuture = queryStreamExecutorService.submit(new Runnable() {
@Override
public void run() {
for (long rowsRead = 0; keepReading(rowsRead) && queryStream.hasNext(); rowsRead++) {
try {
StreamedRow row = queryStream.next();
terminal.printStreamedRow(row);
if (row.getFinalMessage() != null || row.getErrorMessage() != null) {
break;
}
} catch (IOException exception) {
throw new RuntimeException(exception);
}
}
}
});
terminal.handle(Terminal.Signal.INT, signal -> {
terminal.handle(Terminal.Signal.INT, Terminal.SignalHandler.SIG_IGN);
queryStreamFuture.cancel(true);
});
try {
if (streamedQueryTimeoutMs == null) {
queryStreamFuture.get();
Thread.sleep(1000); // TODO: Make things work without this
} else {
try {
queryStreamFuture.get(streamedQueryTimeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException exception) {
queryStreamFuture.cancel(true);
}
}
} catch (CancellationException exception) {
// It's fine
}
} finally {
terminal.writer().println("Query terminated");
terminal.flush();
}
} else {
terminal.printErrorMessage(queryResponse.getErrorMessage());
}
}
Cli.handleStreamedQuery sends the request to the server via KsqlRestClient.makeQueryRequest and then pumps the returned rows to the terminal on a background thread.
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java
public RestResponse<QueryStream> makeQueryRequest(String ksql) {
KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties);
Response response = makePostRequest("query", jsonRequest);
if (response.getStatus() == Response.Status.OK.getStatusCode()) {
return RestResponse.successful(new QueryStream(response));
} else {
return RestResponse.erroneous(response.readEntity(KsqlErrorMessage.class));
}
}
private Response makePostRequest(String path, Object jsonEntity) {
try {
return client.target(serverAddress)
.path(path)
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.json(jsonEntity));
} catch (Exception exception) {
throw new KsqlRestClientException("Error issuing POST to KSQL server", exception);
}
}
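Putting the client pieces together, a hedged usage sketch (the single-argument KsqlRestClient constructor and the StreamedRow import path are assumptions; the hasNext/next/close calls are the same ones handleStreamedQuery uses):
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.StreamedRow;

// Sketch: drive the REST client the same way the CLI does.
public class QueryRequestSketch {
    public static void main(String[] args) throws Exception {
        KsqlRestClient client = new KsqlRestClient("http://localhost:8088"); // assumed ctor
        RestResponse<KsqlRestClient.QueryStream> response =
            client.makeQueryRequest("SELECT * FROM TEST_STREAM;");
        if (response.isSuccessful()) {
            try (KsqlRestClient.QueryStream rows = response.getResponse()) {
                while (rows.hasNext()) {
                    StreamedRow row = rows.next(); // one streamed row per chunk
                    System.out.println(row);
                }
            }
        } else {
            System.err.println(response.getErrorMessage());
        }
    }
}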
Ultimately the standard JAX-RS client (javax.ws.rs.client.Client) issues the POST against the server's REST endpoint. The equivalent request with curl:
curl -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d $'{
"ksql": "SELECT * FROM TEST_STREAM;",
"streamsProperties": {}
}'
2. The server side
When the KSQL server is started, the ksql-server-start script runs:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./bin/ksql-server-start
exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.rest.server.KsqlServerMain $EXTRA_ARGS "$@"
Again very direct: it invokes KsqlServerMain.main(args):
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java
public static void main(final String[] args) {
try {
final ServerOptions serverOptions = ServerOptions.parse(args);
if (serverOptions == null) {
return;
}
final Properties properties = serverOptions.loadProperties(System::getProperties);
final String installDir = properties.getProperty("ksql.server.install.dir");
final Optional<String> queriesFile = serverOptions.getQueriesFile(properties);
final Executable executable = createExecutable(properties, queriesFile, installDir);
new KsqlServerMain(executable).tryStartApp();
} catch (final Exception e) {
log.error("Failed to start KSQL", e);
System.exit(-1);
}
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private static Executable createExecutable(
final Properties properties,
final Optional<String> queriesFile,
final String installDir
) throws Exception {
if (queriesFile.isPresent()) {
return StandaloneExecutor.create(properties, queriesFile.get(), installDir);
}
if (!properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, KSQL_REST_SERVER_DEFAULT_APP_ID);
}
final KsqlRestConfig restConfig = new KsqlRestConfig(properties);
return KsqlRestApplication.buildApplication(
restConfig,
new KsqlVersionCheckerAgent()
);
}
void tryStartApp() throws Exception {
try {
log.info("Starting server");
executable.start();
log.info("Server up and running");
executable.join();
} finally {
log.info("Server shutting down");
executable.stop();
}
}
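tryStartApp drives everything through a small lifecycle contract. Its observable shape, with method names taken from the calls above (the real Executable interface lives in the rest-app module):
// Sketch of the lifecycle that tryStartApp expects from an Executable.
interface Executable {
    void start() throws Exception; // bring the service up
    void join() throws Exception;  // block until shutdown is requested
    void stop() throws Exception;  // release resources on the way out
}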
KsqlServerMain builds the REST server via KsqlRestApplication.buildApplication, then starts the service with tryStartApp.
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
public static KsqlRestApplication buildApplication(
KsqlRestConfig restConfig,
VersionCheckerAgent versionCheckerAgent
)
throws Exception {
final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
KsqlEngine ksqlEngine = new KsqlEngine(ksqlConfig);
KafkaTopicClient topicClient = ksqlEngine.getTopicClient();
UdfLoader.newInstance(ksqlConfig, ksqlEngine.getMetaStore(), ksqlInstallDir).load();
String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
String commandTopic =
restConfig.getCommandTopic(ksqlServiceId);
ensureCommandTopic(restConfig, topicClient, commandTopic);
Map<String, Expression> commandTopicProperties = new HashMap<>();
commandTopicProperties.put(
DdlConfig.VALUE_FORMAT_PROPERTY,
new StringLiteral("json")
);
commandTopicProperties.put(
DdlConfig.KAFKA_TOPIC_NAME_PROPERTY,
new StringLiteral(commandTopic)
);
ksqlEngine.getDdlCommandExec().execute(new RegisterTopicCommand(new RegisterTopic(
QualifiedName.of(COMMANDS_KSQL_TOPIC_NAME),
false,
commandTopicProperties
)), false);
ksqlEngine.getDdlCommandExec().execute(new CreateStreamCommand(
"statementText",
new CreateStream(
QualifiedName.of(COMMANDS_STREAM_NAME),
Collections.singletonList(new TableElement(
"STATEMENT",
new PrimitiveType(Type.KsqlType.STRING)
)),
false,
Collections.singletonMap(
DdlConfig.TOPIC_NAME_PROPERTY,
new StringLiteral(COMMANDS_KSQL_TOPIC_NAME)
)
),
ksqlEngine.getTopicClient(),
true
), false);
Map<String, Object> commandConsumerProperties = restConfig.getCommandConsumerProperties();
KafkaConsumer<CommandId, Command> commandConsumer = new KafkaConsumer<>(
commandConsumerProperties,
getJsonDeserializer(CommandId.class, true),
getJsonDeserializer(Command.class, false)
);
KafkaProducer<CommandId, Command> commandProducer = new KafkaProducer<>(
restConfig.getCommandProducerProperties(),
getJsonSerializer(true),
getJsonSerializer(false)
);
CommandStore commandStore = new CommandStore(
commandTopic,
commandConsumer,
commandProducer,
new CommandIdAssigner(ksqlEngine.getMetaStore())
);
StatementParser statementParser = new StatementParser(ksqlEngine);
StatementExecutor statementExecutor = new StatementExecutor(
ksqlConfig,
ksqlEngine,
statementParser
);
CommandRunner commandRunner = new CommandRunner(
statementExecutor,
commandStore
);
RootDocument rootDocument = new RootDocument();
StatusResource statusResource = new StatusResource(statementExecutor);
StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlConfig,
ksqlEngine,
statementParser,
restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)
);
KsqlResource ksqlResource = new KsqlResource(
ksqlConfig,
ksqlEngine,
commandStore,
statementExecutor,
restConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)
);
commandRunner.processPriorCommands();
return new KsqlRestApplication(
ksqlEngine,
ksqlConfig,
restConfig,
commandRunner,
rootDocument,
statusResource,
streamedQueryResource,
ksqlResource,
versionCheckerAgent
);
}
@Override
public void start() throws Exception {
super.start();
commandRunnerThread.start();
Properties metricsProperties = new Properties();
metricsProperties.putAll(getConfiguration().getOriginals());
if (versionCheckerAgent != null) {
versionCheckerAgent.start(KsqlModuleType.SERVER, metricsProperties);
}
displayWelcomeMessage();
}
@Override
public void setupResources(Configurable<?> config, KsqlRestConfig appConfig) {
config.register(rootDocument);
config.register(new ServerInfoResource(serverInfo));
config.register(statusResource);
config.register(ksqlResource);
config.register(streamedQueryResource);
config.register(new KsqlExceptionMapper());
}
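The command-topic plumbing above amounts to a plain Kafka producer/consumer pair with JSON (de)serializers. A self-contained sketch of that pattern (a generic Jackson-backed serde, not the exact getJsonSerializer/getJsonDeserializer helpers):
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Generic Jackson-backed Kafka (de)serializer: the pattern behind the
// getJsonSerializer / getJsonDeserializer helpers used above.
public class JsonSerde<T> implements Serializer<T>, Deserializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;

    public JsonSerde(Class<T> type) {
        this.type = type;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("JSON serialization failed", e);
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        try {
            return bytes == null ? null : mapper.readValue(bytes, type);
        } catch (Exception e) {
            throw new RuntimeException("JSON deserialization failed", e);
        }
    }

    @Override
    public void close() {
    }
}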
While building the application, KsqlRestApplication.buildApplication also replays the command topic to restore previously executed statements; we won't dwell on that here and will cover it in detail later.
buildApplication then registers the REST API resources (see setupResources above),
after which KsqlRestApplication.start brings the REST service up.
KsqlRestApplication.start first calls the parent class's Application.start, which lives in a separate project:
https://github.com/confluentinc/rest-utils.git
[larluo@larluo-nixos:~/work/git/my-repo/rest-utils]$ cat ./core/src/main/java/io/confluent/rest/Application.java
/**
* Register resources or additional Providers, ExceptionMappers, and other JAX-RS components with
* the Jersey application. This, combined with your Configuration class, is where you can
* customize the behavior of the application.
*/
public abstract void setupResources(Configurable<?> config, T appConfig);
public void start() throws Exception {
if (server == null) {
createServer();
}
server.start();
}
public Server createServer() throws RestConfigException, ServletException {
// The configuration for the JAX-RS REST service
ResourceConfig resourceConfig = new ResourceConfig();
Map<String, String> configuredTags = getConfiguration().getMap(RestConfig.METRICS_TAGS_CONFIG);
Map<String, String> combinedMetricsTags = new HashMap<>(getMetricsTags());
combinedMetricsTags.putAll(configuredTags);
configureBaseApplication(resourceConfig, combinedMetricsTags);
setupResources(resourceConfig, getConfiguration());
// Configure the servlet container
ServletContainer servletContainer = new ServletContainer(resourceConfig);
final FilterHolder servletHolder = new FilterHolder(servletContainer);
server = new Server() {
@Override
protected void doStop() throws Exception {
super.doStop();
Application.this.metrics.close();
Application.this.onShutdown();
Application.this.shutdownLatch.countDown();
}
};
MBeanContainer mbContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
server.addEventListener(mbContainer);
server.addBean(mbContainer);
MetricsListener metricsListener = new MetricsListener(metrics, "jetty", combinedMetricsTags);
List<URI> listeners = parseListeners(config.getList(RestConfig.LISTENERS_CONFIG),
config.getInt(RestConfig.PORT_CONFIG), Arrays.asList("http", "https"), "http");
for (URI listener : listeners) {
log.info("Adding listener: " + listener.toString());
NetworkTrafficServerConnector connector;
if (listener.getScheme().equals("http")) {
connector = new NetworkTrafficServerConnector(server);
} else {
SslContextFactory sslContextFactory = new SslContextFactory();
if (!config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG).isEmpty()) {
sslContextFactory.setKeyStorePath(
config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG)
);
sslContextFactory.setKeyStorePassword(
config.getPassword(RestConfig.SSL_KEYSTORE_PASSWORD_CONFIG).value()
);
sslContextFactory.setKeyManagerPassword(
config.getPassword(RestConfig.SSL_KEY_PASSWORD_CONFIG).value()
);
sslContextFactory.setKeyStoreType(
config.getString(RestConfig.SSL_KEYSTORE_TYPE_CONFIG)
);
if (!config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG).isEmpty()) {
sslContextFactory.setKeyManagerFactoryAlgorithm(
config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG));
}
}
sslContextFactory.setNeedClientAuth(config.getBoolean(RestConfig.SSL_CLIENT_AUTH_CONFIG));
List<String> enabledProtocols = config.getList(RestConfig.SSL_ENABLED_PROTOCOLS_CONFIG);
if (!enabledProtocols.isEmpty()) {
sslContextFactory.setIncludeProtocols(enabledProtocols.toArray(new String[0]));
}
List<String> cipherSuites = config.getList(RestConfig.SSL_CIPHER_SUITES_CONFIG);
if (!cipherSuites.isEmpty()) {
sslContextFactory.setIncludeCipherSuites(cipherSuites.toArray(new String[0]));
}
if (!config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG).isEmpty()) {
sslContextFactory.setEndpointIdentificationAlgorithm(
config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
}
if (!config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG).isEmpty()) {
sslContextFactory.setTrustStorePath(
config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG)
);
sslContextFactory.setTrustStorePassword(
config.getPassword(RestConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG).value()
);
sslContextFactory.setTrustStoreType(
config.getString(RestConfig.SSL_TRUSTSTORE_TYPE_CONFIG)
);
if (!config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG).isEmpty()) {
sslContextFactory.setTrustManagerFactoryAlgorithm(
config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG)
);
}
}
sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROTOCOL_CONFIG));
if (!config.getString(RestConfig.SSL_PROVIDER_CONFIG).isEmpty()) {
sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROVIDER_CONFIG));
}
connector = new NetworkTrafficServerConnector(server, sslContextFactory);
}
connector.addNetworkTrafficListener(metricsListener);
connector.setPort(listener.getPort());
connector.setHost(listener.getHost());
server.addConnector(connector);
}
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
ServletHolder defaultHolder = new ServletHolder("default", DefaultServlet.class);
defaultHolder.setInitParameter("dirAllowed", "false");
ResourceCollection staticResources = getStaticResources();
if (staticResources != null) {
context.setBaseResource(staticResources);
}
configureSecurityHandler(context);
List<String> unsecurePaths = config.getList(RestConfig.AUTHENTICATION_SKIP_PATHS);
setUnsecurePathConstraints(context, unsecurePaths);
String allowedOrigins = getConfiguration().getString(
RestConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG
);
if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
FilterHolder filterHolder = new FilterHolder(CrossOriginFilter.class);
filterHolder.setName("cross-origin");
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
String allowedMethods = getConfiguration().getString(
RestConfig.ACCESS_CONTROL_ALLOW_METHODS
);
if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) {
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
}
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
}
configurePreResourceHandling(context);
context.addFilter(servletHolder, "/*", null);
configurePostResourceHandling(context);
context.addServlet(defaultHolder, "/*");
RequestLogHandler requestLogHandler = new RequestLogHandler();
requestLogHandler.setRequestLog(requestLog);
HandlerCollection handlers = new HandlerCollection();
handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
/* Needed for graceful shutdown as per `setStopTimeout` documentation */
StatisticsHandler statsHandler = new StatisticsHandler();
statsHandler.setHandler(handlers);
final ServletContextHandler webSocketServletContext =
new ServletContextHandler(ServletContextHandler.SESSIONS);
webSocketServletContext.setContextPath(
config.getString(RestConfig.WEBSOCKET_PATH_PREFIX_CONFIG)
);
final ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(new Handler[] {
statsHandler,
webSocketServletContext
});
server.setHandler(wrapWithGzipHandler(contexts));
ServerContainer container =
WebSocketServerContainerInitializer.configureContext(webSocketServletContext);
registerWebSocketEndpoints(container);
int gracefulShutdownMs = getConfiguration().getInt(RestConfig.SHUTDOWN_GRACEFUL_MS_CONFIG);
if (gracefulShutdownMs > 0) {
server.setStopTimeout(gracefulShutdownMs);
}
server.setStopAtShutdown(true);
return server;
}
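Stripped of metrics, TLS, CORS, and websockets, the essence of createServer is: wrap a Jersey ResourceConfig in a ServletContainer and mount it on a Jetty context. A minimal sketch using the same standard Jetty/Jersey APIs (the InfoResource is hypothetical):
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;

import javax.ws.rs.GET;
import javax.ws.rs.Path;

// Minimal Jetty + Jersey skeleton of what createServer assembles.
public class MiniRestServer {

    @Path("info")
    public static class InfoResource {
        @GET
        public String info() {
            return "{\"status\":\"up\"}";
        }
    }

    public static void main(String[] args) throws Exception {
        ResourceConfig resourceConfig = new ResourceConfig();
        resourceConfig.register(new InfoResource()); // analogous to setupResources(...)

        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
        context.setContextPath("/");
        context.addServlet(new ServletHolder(new ServletContainer(resourceConfig)), "/*");

        Server server = new Server(8088);
        server.setHandler(context);
        server.start(); // analogous to Application.start()
        server.join();
    }
}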
During startup, Application.createServer calls the subclass's KsqlRestApplication.setupResources to register the resource paths.
For query requests, the matching resource is StreamedQueryResource:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response streamQuery(KsqlRequest request) throws Exception {
String ksql = request.getKsql();
Statement statement;
if (ksql == null) {
return Errors.badRequest("\"ksql\" field must be given");
}
Map<String, Object> clientLocalProperties =
Optional.ofNullable(request.getStreamsProperties()).orElse(Collections.emptyMap());
try {
statement = statementParser.parseSingleStatement(ksql);
} catch (IllegalArgumentException | KsqlException e) {
return Errors.badRequest(e);
}
if (statement instanceof Query) {
QueryStreamWriter queryStreamWriter;
try {
queryStreamWriter = new QueryStreamWriter(
ksqlConfig,
ksqlEngine,
disconnectCheckInterval,
ksql,
clientLocalProperties,
objectMapper);
} catch (KsqlException e) {
return Errors.badRequest(e);
}
log.info("Streaming query '{}'", ksql);
return Response.ok().entity(queryStreamWriter).build();
} else if (statement instanceof PrintTopic) {
TopicStreamWriter topicStreamWriter = getTopicStreamWriter(
clientLocalProperties,
(PrintTopic) statement
);
return Response.ok().entity(topicStreamWriter).build();
}
return Errors.badRequest(String.format(
"Statement type `%s' not supported for this resource",
statement.getClass().getName()));
}
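Note that Response.ok().entity(queryStreamWriter) does not buffer the whole result: in this codebase QueryStreamWriter implements the JAX-RS StreamingOutput contract, so the container invokes its write(OutputStream) lazily and rows are flushed as they arrive. The pattern in isolation (hypothetical resource):
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Sketch: a JAX-RS resource that streams rows back chunk by chunk, the same
// mechanism StreamedQueryResource relies on via QueryStreamWriter.
@Path("demo-query")
public class StreamingDemoResource {

    @GET
    public Response stream() {
        StreamingOutput output = (OutputStream out) -> {
            for (int i = 0; i < 3; i++) {
                out.write(("{\"row\":" + i + "}\n").getBytes(StandardCharsets.UTF_8));
                out.flush(); // push each row to the client as it is produced
            }
        };
        return Response.ok(output).build();
    }
}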
StreamedQueryResource.streamQuery determines that the statement is a Query and constructs a QueryStreamWriter to handle it:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
QueryStreamWriter(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final long disconnectCheckInterval,
final String queryString,
final Map<String, Object> overriddenProperties,
final ObjectMapper objectMapper
) throws Exception {
QueryMetadata queryMetadata =
ksqlEngine.buildMultipleQueries(
queryString, ksqlConfig, overriddenProperties).get(0);
this.objectMapper = objectMapper;
if (!(queryMetadata instanceof QueuedQueryMetadata)) {
throw new Exception(String.format(
"Unexpected metadata type: expected QueuedQueryMetadata, found %s instead",
queryMetadata.getClass()
));
}
this.disconnectCheckInterval = disconnectCheckInterval;
this.queryMetadata = ((QueuedQueryMetadata) queryMetadata);
this.queryMetadata.setLimitHandler(new LimitHandler());
this.queryMetadata.getKafkaStreams().setUncaughtExceptionHandler(new StreamsExceptionHandler());
this.ksqlEngine = ksqlEngine;
queryMetadata.getKafkaStreams().start();
}
The QueryStreamWriter constructor calls KsqlEngine.buildMultipleQueries to build the Kafka Streams topology, then starts the resulting KafkaStreams instance to execute the query:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
public List<QueryMetadata> buildMultipleQueries(
final String queriesString,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties) {
for (String property : overriddenProperties.keySet()) {
if (IMMUTABLE_PROPERTIES.contains(property)) {
throw new IllegalArgumentException(
String.format("Cannot override property '%s'", property)
);
}
}
// Multiple queries submitted at the same time should succeed or fail as a whole,
// Thus we use tempMetaStore to store newly created tables, streams or topics.
// MetaStore tempMetaStore = new MetaStoreImpl(metaStore);
MetaStore tempMetaStore = metaStore.clone();
// Build query AST from the query string
List<Pair<String, Statement>> queries = parseQueries(
queriesString,
tempMetaStore
);
return planQueries(queries, ksqlConfig, overriddenProperties, tempMetaStore);
}
List<Pair<String, Statement>> parseQueries(
final String queriesString,
final MetaStore tempMetaStore
) {
try {
MetaStore tempMetaStoreForParser = tempMetaStore.clone();
// Parse and AST creation
KsqlParser ksqlParser = new KsqlParser();
List<SqlBaseParser.SingleStatementContext> parsedStatements
= ksqlParser.getStatements(queriesString);
List<Pair<String, Statement>> queryList = new ArrayList<>();
for (SqlBaseParser.SingleStatementContext singleStatementContext : parsedStatements) {
Pair<Statement, DataSourceExtractor> statementInfo = ksqlParser.prepareStatement(
singleStatementContext,
tempMetaStoreForParser
);
Statement statement = statementInfo.getLeft();
if (StatementRewriteForStruct.requiresRewrite(statement)) {
statement = new StatementRewriteForStruct(
statement,
statementInfo.getRight())
.rewriteForStruct();
}
Pair<String, Statement> queryPair =
buildSingleQueryAst(
statement,
getStatementString(singleStatementContext),
tempMetaStore,
tempMetaStoreForParser
);
if (queryPair != null) {
queryList.add(queryPair);
}
}
return queryList;
} catch (Exception e) {
throw new ParseFailedException("Exception while processing statements :" + e.getMessage(), e);
}
}
private Pair<String, Statement> buildSingleQueryAst(
final Statement statement,
final String statementString,
final MetaStore tempMetaStore,
final MetaStore tempMetaStoreForParser
) {
log.info("Building AST for {}.", statementString);
if (statement instanceof Query) {
return new Pair<>(statementString, statement);
} else if (statement instanceof CreateAsSelect) {
CreateAsSelect createAsSelect = (CreateAsSelect) statement;
QuerySpecification querySpecification =
(QuerySpecification) createAsSelect.getQuery().getQueryBody();
Query query = addInto(
createAsSelect.getQuery(),
querySpecification,
createAsSelect.getName().getSuffix(),
createAsSelect.getProperties(),
createAsSelect.getPartitionByColumn(),
true
);
tempMetaStoreForParser.putSource(
queryEngine.getResultDatasource(
querySpecification.getSelect(),
createAsSelect.getName().getSuffix()
).cloneWithTimeKeyColumns());
return new Pair<>(statementString, query);
} else if (statement instanceof InsertInto) {
InsertInto insertInto = (InsertInto) statement;
if (tempMetaStoreForParser.getSource(insertInto.getTarget().getSuffix()) == null) {
throw new KsqlException(String.format("Sink, %s, does not exist for the INSERT INTO "
+ "statement.", insertInto.getTarget().getSuffix()));
}
if (tempMetaStoreForParser.getSource(insertInto.getTarget().getSuffix()).getDataSourceType()
!= DataSource.DataSourceType.KSTREAM) {
throw new KsqlException(String.format("INSERT INTO can only be used to insert into a "
+ "stream. %s is a table.",
insertInto.getTarget().getSuffix()));
}
QuerySpecification querySpecification =
(QuerySpecification) insertInto.getQuery().getQueryBody();
Query query = addInto(
insertInto.getQuery(),
querySpecification,
insertInto.getTarget().getSuffix(),
new HashMap<>(),
insertInto.getPartitionByColumn(),
false
);
return new Pair<>(statementString, query);
} else if (statement instanceof DdlStatement) {
return buildSingleDdlStatement(statement,
statementString,
tempMetaStore,
tempMetaStoreForParser);
}
return null;
}
private List<QueryMetadata> planQueries(
final List<Pair<String, Statement>> statementList,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties,
final MetaStore tempMetaStore
) {
// Logical plan creation from the ASTs
List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
tempMetaStore,
statementList,
ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties)
);
// Physical plan creation from logical plans.
List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
logicalPlans,
statementList,
ksqlConfig,
overriddenProperties,
clientSupplier,
true
);
for (QueryMetadata queryMetadata : runningQueries) {
if (queryMetadata instanceof PersistentQueryMetadata) {
livePersistentQueries.add(queryMetadata);
PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
persistentQueries.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
metaStore.updateForPersistentQuery(persistentQueryMetadata.getQueryId().getId(),
persistentQueryMetadata.getSourceNames(),
persistentQueryMetadata.getSinkNames());
}
allLiveQueries.add(queryMetadata);
}
return runningQueries;
}
public QueryMetadata getQueryExecutionPlan(final Query query, final KsqlConfig ksqlConfig) {
// Logical plan creation from the ASTs
List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
metaStore,
Collections.singletonList(new Pair<>("", query)),
ksqlConfig);
// Physical plan creation from logical plans.
List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
logicalPlans,
Collections.singletonList(new Pair<>("", query)),
ksqlConfig,
Collections.emptyMap(),
clientSupplier,
false
);
return runningQueries.get(0);
}
KsqlEngine.buildMultipleQueries first calls KsqlEngine.parseQueries to produce the AST, then hands the AST to KsqlEngine.planQueries to produce the KafkaStreams instances.
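Seen from the outside, that whole pipeline can be driven directly. A hedged sketch (the import paths and the map-based KsqlConfig constructor are assumptions inferred from the listings above, and a reachable Kafka broker is required):
import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryMetadata;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch: the same entry point QueryStreamWriter uses, called directly.
public class EngineSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative config
        KsqlConfig ksqlConfig = new KsqlConfig(props);     // assumed map-based ctor
        KsqlEngine engine = new KsqlEngine(ksqlConfig);

        // Parse -> logical plan -> physical plan -> KafkaStreams, then run it.
        QueryMetadata query = engine.buildMultipleQueries(
            "SELECT * FROM TEST_STREAM;", ksqlConfig, Collections.emptyMap()).get(0);
        query.getKafkaStreams().start();
    }
}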
First, let's look at KsqlEngine.parseQueries:
List<Pair<String, Statement>> parseQueries(
final String queriesString,
final MetaStore tempMetaStore
) {
try {
MetaStore tempMetaStoreForParser = tempMetaStore.clone();
// Parse and AST creation
KsqlParser ksqlParser = new KsqlParser();
List<SqlBaseParser.SingleStatementContext> parsedStatements
= ksqlParser.getStatements(queriesString);
List<Pair<String, Statement>> queryList = new ArrayList<>();
for (SqlBaseParser.SingleStatementContext singleStatementContext : parsedStatements) {
Pair<Statement, DataSourceExtractor> statementInfo = ksqlParser.prepareStatement(
singleStatementContext,
tempMetaStoreForParser
);
Statement statement = statementInfo.getLeft();
if (StatementRewriteForStruct.requiresRewrite(statement)) {
statement = new StatementRewriteForStruct(
statement,
statementInfo.getRight())
.rewriteForStruct();
}
Pair<String, Statement> queryPair =
buildSingleQueryAst(
statement,
getStatementString(singleStatementContext),
tempMetaStore,
tempMetaStoreForParser
);
if (queryPair != null) {
queryList.add(queryPair);
}
}
return queryList;
} catch (Exception e) {
throw new ParseFailedException("Exception while processing statements :" + e.getMessage(), e);
}
}
Inside parseQueries, KsqlParser.getStatements runs exactly the same two-stage SLL/LL ANTLR parse against the SqlBase.g4 grammar that we walked through in the client section, so those listings are not repeated here.
After a successful parse, each statement is turned into a Pair<String, Statement> by KsqlEngine.buildSingleQueryAst.
Next, planQueries:
private List<QueryMetadata> planQueries(
final List<Pair<String, Statement>> statementList,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties,
final MetaStore tempMetaStore
) {
// Logical plan creation from the ASTs
List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
tempMetaStore,
statementList,
ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties)
);
// Physical plan creation from logical plans.
List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
logicalPlans,
statementList,
ksqlConfig,
overriddenProperties,
clientSupplier,
true
);
for (QueryMetadata queryMetadata : runningQueries) {
if (queryMetadata instanceof PersistentQueryMetadata) {
livePersistentQueries.add(queryMetadata);
PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
persistentQueries.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
metaStore.updateForPersistentQuery(persistentQueryMetadata.getQueryId().getId(),
persistentQueryMetadata.getSourceNames(),
persistentQueryMetadata.getSinkNames());
}
allLiveQueries.add(queryMetadata);
}
return runningQueries;
}
Here KsqlEngine.planQueries builds the logical plans via QueryEngine.buildLogicalPlans and the physical plans via QueryEngine.buildPhysicalPlans; it is the physical plan step that emits the Kafka Streams DSL.
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
List<Pair<String, PlanNode>> buildLogicalPlans(
final MetaStore metaStore,
final List<Pair<String, Statement>> statementList,
final KsqlConfig config) {
List<Pair<String, PlanNode>> logicalPlansList = new ArrayList<>();
// TODO: the purpose of tempMetaStore here
MetaStore tempMetaStore = metaStore.clone();
for (Pair<String, Statement> statementQueryPair : statementList) {
if (statementQueryPair.getRight() instanceof Query) {
PlanNode logicalPlan = buildQueryLogicalPlan(
statementQueryPair.getLeft(),
(Query) statementQueryPair.getRight(),
tempMetaStore, config
);
logicalPlansList.add(new Pair<>(statementQueryPair.getLeft(), logicalPlan));
} else {
logicalPlansList.add(new Pair<>(statementQueryPair.getLeft(), null));
}
log.info("Build logical plan for {}.", statementQueryPair.getLeft());
}
return logicalPlansList;
}
private PlanNode buildQueryLogicalPlan(
final String sqlExpression,
final Query query,
final MetaStore tempMetaStore,
final KsqlConfig config) {
final QueryAnalyzer queryAnalyzer = new QueryAnalyzer(
tempMetaStore,
ksqlEngine.getFunctionRegistry(),
config
);
final Analysis analysis = queryAnalyzer.analyze(sqlExpression, query);
final AggregateAnalysis aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
final PlanNode logicalPlan
= new LogicalPlanner(analysis, aggAnalysis, ksqlEngine.getFunctionRegistry()).buildPlan();
if (logicalPlan instanceof KsqlStructuredDataOutputNode) {
KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
(KsqlStructuredDataOutputNode) logicalPlan;
StructuredDataSource
structuredDataSource =
new KsqlStream(
sqlExpression,
ksqlStructuredDataOutputNode.getId().toString(),
ksqlStructuredDataOutputNode.getSchema(),
ksqlStructuredDataOutputNode.getKeyField(),
ksqlStructuredDataOutputNode.getTimestampExtractionPolicy(),
ksqlStructuredDataOutputNode.getKsqlTopic()
);
if (analysis.isDoCreateInto()) {
tempMetaStore.putTopic(ksqlStructuredDataOutputNode.getKsqlTopic());
tempMetaStore.putSource(structuredDataSource.cloneWithTimeKeyColumns());
}
}
return logicalPlan;
}
List<QueryMetadata> buildPhysicalPlans(
final List<Pair<String, PlanNode>> logicalPlans,
final List<Pair<String, Statement>> statementList,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties,
final KafkaClientSupplier clientSupplier,
final boolean updateMetastore
) {
List<QueryMetadata> physicalPlans = new ArrayList<>();
for (int i = 0; i < logicalPlans.size(); i++) {
Pair<String, PlanNode> statementPlanPair = logicalPlans.get(i);
if (statementPlanPair.getRight() == null) {
Statement statement = statementList.get(i).getRight();
if (!(statement instanceof DdlStatement)) {
throw new KsqlException("expecting a statement implementing DDLStatement but got: "
+ statement.getClass());
}
handleDdlStatement(
statementPlanPair.getLeft(),
(DdlStatement) statement
);
} else {
buildQueryPhysicalPlan(
physicalPlans, statementPlanPair, ksqlConfig,
overriddenProperties, clientSupplier, updateMetastore
);
}
}
return physicalPlans;
}
private void buildQueryPhysicalPlan(
final List<QueryMetadata> physicalPlans,
final Pair<String, PlanNode> statementPlanPair,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties,
final KafkaClientSupplier clientSupplier,
final boolean updateMetastore
) {
final StreamsBuilder builder = new StreamsBuilder();
// Build a physical plan, in this case a Kafka Streams DSL
final PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(
builder,
ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties),
ksqlEngine.getTopicClient(),
ksqlEngine.getFunctionRegistry(),
overriddenProperties,
updateMetastore,
ksqlEngine.getMetaStore(),
ksqlEngine.getSchemaRegistryClient(),
ksqlEngine.getQueryIdGenerator(),
new KafkaStreamsBuilderImpl(clientSupplier)
);
physicalPlans.add(physicalPlanBuilder.buildPhysicalPlan(statementPlanPair));
}
QueryEngine.buildLogicalPlans is driven by QueryAnalyzer, which walks the AST with an AstVisitor to produce an Analysis; we won't go further into that here and will cover it in detail later.
QueryEngine.buildPhysicalPlans is implemented by PhysicalPlanBuilder.buildPhysicalPlan.
Note that buildQueryPhysicalPlan above constructs the KafkaStreamsBuilderImpl that is used downstream.
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
public QueryMetadata buildPhysicalPlan(final Pair<String, PlanNode> statementPlanPair) {
final SchemaKStream resultStream = statementPlanPair
.getRight()
.buildStream(
builder,
ksqlConfig,
kafkaTopicClient,
functionRegistry,
overriddenStreamsProperties,
schemaRegistryClient
);
final OutputNode outputNode = resultStream.outputNode();
boolean isBareQuery = outputNode instanceof KsqlBareOutputNode;
// Check to make sure the logical and physical plans match up;
// important to do this BEFORE actually starting up
// the corresponding Kafka Streams job
if (isBareQuery && !(resultStream instanceof QueuedSchemaKStream)) {
throw new KsqlException(String.format(
"Mismatch between logical and physical output; "
+ "expected a QueuedSchemaKStream based on logical "
+ "KsqlBareOutputNode, found a %s instead",
resultStream.getClass().getCanonicalName()
));
}
String serviceId = getServiceId();
String persistanceQueryPrefix =
ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG);
String transientQueryPrefix =
ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);
if (isBareQuery) {
return buildPlanForBareQuery(
(QueuedSchemaKStream) resultStream,
(KsqlBareOutputNode) outputNode,
serviceId,
transientQueryPrefix,
statementPlanPair.getLeft()
);
} else if (outputNode instanceof KsqlStructuredDataOutputNode) {
KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
(KsqlStructuredDataOutputNode) outputNode;
ksqlStructuredDataOutputNode = ksqlStructuredDataOutputNode.cloneWithDoCreateInto(
((KsqlStructuredDataOutputNode) statementPlanPair.getRight()).isDoCreateInto()
);
return buildPlanForStructuredOutputNode(
statementPlanPair.getLeft(),
resultStream,
ksqlStructuredDataOutputNode,
serviceId,
persistanceQueryPrefix,
statementPlanPair.getLeft());
} else {
throw new KsqlException(
"Sink data source of type: "
+ outputNode.getClass()
+ " is not supported.");
}
}
private QueryMetadata buildPlanForBareQuery(
final QueuedSchemaKStream schemaKStream,
final KsqlBareOutputNode bareOutputNode,
final String serviceId,
final String transientQueryPrefix,
final String statement
) {
final String applicationId = addTimeSuffix(getBareQueryApplicationId(
serviceId,
transientQueryPrefix
));
KafkaStreams streams = buildStreams(
bareOutputNode,
builder,
applicationId,
ksqlConfig,
overriddenStreamsProperties
);
SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);
return new QueuedQueryMetadata(
statement,
streams,
bareOutputNode,
schemaKStream.getExecutionPlan(""),
schemaKStream.getQueue(),
(sourceSchemaKstream instanceof SchemaKTable)
? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM,
applicationId,
kafkaTopicClient,
builder.build(),
overriddenStreamsProperties
);
}
private KafkaStreams buildStreams(
final OutputNode outputNode,
final StreamsBuilder builder,
final String applicationId,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties
) {
final Map<String, Object> newStreamsProperties
= new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
newStreamsProperties.putAll(overriddenProperties);
newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
updateListProperty(
newStreamsProperties,
StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
ConsumerCollector.class.getCanonicalName()
);
updateListProperty(
newStreamsProperties,
StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
ProducerCollector.class.getCanonicalName()
);
return kafkaStreamsBuilder.buildKafkaStreams(builder, new StreamsConfig(newStreamsProperties));
}
For a bare query, whose results stream back to the caller instead of being written to a sink, PhysicalPlanBuilder.buildPhysicalPlan calls PhysicalPlanBuilder.buildPlanForBareQuery, which in turn calls KafkaStreamsBuilderImpl.buildKafkaStreams to create the KafkaStreams instance:
[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/physical/KafkaStreamsBuilderImpl.java
public KafkaStreams buildKafkaStreams(StreamsBuilder builder, StreamsConfig conf) {
return new KafkaStreams(builder.build(), conf, clientSupplier);
}
Finally, the QueryStreamWriter constructor starts the freshly built Kafka Streams topology and the streaming query runs in real time.
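To close the loop, here is roughly the shape of Kafka Streams topology that a bare SELECT * compiles down to (topic name, serdes, and configs are illustrative; they are not what KSQL actually generates):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;

import java.util.Properties;

// Sketch: consume the backing topic and forward every row to the caller,
// which is what the transient query's topology boils down to.
public class BareQueryTopologySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("test_stream_topic", Consumed.with(Serdes.String(), Serdes.String()))
               .foreach((key, value) -> System.out.println(key + " : " + value));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transient_sketch_1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}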