Flink SQL-Client 的使用

2019-05-08 14:54

    flink sql client 介绍

    The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.

    flink sql-client 是一种实用的工具,方便 flink 开发人员编写,调试,提交实时table代码, 不用编写 JavaScala代码。同时在 sql-client 上能够可视化的看到实时统计的 retractappend 结果。

    部署环境 (单机)

    • java 1.8
    • zookeeper 3.4.13
    • kafka 0.11
    • flink 1.6

    1. 启动 zookeeper
    yizhou@pro:~$ zkServer start
    ZooKeeper JMX enabled by default
    Using config: /usr/local/etc/zookeeper/zoo.cfg
    Starting zookeeper ... STARTED
    yizhou@pro:~$ zkServer status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/etc/zookeeper/zoo.cfg
    Mode: standalone
    1. 启动 kafka
    yizhou@pro:${KAFKA_HOME}$ nohup bin/kafka-server-start.sh config/server.properties &
    [1] 70358
    # 查看 topic
    yizhou@pro:${KAFKA_HOME}$ bin/kafka-topics.sh --list --zookeeper localhost:2181
    # 往 order_sql 这个 topic发送 json 消息
    yizhou@pro:${KAFKA_HOME}$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_sql
    >{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "1556420980000"}
    >{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "1556421040000"}
    >{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "1556421100000"}
    >{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "1556421120000"}
    >{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "1556421480000"}
    >{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "1556421510000"}
    >{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "1556421570000"}
    >{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "1556421630000"}
    >{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "1556421655000"}
    1. 启动 flink
    yizhou@pro:${FLINK_HOME}/bin$ ./start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host pro.local.
    Starting taskexecutor daemon on host pro.local.

    注册 kafka 消息为 flink sql 的动态表

    kafka消息映射为flink sql 的动态表,是非常重要的操作。整个操作通过配置 ${FLINK_HOME}/conf 目录下的 yaml 文件实现。以下例子是将 kafkatopic: order_sql, 映射为 table

    # Table Sources
    # Define table sources and sinks here.
    tables: # empty list
    # A typical table source definition looks like:
     - name: orders
       type: source
       update-mode: append
          property-version: 1
          type: kafka
          version: 0.11
          topic: order_sql
          startup-mode: earliest-offset
          - key: zookeeper.connect
            value: localhost:2181
          - key: bootstrap.servers
            value: localhost:9092
          - key: group.id
            value: test-consumer-group
          property-version: 1
          type: json
          schema: "ROW(order_id LONG, shop_id VARCHAR, member_id LONG, trade_amt DOUBLE, pay_time TIMESTAMP)"
          - name: order_id
            type: LONG
          - name: shop_id
            type: VARCHAR
          - name: member_id
            type: LONG
          - name: trade_amt
            type: DOUBLE
          - name: payment_time
            type: TIMESTAMP
                type: "from-field"
                from: "pay_time"
                type: "periodic-bounded"
                delay: "60000"
    # User-defined functions
    # Define scalar, aggregate, or table functions here.
    functions: [] # empty list
    # A typical function definition looks like:
    # - name: ...
    #   from: class
    #   class: ...
    #   constructor: ...
    # Execution properties
    # Execution properties allow for changing the behavior of a table program.
      # 'batch' or 'streaming' execution
      type: streaming
      # allow 'event-time' or only 'processing-time' in sources
      time-characteristic: event-time
      # interval in ms for emitting periodic watermarks
      periodic-watermarks-interval: 200
      # 'changelog' or 'table' presentation of results
      result-mode: table
      # maximum number of maintained rows in 'table' presentation of results
      max-table-result-rows: 1000000
      # parallelism of the program
      parallelism: 1
      # maximum parallelism
      max-parallelism: 128
      # minimum idle state retention in ms
      min-idle-state-retention: 3600000
      # maximum idle state retention in ms
      max-idle-state-retention: 7200000
    # Deployment properties
    # Deployment properties allow for describing the cluster to which table
    # programs are submitted to.
      # general cluster communication timeout in ms
      response-timeout: 5000
      # (optional) address from cluster to gateway
      gateway-address: ""
      # (optional) port from cluster to gateway
      gateway-port: 0

    从配置文件中可以看出,flink 中注册的表名为 orders,作为source的数据源表,append的方式不断添加。以 0.11 版本的 kafka 的作为 connector, topic 信息、消息偏移量、zookeeper 地址、broker 地址、消费组信息都写入配置。同时 kafka 消息的 json 消息如何映射到 flink sql tableshecma,选择pay_time 作为 event-timewatermark 设置为60s。

    启动 flink sql client

    命令行启动 bin/sql-client.sh embedded -d conf/sql.my.yaml -l sql-libs/。 其中 sql.my.yaml 是上述的 yaml 配置文件名称。${FLINK_HOME}/sql-libs 目录下需要提前下载 flink-connector-kafka-0.11flink-json-1.6.1-sql-jar.jar 两个jar包

    yizhou@pro:${FLINK_HOME}$ bin/sql-client.sh embedded -d conf/sql.my.yaml -l sql-libs/
    Reading default environment from: file:/usr/local/Cellar/apache-flink/1.6.2/libexec/conf/sql.my.yaml
    No session environment specified.
                                ▓███▓░░        ▒▒▒▓██▒  ▒
                              ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                              ██▒         ░▒▓███▒    ▒█▒█▒
                                ░▓█            ███   ▓░▒██
                                  ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                                █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                                ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                             ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                       ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                      ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                    ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
                   ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
                  ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
               ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
               ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
               ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
               ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
              ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
              █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
              ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
              ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
               ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
               ▓█   ▒█▓   ░     █░                ▒█              █▓
                █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
                 █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
                  ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
                   ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                    ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                      ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                          ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
        ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
       |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
       | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
       |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
       | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
       |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
            Welcome! Enter HELP to list all available commands. QUIT to exit.
    Flink SQL> show tables;
    Flink SQL> describe orders;
     |-- order_id: Long
     |-- shop_id: String
     |-- member_id: Long
     |-- trade_amt: Double
     |-- payment_time: TimeIndicatorTypeInfo(rowtime)
    Flink SQL>

    运行 sql 语句

    • 首先执行最简单的 select *
    Flink SQL> select * from orders;
    select * from orders
    • 1分钟固定窗口计算
      , TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start
      , TUMBLE_END(payment_time, INTERVAL '1' MINUTE)   AS tumble_end
      , sum(trade_amt)                             AS amt
    FROM orders
    GROUP BY shop_id, TUMBLE(payment_time, INTERVAL '1' MINUTE);




