环境:
centos 7
docker
ksqldb官方文档:
https://docs.ksqldb.io/en/latest/developer-guide/create-a-stream/
安装
docker pull confluentinc/ksqldb-server:0.11.0
docker pull confluentinc/ksql-cli
启动ksqldb-server 简约版
docker run -d --name ksqldb-server \
-p 0.0.0.0:8088:8088 \
-e KSQL_BOOTSTRAP_SERVERS=192.168.9.121:9092 \
-e KSQL_LISTENERS=http://0.0.0.0:8088/ \
-e KSQL_KSQL_SERVICE_ID=ksql_service_2_ \
confluentinc/ksqldb-server:0.11.0
KSQL_BOOTSTRAP_SERVERS
用于建立与Kafka集群的初始连接的主机列表,多个,号开隔开。
KSQL_KSQL_SERVICE_ID
KSQL服务器的服务ID,用作KSQL创建的内部主题的前缀。
KSQL_LISTENERS
代理侦听的URI列表,包括协议。
启动ksqldb-server 带认证版
docker run -d --name ksqldb-server \
-p 0.0.0.0:8088:8088 \
-e KSQL_BOOTSTRAP_SERVERS=192.168.30.135:9092 \
-e KSQL_LISTENERS=http://0.0.0.0:8088/ \
-e KSQL_KSQL_SERVICE_ID=ksql_service_2_ \
-e KSQL_SECURITY_PROTOCOL=SASL_PLAINTEXT \
-e KSQL_SASL_MECHANISM=PLAIN \
-e KSQL_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<strong-password>\";" \
confluentinc/cp-ksqldb-server:6.0.1
KSQL_SECURITY_PROTOCOL 可以是SASL_SSL等,
Kafka群集用于安全性的协议。
KSQL_SASL_MECHANISM
您的Kafka群集用于安全性的SASL机制。
KSQL_SASL_JAAS_CONFIG
Java身份验证和授权服务(JAAS)配置。
注意权限要用管理员的权限。
镜像要用: confluentinc/cp-ksqldb-server:6.0.1
KSQL CLI
方式一:先创建一个ksqldb-cli 容器, 再进入连接
docker run -d -it --name ksqldb-cli confluentinc/ksqldb-cli:0.17.0 ksql http://ksqldb-server:8088
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
方式二: run ksqldb-cli 容器 并且连接到ksql-server
docker run -it confluentinc/ksqldb-cli:0.17.0 ksql http://192.168.30.134:8088
列出topics
show topics;
ksql 流 STREAM
建立一个流 STREAM:
CREATE STREAM testS14 (
num BIGINT,
nummd5 VARCHAR,
t BIGINT,
lat VARCHAR,
lng VARCHAR,
spd VARCHAR,
alt VARCHAR,
dir VARCHAR,
loctype VARCHAR,
pktype VARCHAR,
imei VARCHAR,
ip VARCHAR
) WITH (
KAFKA_TOPIC='test3',
VALUE_FORMAT='json'
);
获取流的模式:
describe zcs_test;
DESCRIBE EXTENDED zcs_test;
查询流 STREAM:test_stream3
select * from test_stream3 emit changes;
带条件的查询
select * from TEST_STREAM5 WHERE IMEI='863014530322258' and T=1600524888 emit changes;
ps: ksql默认是从kafka最新的数据查询消费的,如果你想从开头查询,则需要在会话上进行设置:SET 'auto.offset.reset' = 'earliest';
建立一个STRUCT流 STREAM
CREATE STREAM TOPS WITH (
KAFKA_TOPIC='topdata',
VALUE_FORMAT='AVRO',
WRAP_SINGLE_VALUE=false
);
CREATE STREAM testS8 (
value STRUCT<lat VARCHAR>
) WITH (
kafka_topic='test1',
VALUE_FORMAT='json'
);
select value->lat as lat
from testS5 emit changes;
查询流
SELECT data,
data->t,
data->imei,
data->ip
FROM test_stream6
EMIT CHANGES;
删除一个流
ksql> DROP STREAM TESTS8;
Message
---------------------------------------------
Source `TESTS8` (topic: test1) was dropped.
---------------------------------------------
删除一个关联查询流
- 终止
ksql> TERMINATE CSAS_NUMS566_0;
Message
-------------------
Query terminated.
-------------------
- 删除
ksql> DROP STREAM NUMS566;
Message
------------------------------------------------
Source `NUMS566` (topic: NUMS566) was dropped.
------------------------------------------------
查看持久查询
SHOW QUERIES;
故障排除
表格或数据流没有结果?SELECT * FROM
这通常是由于查询被配置为仅处理新到达的数据而没有接收到新的输入记录所致。要修复,请执行以下一项操作:
运行。有关更多信息,请参阅“ 配置ksqlDB CLI”和“ 配置ksqlDB服务器”。SET 'auto.offset.reset' = 'earliest';
将新记录写入输入主题。
KSQL 的查询语句不能有""双引号
ksql> select * from TEST_STREAM5 WHERE IMEI=863013182153512 emit changes;
Error in WHERE expression: Cannot compare IMEI (STRING) to 863013182153512 (BIGINT) with EQUAL.
ksql> select * from TEST_STREAM5 WHERE IMEI='863013182153512' emit changes;
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|T |LAT |LNG |SPD |ALT |DIR |LOCTYPE |PKTYPE |IMEI |IP |
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|1600524975 |22.9439667 |114.1338333 |78 |0 |70 |0 |0 |863013182153512 |10.26.96.81 |
|1600524975 |22.9439667 |114.1338333 |78 |0 |70 |0 |0 |863013182153512 |10.26.96.81 |
|1600524975 |22.9439667 |114.1338333 |78 |0 |70 |0 |0 |863013182153512 |10.26.96.81 |
网友评论