美文网首页SparkT-Sql
Clickhouse Kafka引擎表使用进阶

Clickhouse Kafka引擎表使用进阶

作者: 淡淡的小番茄 | 来源:发表于2021-02-19 08:22 被阅读0次

          由于项目刚启动,人手不足,kafka引擎表在我们项目中应用很多,基本靠kafka引擎表来做日志计量工作。目前总共23来个引擎,我们的集群只有2台,1分片2副本。目前明显感觉压力比较大。

    Kafka引擎表必要参数:

    ====================================

    kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。

    kafka_topic_list – topic 列表 (my_topic)。

    kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。

    kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。

    可选参数:

    ====================================

    kafka_row_delimiter - 每个消息体(记录)之间的分隔符。

    kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。

    kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

    示例:

    SETTINGS kafka_broker_list = '172.30.xxx.xx:9092',

    kafka_topic_list = 'ck.device.log',

    kafka_group_name = 'device.log.g1',

    kafka_format = 'JSONEachRow',

    kafka_row_delimiter = '\n',

    kafka_skip_broken_messages = 1,

    kafka_num_consumers = 1;

    详细的参数,可以参加官方文档:

    https://clickhouse.tech/docs/zh/engines/table-engines/integrations/kafka/

    https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

    我们每个topic对应不同的日志计量数据,数据为json格式,各不相同。目前的实现机制为:

    然后我们想将引擎表按照业务合并,由目前的23个合并到3-4个,具体如下:

    多引擎表模式,kafka_format我们使用的是JSONEachRow,合并之后需要修改为JSONAsString模式。

    分别创建统一的Kafka引擎表、创建ODS层事实表、物化视图表:

    -- Kafka引擎表

    CREATE TABLE dmp_log.sync_kafka_u0

    (

    `json` Nullable(String)

    )

    ENGINE = Kafka

    SETTINGS kafka_broker_list = '172.30.xxx.xx:9092',

    kafka_topic_list = 'ck.t0',

    kafka_group_name = 't0.g1',

    kafka_format = 'JSONAsString',

    kafka_row_delimiter = '\n',

    kafka_skip_broken_messages = 1,

    kafka_num_consumers = 1;

    -- ODS层事实表

    CREATE TABLE dmp_log.ods_log_t0(

    `data_type` Int16,

    `org_id` Int64,

    `org_name` String,

    `product_key` String,

    `device_key` String,

    `properties` String,

    `insert_time` DateTime DEFAULT now()

    )ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/ods_log_t0','{replica}')

    PARTITION BY toYYYYMM(insert_time)

    ORDER BY (data_type,org_id,product_key,device_key)

    SETTINGS index_granularity = 8192;

    -- 物化视图表

    CREATE MATERIALIZED VIEW dmp_log.view_log_t0 TO dmp_log.ods_log_t0

    (

    `data_type` Int16,

    `org_id` Int64,

    `org_name` String,

    `product_key` String,

    `device_key` String,

    `properties` String

    ) AS

    SELECT

    JSONExtractInt(json,'dataType') data_type,

    JSONExtractFloat(json,'orgId') org_id,

    JSONExtractString(json,'orgName') org_name,

    JSONExtractString(json,'productKey') product_key,

    JSONExtractString(json,'deviceKey') device_key,

    JSONExtractString(json,'properties') properties

    FROM dmp_log.sync_kafka_u0 where _topic='ck.t0';

    连接kafka发送测试消息:

    ./bin/kafka-console-producer.sh --broker-list 172.30.xxx.xx:9092 --topic ck.t0

    >{"dataType":"0","productKey":"cu23o9t2xsyllwnC","productName":"thermostat","deviceKey":"thermostat01","deviceName":"roadthermostat","orgId":"337","orgName":"北京市分公司","ruleExecuteTime":"2021-02-01 20:09:51.628","ruleId":"577","ruleName":"gggggg","messageId":"1498639","properties":"[{\"device.prop.value\":40,\"device.prop.ts\":\"\",\"device.prop.name\":\"实时温度\",\"device.prop.key\":\"currentTemperature\"}]","dataTime":"2022-12-11 10:17:51.014","orgKey":"CU_BEIJING","recordMode":"1","iotId":"b3d43eb801804c41823e6550d5f62ab6","nodeType":"0","createdBy":"cu3k8ssg7t86p0CA&streetlamp01","connectionProtocol":"0","authType":"0","IP":"","updater":null,"updatedOn":"2022-12-11 10:17:51.014","description":"","activatedOn":null,"gatewayId":""}

    查看ODS表信息:

      Kafka Engine 对数据格式很敏感。我们遇到过由于kafka消息队列中发送了一条非友好json格式数据(存在换行、回车导致json非一行),然后kafka引擎一直在报错,读取这个消息,不会丢弃。由于是测试环境我们是直接删掉topic重新发送正确消息。当然还有更优雅的方式,就是重置kafka引擎对应的offset。就是下面所说的回滚操作:

    删除脏数据步骤:

    1、执行```DETACH TABLE sync_kafka_u0```, 下线kafka引擎表。

    2、重置Kafka对应的Offset

    kafka-consumer-groups --bootstrap-server 172.30.xxx.xxx:9092 \

    --topic ck.t0 --group t0.g1 \

    --reset-offsets --to-earliest --execute

    也可通过kafka-consumer-groups.sh命令查看具体的offset,然后通过重置功能重置到具体的offset(具体指定忽略多少条记录)。

    3、执行```ATTACH TABLE data_source```, 上线该表。

    相关文章

      网友评论

        本文标题:Clickhouse Kafka引擎表使用进阶

        本文链接:https://www.haomeiwen.com/subject/qqboxltx.html