美文网首页
ksql 安装 与查询

ksql 安装 与查询

作者: Joncc | 来源:发表于2021-05-11 09:30 被阅读0次

    环境:

    centos 7
    docker 
    

    ksqldb官方文档:
    https://docs.ksqldb.io/en/latest/developer-guide/create-a-stream/

    安装

    docker pull confluentinc/ksqldb-server:0.11.0
    docker pull confluentinc/ksql-cli
    

    启动ksqldb-server 简约版

    docker run -d --name ksqldb-server \
      -p 0.0.0.0:8088:8088 \
      -e KSQL_BOOTSTRAP_SERVERS=192.168.9.121:9092 \
      -e KSQL_LISTENERS=http://0.0.0.0:8088/ \
      -e KSQL_KSQL_SERVICE_ID=ksql_service_2_ \
      confluentinc/ksqldb-server:0.11.0
    
    KSQL_BOOTSTRAP_SERVERS
    用于建立与Kafka集群的初始连接的主机列表,多个,号开隔开。
    KSQL_KSQL_SERVICE_ID
    KSQL服务器的服务ID,用作KSQL创建的内部主题的前缀。
    KSQL_LISTENERS
    代理侦听的URI列表,包括协议。
    

    启动ksqldb-server 带认证版

    docker run -d --name ksqldb-server \
      -p 0.0.0.0:8088:8088 \
      -e KSQL_BOOTSTRAP_SERVERS=192.168.30.135:9092 \
      -e KSQL_LISTENERS=http://0.0.0.0:8088/ \
      -e KSQL_KSQL_SERVICE_ID=ksql_service_2_ \
      -e KSQL_SECURITY_PROTOCOL=SASL_PLAINTEXT \
      -e KSQL_SASL_MECHANISM=PLAIN \
      -e KSQL_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<strong-password>\";" \
     confluentinc/cp-ksqldb-server:6.0.1
    
    
    KSQL_SECURITY_PROTOCOL 可以是SASL_SSL等,
    Kafka群集用于安全性的协议。
    KSQL_SASL_MECHANISM
    您的Kafka群集用于安全性的SASL机制。
    KSQL_SASL_JAAS_CONFIG
    Java身份验证和授权服务(JAAS)配置。
    
    注意权限要用管理员的权限。
    镜像要用: confluentinc/cp-ksqldb-server:6.0.1
    

    KSQL CLI

    方式一:先创建一个ksqldb-cli 容器, 再进入连接

    docker run -d -it --name ksqldb-cli confluentinc/ksqldb-cli:0.17.0 ksql http://ksqldb-server:8088
    
    docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
    

    方式二: run ksqldb-cli 容器 并且连接到ksql-server

    docker run -it confluentinc/ksqldb-cli:0.17.0 ksql http://192.168.30.134:8088
    

    列出topics

    show topics;

    ksql 流 STREAM

    建立一个流 STREAM:

    CREATE STREAM testS14 (
       num BIGINT,
       nummd5 VARCHAR,
       t BIGINT,
       lat VARCHAR,
       lng VARCHAR,
       spd VARCHAR,
       alt VARCHAR,
       dir VARCHAR,
       loctype VARCHAR,
       pktype VARCHAR,
       imei VARCHAR,
       ip VARCHAR
       ) WITH (
        KAFKA_TOPIC='test3',
        VALUE_FORMAT='json'
       );
    

    获取流的模式:

    describe zcs_test;
    DESCRIBE EXTENDED zcs_test;
    

    查询流 STREAM:test_stream3

    select * from test_stream3 emit changes;
    

    带条件的查询

    select * from TEST_STREAM5 WHERE IMEI='863014530322258' and T=1600524888 emit changes;
    

    ps: ksql默认是从kafka最新的数据查询消费的,如果你想从开头查询,则需要在会话上进行设置:SET 'auto.offset.reset' = 'earliest';

    建立一个STRUCT流 STREAM

    CREATE STREAM TOPS  WITH (
        KAFKA_TOPIC='topdata',
        VALUE_FORMAT='AVRO',
        WRAP_SINGLE_VALUE=false
       );
    
    
    CREATE STREAM testS8 (
      value STRUCT<lat VARCHAR>
      ) WITH (
      kafka_topic='test1', 
      VALUE_FORMAT='json'
      );
    
    select value->lat as lat
           from  testS5  emit changes;
    
    

    查询流

    SELECT data,
           data->t,
           data->imei,
           data->ip
    FROM test_stream6
    EMIT CHANGES;
    

    删除一个流

    ksql> DROP STREAM TESTS8;
    
     Message
    ---------------------------------------------
     Source `TESTS8` (topic: test1) was dropped.
    ---------------------------------------------
    

    删除一个关联查询流

    1. 终止
    ksql> TERMINATE  CSAS_NUMS566_0;
    
     Message
    -------------------
     Query terminated.
    -------------------
    
    1. 删除
    ksql> DROP STREAM NUMS566;
    
     Message
    ------------------------------------------------
     Source `NUMS566` (topic: NUMS566) was dropped.
    ------------------------------------------------
    

    查看持久查询

    SHOW QUERIES;
    

    故障排除

    表格或数据流没有结果?SELECT * FROM

    这通常是由于查询被配置为仅处理新到达的数据而没有接收到新的输入记录所致。要修复,请执行以下一项操作:
    
        运行。有关更多信息,请参阅“ 配置ksqlDB CLI”和“ 配置ksqlDB服务器”。SET 'auto.offset.reset' = 'earliest';
        将新记录写入输入主题。
    

    KSQL 的查询语句不能有""双引号

        ksql> select * from TEST_STREAM5 WHERE IMEI=863013182153512 emit changes;   
        Error in WHERE expression: Cannot compare IMEI (STRING) to 863013182153512 (BIGINT) with EQUAL.
        ksql> select * from TEST_STREAM5 WHERE IMEI='863013182153512' emit changes;
        +------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
        |T                 |LAT               |LNG               |SPD               |ALT               |DIR               |LOCTYPE           |PKTYPE            |IMEI              |IP                |
        +------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
        |1600524975        |22.9439667        |114.1338333       |78                |0                 |70                |0                 |0                 |863013182153512   |10.26.96.81       |
        |1600524975        |22.9439667        |114.1338333       |78                |0                 |70                |0                 |0                 |863013182153512   |10.26.96.81       |
        |1600524975        |22.9439667        |114.1338333       |78                |0                 |70                |0                 |0                 |863013182153512   |10.26.96.81       |
    
    

    相关文章

      网友评论

          本文标题:ksql 安装 与查询

          本文链接:https://www.haomeiwen.com/subject/semnrltx.html