Flink Source Code Reading: How the Sql-Client Works

Author: 〇白衣卿相〇 | Published: 2020-12-03 10:49

    Preface

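    Most readers have probably used sql-cli already. Running sql-client.sh embedded drops you into an interactive shell in which every SQL statement can be executed on its own, which is very handy when debugging features. The big squirrel shown when the shell starts is rather cute, too.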

    Script

    First, the script itself:

    #!/usr/bin/env bash
    ################################################################################
    #  Licensed to the Apache Software Foundation (ASF) under one
    #  or more contributor license agreements.  See the NOTICE file
    #  distributed with this work for additional information
    #  regarding copyright ownership.  The ASF licenses this file
    #  to you under the Apache License, Version 2.0 (the
    #  "License"); you may not use this file except in compliance
    #  with the License.  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################
    
    ################################################################################
    # Adopted from "flink" bash script
    ################################################################################
    
    target="$0"
    # For the case, the executable has been directly symlinked, figure out
    # the correct bin path by following its symlink up to an upper bound.
    # Note: we can't use the readlink utility here if we want to be POSIX
    # compatible.
    iteration=0
    while [ -L "$target" ]; do
        if [ "$iteration" -gt 100 ]; then
            echo "Cannot resolve path: You have a cyclic symlink in $target."
            break
        fi
        ls=`ls -ld -- "$target"`
        target=`expr "$ls" : '.* -> \(.*\)$'`
        iteration=$((iteration + 1))
    done
    
    # Convert relative path to absolute path
    bin=`dirname "$target"`
    
    # get flink config
    . "$bin"/config.sh
    
    if [ "$FLINK_IDENT_STRING" = "" ]; then
            FLINK_IDENT_STRING="$USER"
    fi
    
    CC_CLASSPATH=`constructFlinkClassPath`
    
    ################################################################################
    # SQL client specific logic
    ################################################################################
    
    log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
    log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
    
    # get path of jar in /opt if it exist
    FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")
    
    # add flink-python jar to the classpath
    if [[ ! "$CC_CLASSPATH" =~ .*flink-python.*.jar ]]; then
        FLINK_PYTHON_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-python.*.jar")
        if [ -n "$FLINK_PYTHON_JAR" ]; then
            CC_CLASSPATH="$CC_CLASSPATH:$FLINK_PYTHON_JAR"
        fi
    fi
    
    # check if SQL client is already in classpath and must not be shipped manually
    if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then
    
        # start client without jar
        exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@"
    
    # check if SQL client jar is in /opt
    elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
    
        # start client with jar
        exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"
    
    # write error message to stderr
    else
        (>&2 echo "[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.")
    
        # exit to force process failure
        exit 1
    fi
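
    The first half of the script follows the symlink chain by hand, capped at 100 hops, because readlink is not POSIX. For comparison, a rough Java sketch of the same bounded loop; SymlinkResolver and MAX_HOPS are hypothetical names, and unlike the script (which uses the raw link text as-is) it resolves relative link targets against the link's own directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical Java counterpart of the bounded symlink loop in sql-client.sh. */
final class SymlinkResolver {
    // Same cap as the script's "$iteration" -gt 100 check.
    static final int MAX_HOPS = 100;

    static Path resolve(Path target) throws IOException {
        int iteration = 0;
        while (Files.isSymbolicLink(target)) {
            if (iteration > MAX_HOPS) {
                // Like the script: warn and keep the last path reached.
                System.err.println("Cannot resolve path: You have a cyclic symlink in " + target + ".");
                break;
            }
            Path link = Files.readSymbolicLink(target);
            Path parent = target.getParent();
            // Relative link targets are resolved against the directory holding the link.
            target = (link.isAbsolute() || parent == null) ? link : parent.resolve(link);
            iteration++;
        }
        return target;
    }
}
```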
    

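    The script is fairly simple: it resolves symlinks to find the bin directory, sources config.sh, builds the classpath, looks for the flink-sql-client jar, and finally execs the JVM. The class named in that final launch command is org.apache.flink.table.client.SqlClient, so that is where the code reading starts.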
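
    The final if/elif/else is the only real decision in the script. A hedged Java sketch of that decision, assuming a hypothetical LaunchCommand.build helper: it only assembles the argument list (java, -classpath, main class, user arguments, optional --jar), leaves out $JAVA_RUN, $JVM_ARGS, the log settings, manglePathList and the flink-python jar, and joins paths with ':' as the script does on Unix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of the final if/elif/else in sql-client.sh. */
final class LaunchCommand {
    static final String MAIN_CLASS = "org.apache.flink.table.client.SqlClient";
    // Roughly the script's [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]] test (unanchored match).
    private static final Pattern SQL_CLIENT_JAR = Pattern.compile("flink-sql-client.*\\.jar");

    static List<String> build(String ccClasspath, String hadoopClasspath, String optSqlClientJar, String... userArgs) {
        List<String> cmd = new ArrayList<>(Arrays.asList("java", "-classpath"));
        if (SQL_CLIENT_JAR.matcher(ccClasspath).find()) {
            // The SQL client is already on the classpath: start it without --jar.
            cmd.add(ccClasspath + ":" + hadoopClasspath);
            cmd.add(MAIN_CLASS);
            cmd.addAll(Arrays.asList(userArgs));
        } else if (optSqlClientJar != null && !optSqlClientJar.isEmpty()) {
            // Jar found in opt/: append it to the classpath and also pass it with --jar.
            cmd.add(ccClasspath + ":" + hadoopClasspath + ":" + optSqlClientJar);
            cmd.add(MAIN_CLASS);
            cmd.addAll(Arrays.asList(userArgs));
            cmd.add("--jar");
            cmd.add(optSqlClientJar);
        } else {
            // The script writes an error to stderr and exits with status 1.
            throw new IllegalStateException("Flink SQL Client JAR file 'flink-sql-client*.jar' not found");
        }
        return cmd;
    }
}
```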

    Code

    The main method:

        public static void main(String[] args) {
            if (args.length < 1) {
                CliOptionsParser.printHelpClient();
                return;
            }
    
            switch (args[0]) {
    
                case MODE_EMBEDDED:
                    // remove mode
                    final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
                    final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
                    if (options.isPrintHelp()) {
                        CliOptionsParser.printHelpEmbeddedModeClient();
                    } else {
                        try {
                            final SqlClient client = new SqlClient(true, options);
                            client.start();
                        } catch (SqlClientException e) {
                            // make space in terminal
                            System.out.println();
                            System.out.println();
                            LOG.error("SQL Client must stop.", e);
                            throw e;
                        } catch (Throwable t) {
                            // make space in terminal
                            System.out.println();
                            System.out.println();
                            LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
                            throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
                        }
                    }
                    break;
    
                case MODE_GATEWAY:
                    throw new SqlClientException("Gateway mode is not supported yet.");
    
                default:
                    CliOptionsParser.printHelpClient();
            }
        }
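
    Without the Flink-specific classes, main is just a dispatch on the first argument: the mode is removed with Arrays.copyOfRange and the rest goes to the embedded-mode option parser. A minimal sketch under that reading; ModeDispatch and its returned strings are hypothetical stand-ins for CliOptionsParser and SqlClient.start(), and the constant values "embedded" and "gateway" are assumed to match MODE_EMBEDDED and MODE_GATEWAY.

```java
import java.util.Arrays;

/** Hypothetical reduction of SqlClient.main's mode handling. */
final class ModeDispatch {
    // Assumed to match SqlClient.MODE_EMBEDDED / MODE_GATEWAY.
    static final String MODE_EMBEDDED = "embedded";
    static final String MODE_GATEWAY = "gateway";

    /** Returns a description of what main() would do for the given arguments. */
    static String dispatch(String[] args) {
        if (args.length < 1) {
            return "help"; // CliOptionsParser.printHelpClient()
        }
        switch (args[0]) {
            case MODE_EMBEDDED:
                // Drop the mode; the remaining arguments are the embedded-mode options.
                String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
                return "embedded " + String.join(" ", modeArgs);
            case MODE_GATEWAY:
                throw new UnsupportedOperationException("Gateway mode is not supported yet.");
            default:
                return "help";
        }
    }
}
```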
    

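    Currently only the embedded mode is supported (gateway mode throws an exception), and it accepts a number of further options. main parses those options and then starts a SqlClient. The start method mainly does the following:

    1. Collect the extra dependencies given in the options (jars and library directories)
    2. Create and start the executor (a LocalExecutor in embedded mode), which talks to other systems on the client's behalf
    3. Build the session environment and a SessionContext, then open a session
    4. Register a shutdown hook that cleans up when the program exits
    5. Start the interactive CLI
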
        private void start() {
            if (isEmbedded) {
                // create local executor with default environment
                final List<URL> jars;
                if (options.getJars() != null) {
                    jars = options.getJars();
                } else {
                    jars = Collections.emptyList();
                }
                final List<URL> libDirs;
                if (options.getLibraryDirs() != null) {
                    libDirs = options.getLibraryDirs();
                } else {
                    libDirs = Collections.emptyList();
                }
                final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
                executor.start();
    
                // create CLI client with session environment
                final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
                appendPythonConfig(sessionEnv, options.getPythonConfiguration());
                final SessionContext context;
                if (options.getSessionId() == null) {
                    context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
                } else {
                    context = new SessionContext(options.getSessionId(), sessionEnv);
                }
    
                // Open a new session
                String sessionId = executor.openSession(context);
                try {
                    // add shutdown hook
                    Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));
    
                    // do the actual work
                    openCli(sessionId, executor);
                } finally {
                    executor.closeSession(sessionId);
                }
            } else {
                throw new SqlClientException("Gateway mode is not supported yet.");
            }
        }
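
    The embedded branch of start() wraps the CLI in an open/try/finally around the session and additionally registers a JVM shutdown hook, so the session is also cleaned up when the process is killed. A sketch of that pattern with a hypothetical SessionExecutor interface in place of Flink's Executor (only openSession and closeSession are modelled) and a hypothetical default session id. One deliberate difference: the sketch removes its hook after the normal close, whereas the code above leaves EmbeddedShutdownThread registered.

```java
import java.util.function.Consumer;

/** Hypothetical sketch of the session handling in SqlClient.start() (embedded branch). */
final class SessionLifecycle {
    /** Stand-in for Flink's Executor; only the two session calls are modelled. */
    interface SessionExecutor {
        String openSession(String requestedId);
        void closeSession(String sessionId);
    }

    // Hypothetical value; start() falls back to DEFAULT_SESSION_ID when no id is given.
    static final String DEFAULT_SESSION_ID = "default";

    static void run(SessionExecutor executor, String requestedId, Consumer<String> cli) {
        String id = requestedId == null ? DEFAULT_SESSION_ID : requestedId;
        String sessionId = executor.openSession(id);
        // Safety net if the JVM is shut down (Ctrl+C, kill) while the CLI is running.
        Thread hook = new Thread(() -> executor.closeSession(sessionId));
        Runtime.getRuntime().addShutdownHook(hook);
        try {
            cli.accept(sessionId); // plays the role of openCli(sessionId, executor)
        } finally {
            executor.closeSession(sessionId);
            // Sketch-only difference: drop the hook so the session is not closed twice.
            Runtime.getRuntime().removeShutdownHook(hook);
        }
    }
}
```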
    

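    SQL statements are executed through CliClient. openCli decides where the history file lives and then either opens the interactive shell or, if an update statement was passed on the command line, submits just that statement: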

        private void openCli(String sessionId, Executor executor) {
            CliClient cli = null;
            try {
                Path historyFilePath;
                if (options.getHistoryFilePath() != null) {
                    historyFilePath = Paths.get(options.getHistoryFilePath());
                } else {
                    historyFilePath = Paths.get(System.getProperty("user.home"),
                            SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
                }
                cli = new CliClient(sessionId, executor, historyFilePath);
                // interactive CLI mode
                if (options.getUpdateStatement() == null) {
                    cli.open();
                }
                // execute single update statement
                else {
                    final boolean success = cli.submitUpdate(options.getUpdateStatement());
                    if (!success) {
                        throw new SqlClientException("Could not submit given SQL update statement to cluster.");
                    }
                }
            } finally {
                if (cli != null) {
                    cli.close();
                }
            }
        }
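
    The history file rule in openCli() is: an explicitly configured path wins, otherwise a file in the user's home directory whose name has no leading dot on Windows. A sketch of just that rule; HistoryPath.resolve is a hypothetical helper, and the OS check is passed in as a parameter instead of reading SystemUtils.IS_OS_WINDOWS so the logic can be tested on any machine.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper mirroring the history-file choice in openCli(). */
final class HistoryPath {
    static Path resolve(String configured, String userHome, boolean isWindows) {
        if (configured != null) {
            // An explicit history file path from the options takes precedence.
            return Paths.get(configured);
        }
        // Default: ~/.flink-sql-history, or flink-sql-history on Windows.
        return Paths.get(userHome, isWindows ? "flink-sql-history" : ".flink-sql-history");
    }
}
```

    In the real method the resulting path is simply handed to the CliClient constructor.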
    

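    The open method reads the SQL entered at the sql-cli prompt, where a statement ends with a semicolon. Each statement is parsed with SqlCommandParser (via parseCommand), and callCommand then handles each command type in its own way.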

        public void open() {
            isRunning = true;
    
            // print welcome
            terminal.writer().append(CliStrings.MESSAGE_WELCOME);
    
            // begin reading loop
            while (isRunning) {
                // make some space to previous command
                terminal.writer().append("\n");
                terminal.flush();
    
                final String line;
                try {
                    line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
                } catch (UserInterruptException e) {
                    // user cancelled line with Ctrl+C
                    continue;
                } catch (EndOfFileException | IOError e) {
                    // user cancelled application with Ctrl+D or kill
                    break;
                } catch (Throwable t) {
                    throw new SqlClientException("Could not read from command line.", t);
                }
                if (line == null) {
                    continue;
                }
                final Optional<SqlCommandCall> cmdCall = parseCommand(line);
                cmdCall.ifPresent(this::callCommand);
            }
        }
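
    Note that open() itself contains no semicolon handling: by the time readLine returns, the line reader has already collected a complete statement (in Flink 1.11 this appears to be the job of the SqlMultiLineParser passed to JLine's LineReaderBuilder). To make the "a statement ends with a semicolon" rule concrete, here is a plain-Java sketch that accumulates input lines until one ends with ';'. StatementReader is hypothetical, and it deliberately ignores semicolons inside string literals or comments.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical line accumulator illustrating semicolon-terminated statements. */
final class StatementReader {
    /** Reads until EOF and returns each ';'-terminated statement without the ';'. */
    static List<String> readStatements(BufferedReader in) throws IOException {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null) { // null plays the role of Ctrl+D (EOF)
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // nothing to add, keep reading
            }
            if (current.length() > 0) {
                current.append('\n');
            }
            current.append(trimmed);
            if (trimmed.endsWith(";")) {
                // Statement complete: drop the terminator and hand it off.
                String stmt = current.substring(0, current.length() - 1).trim();
                if (!stmt.isEmpty()) {
                    statements.add(stmt);
                }
                current.setLength(0);
            }
        }
        return statements; // an unterminated trailing statement is discarded
    }
}
```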
    

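    callCommand is a fairly long method; it simply performs a different operation for each kind of SQL statement.
    For example, CREATE TABLE is handled by callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED),
    which eventually calls TableEnvironment#executeSql.
    INSERT and INSERT OVERWRITE are handled by callInsert(cmdCall), which eventually calls TableEnvironment#sqlUpdate, a method that is already deprecated.
    The other statements work in much the same way, and interested readers can follow the code further. From there on, everything proceeds as in a regular SQL program: SQL validation → conversion → optimization → translation into Transformations → submission for execution. For details, see the article "Flink Source Code Reading: The Flink SQL Execution Flow" (Flink源码阅读之Flinksql执行流程).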
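
    Reduced to its essentials, callCommand is a classification step followed by a switch on the command type. A sketch under clearly simplified assumptions: the regex classifier and the SqlCommand values below are hypothetical (Flink's SqlCommandParser is more involved and changes between versions), and each branch only returns the name of the TableEnvironment method that the paragraph above says it ends up in.

```java
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical reduction of SqlCommandParser + CliClient.callCommand. */
final class CommandDispatch {
    enum SqlCommand { CREATE_TABLE, INSERT_INTO, INSERT_OVERWRITE, SELECT }

    // Simplified patterns; INSERT OVERWRITE is checked before INSERT INTO.
    static Optional<SqlCommand> parse(String stmt) {
        String s = stmt.trim();
        if (matches("CREATE\\s+TABLE\\b.*", s)) return Optional.of(SqlCommand.CREATE_TABLE);
        if (matches("INSERT\\s+OVERWRITE\\b.*", s)) return Optional.of(SqlCommand.INSERT_OVERWRITE);
        if (matches("INSERT\\s+INTO\\b.*", s)) return Optional.of(SqlCommand.INSERT_INTO);
        if (matches("SELECT\\b.*", s)) return Optional.of(SqlCommand.SELECT);
        return Optional.empty(); // unknown statement: nothing to call
    }

    private static boolean matches(String regex, String s) {
        return Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL).matcher(s).matches();
    }

    /** Names the TableEnvironment entry point each branch leads to. */
    static String callCommand(SqlCommand cmd) {
        switch (cmd) {
            case CREATE_TABLE:
                return "TableEnvironment#executeSql"; // via callDdl(...)
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                return "TableEnvironment#sqlUpdate"; // via callInsert(cmdCall), deprecated API
            default:
                return "other";
        }
    }
}
```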

        Article link: https://www.haomeiwen.com/subject/djtewktx.html