ClickHouse: Consuming and Storing JSON Data from Kafka


Author: 国服最坑开发 | Published 2023-05-11 17:59

    0x01 Overview

    Verified on a single machine: consume JSON-formatted data from Kafka and persist it into ClickHouse.
    This covers the key capability of parsing and extracting fields from multi-level (nested) JSON.
    Goal: store the following JSON:

    {
        "id": "123",
        "timestamp": 1234567,
        "payload": {
            "message": "test",
            "measure_string": "haha"
        }
    }
    

    0x02 Create the Kafka Topic

    • Create the topic
    kafka-topics --bootstrap-server localhost:9092 --topic clickhouseTestJson --create --partitions 6 --replication-factor 1
    
    • Verify the topic
     kafka-topics --bootstrap-server localhost:9092 --describe --topic clickhouseTestJson
     
     Topic: clickhouseTestJson  TopicId: PBbkM2ZhQMarwFf4x5znNQ PartitionCount: 6   ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: clickhouseTestJson   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: clickhouseTestJson   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
        Topic: clickhouseTestJson   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
        Topic: clickhouseTestJson   Partition: 3    Leader: 0   Replicas: 0 Isr: 0
        Topic: clickhouseTestJson   Partition: 4    Leader: 0   Replicas: 0 Isr: 0
        Topic: clickhouseTestJson   Partition: 5    Leader: 0   Replicas: 0 Isr: 0
    

    0x03 Create the ClickHouse Tables

    Key points:
    Three objects are involved in ClickHouse:

    • The final persistent storage table: event
    • The queue table holding the Kafka messages: json_queue2; rows disappear once they have been SELECTed.
    • The data-transform materialized view: json_mv2, which moves rows from the queue into the storage table.
    -- create the storage table
    CREATE TABLE IF NOT EXISTS event
    (
        timestamp UInt64 Codec(DoubleDelta, LZ4),
        id Int64 Codec(Gorilla, LZ4),
        message LowCardinality(String),
        measure_string String Codec(ZSTD),
        date Date DEFAULT toDate(timestamp, 'UTC') Codec(ZSTD),
        timestamp_1min UInt64 DEFAULT (floor(timestamp/60) * 60) Codec(DoubleDelta, LZ4)
    ) Engine = MergeTree
    PARTITION BY toStartOfMonth(date)
    ORDER BY (
        id,
        timestamp_1min
    );
    
    -- create the Kafka queue table
    CREATE TABLE IF NOT EXISTS json_queue2 (
      all String
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = 'localhost:9092',
           kafka_topic_list = 'clickhouseTestJson',
           kafka_group_name = 'clickhouseTestJsonGroup2',
           kafka_format = 'JSONAsString',
           kafka_skip_broken_messages = 10000,
           kafka_max_block_size = 1048576;
    
    -- create the data-transform materialized view
    CREATE MATERIALIZED VIEW json_mv2 TO event AS
    SELECT
        JSONExtract(all, 'id', 'Int64') AS id,
        JSONExtract(all, 'timestamp', 'Int64') AS timestamp,
        JSONExtractString(all, 'payload', 'message') AS message,
        JSONExtractString(all, 'payload', 'measure_string') AS measure_string
    FROM json_queue2;
    

    In the MATERIALIZED VIEW json_mv2 above, the JSONExtract/JSONExtractString functions flatten the multi-level JSON so that nested payload fields are stored as plain top-level columns.
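The same flattening can be sketched outside ClickHouse. Below is a minimal Python model of what json_mv2 does to each message; the function name `flatten` is made up for illustration, while the field names mirror the view above:

```python
import json

def flatten(raw: str) -> dict:
    """Model of json_mv2: lift nested payload fields into flat columns."""
    doc = json.loads(raw)
    payload = doc.get("payload", {})
    return {
        "id": doc.get("id"),
        "timestamp": doc.get("timestamp"),
        "message": payload.get("message", ""),
        "measure_string": payload.get("measure_string", ""),
    }

row = flatten('{"id":"123","timestamp":1234567,'
              '"payload":{"message":"test","measure_string":"haha"}}')
print(row["message"], row["measure_string"])  # test haha
```

Each produced dict corresponds to one row inserted into the event table.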

    0x04 Submit Test Data

    • Produce a message
    kafka-console-producer --broker-list localhost:9092 --topic clickhouseTestJson
    {"id":"123","timestamp":1234567,"payload":{"message":"test","measure_string":"haha"}}
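The message line can also be built programmatically before being handed to any producer. A small sketch that reproduces the exact line pasted into kafka-console-producer above (note that id is deliberately a JSON string here, as in the article's test record):

```python
import json

# Build the test record; "id" is a JSON string, not a number.
record = {
    "id": "123",
    "timestamp": 1234567,
    "payload": {"message": "test", "measure_string": "haha"},
}

# Compact separators reproduce the single-line form fed to the producer.
line = json.dumps(record, separators=(",", ":"))
print(line)
```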
    

    0x05 Query the Data in ClickHouse

    clickhouse-client --stream_like_engine_allow_direct_select 1
    use yourdatabase;
    select * from event;
    
    # Result:
    SELECT *
    FROM event
    
    Query id: f91cb37d-a54a-49a4-800d-1c27e48a9018
    
    ┌─timestamp─┬─id─┬─message─┬─measure_string─┬───────date─┬─timestamp_1min─┐
    │   1234567 │  0 │ test    │ haha           │ 1970-01-15 │        1234560 │
    └───────────┴────┴─────────┴────────────────┴────────────┴────────────────┘
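Note that id comes back as 0 rather than 123: the message carries id as a JSON string ("123"), while the view extracts it with JSONExtract(all, 'id', 'Int64'), which falls back to the type's default value on a type mismatch. A rough Python model of that behavior (the helper name `json_extract_int` is made up for illustration):

```python
import json

def json_extract_int(raw: str, key: str, default: int = 0) -> int:
    """Rough model of JSONExtract(..., 'Int64'): a value of the wrong
    JSON type yields the numeric default instead of being coerced."""
    value = json.loads(raw).get(key)
    # bool is an int subclass in Python; exclude it to keep the model honest.
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    return default

msg = '{"id":"123","timestamp":1234567}'
print(json_extract_int(msg, "id"))         # "123" is a string -> 0
print(json_extract_int(msg, "timestamp"))  # 1234567
```

To keep the original value, either send id as a JSON number or extract it as a string and convert, e.g. toInt64OrZero(JSONExtractString(all, 'id')).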
    

    Storage is now working end to end; downstream aggregation and merge logic can be developed against the event table.

    0x06 Adding Columns

    A Materialized View does not support adding columns; it must be dropped and recreated.
    Table columns can only be added, not removed.

    When adding columns, give numeric fields a numeric type up front; otherwise queries that use sum-style aggregate functions on them will fail.
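The pitfall is the same one Python shows when summing strings; a measure stored as text cannot feed a numeric aggregate (the lists below are purely illustrative):

```python
# A numeric measure accidentally stored as strings cannot be summed.
values_as_strings = ["1", "2", "3"]
try:
    sum(values_as_strings)
except TypeError as exc:
    print("sum over strings fails:", exc)

# Declared numeric up front, aggregation works as expected.
values_as_ints = [1, 2, 3]
print(sum(values_as_ints))  # 6
```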

    alter table app_event_raw
        add column cleanTimes Int32 after city;
    
    drop view app_event_view;
    
    CREATE MATERIALIZED VIEW app_event_view TO app_event_raw AS
    SELECT JSONExtract(all, 'apptype_name', 'String')       AS apptype_name,
           JSONExtract(all, 'baseplate_version', 'String')  AS baseplate_version,
           JSONExtract(all, 'bd_code', 'String')            AS bd_code,
           JSONExtract(all, 'bd_name', 'String')            AS bd_name
    FROM app_event_queue;
    


    Original article: https://www.haomeiwen.com/subject/ziissdtx.html