美文网首页
Clickhouse消费存储Kafka中的Json数据

Clickhouse消费存储Kafka中的Json数据

作者: 国服最坑开发 | 来源:发表于2023-05-11 17:59 被阅读0次
clickhouse

0x01 概述

在单机完成验证:从Kafak中消费Json格式的数据,转存至 Clickhouse中。
包含了关键的多层级json字段解析提取能力
目标:实现下面这个json的存储:

{
    "id": "123",
    "timestamp": 1234567,
    "payload": {
        "message": "test",
        "measure_string": "haha"
    }
}

0x02 Kafak创建Topic

  • 创建topic
kafka-topics --bootstrap-server localhost:9092 --topic clickhouseTestJson --create --partitions 6 --replication-factor 1
  • 确认topic
 kafka-topics --bootstrap-server localhost:9092 --describe clickhouseTestJson.
 
 Topic: clickhouseTestJson  TopicId: PBbkM2ZhQMarwFf4x5znNQ PartitionCount: 6   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: clickhouseTestJson   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 4    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 5    Leader: 0   Replicas: 0 Isr: 0

0x03 Clickhouse 表创建:

重点说明:
在clickhouse 中,用到三个对象:

  • 最终持久化存储表: event
  • 存放kafka消息的队列: json_queue2, 一次select后数据就没有了。
  • 数据视图转换器: json_mv2,用于把数据持久化?
-- 创建存储化
CREATE TABLE IF NOT EXISTS event
(
    timestamp UInt64 Codec(DoubleDelta, LZ4),
    id Int64 Codec(Gorilla, LZ4),
    message LowCardinality(String),
    measure_string String Codec(ZSTD),
    date Date DEFAULT toDate(timestamp, 'UTC') Codec(ZSTD),
    timestamp_1min UInt64 DEFAULT (floor(timestamp/60) * 60) Codec(DoubleDelta, LZ4)
) Engine = MergeTree
PARTITION BY toStartOfMonth(date)
ORDER BY (
    id,
    timestamp_1min
);

-- 创建队列
CREATE TABLE IF NOT EXISTS json_queue2 (
  all String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
       kafka_topic_list = 'clickhouseTestJson',
       kafka_group_name = 'clickhouseTestJsonGroup2',
       kafka_format = 'JSONAsString',
       kafka_skip_broken_messages = 10000,
       kafka_max_block_size = 1048576;

-- 创建数据转换
CREATE MATERIALIZED VIEW json_mv2 TO event AS
SELECT
    JSONExtract(all, 'id', 'Int64') AS id,
    JSONExtract(all, 'timestamp', 'Int64') AS timestamp,
    JSONExtractString(all, 'payload', 'message') AS message,
    JSONExtractString(all, 'payload', 'measure_string') AS measure_string
FROM json_queue2;

在上述MATERIALIZED VIEW json_mv2对象中,我们通过JSONExtractString函数实现了将多层级的json进行扁平化存储的能力。

0x03 验证提交数据

  • 提交数据
kafka-console-producer --broker-list localhost:9092 --topic clickhouseTestJson
{"id":"123","timestamp":1234567,"payload":{"message":"test","measure_string":"haha"}}

0x04 Clickhouse中查看数据

clickhouse-client --stream_like_engine_allow_direct_select 1
use yourdatabase;
select * from event;

# 下面是结果:
SELECT *
FROM event

Query id: f91cb37d-a54a-49a4-800d-1c27e48a9018

┌─timestamp─┬─id─┬─message─┬─measure_string─┬───────date─┬─timestamp_1min─┐
│   1234567 │  0 │ test    │ haha           │ 1970-01-15 │        1234560 │
└───────────┴────┴─────────┴────────────────┴────────────┴────────────────┘

至此完成数据存储,后续可面向 event表,进行数据合并开发。

0x05 增加字段

Materialized View 不支持增加字段,只能删掉重建
Table 列只能加,不能删除

建字段的时候,对数值型要提前定义好,要不然查询时,使用sum类函数会报错。

alter table app_event_raw
    add column cleanTimes Int32 after city;

drop view app_event_view;

CREATE MATERIALIZED VIEW app_event_view TO app_event_raw AS
SELECT JSONExtract(all, 'apptype_name', 'String')       AS apptype_name,
       JSONExtract(all, 'baseplate_version', 'String')  AS baseplate_version,
       JSONExtract(all, 'bd_code', 'String')            AS bd_code,
       JSONExtract(all, 'bd_name', 'String')            AS bd_name,   
FROM app_event_queue;

相关文章

  • clickhouse之mergeTree

    mergeTree 数据存储方式 数据库表在clickhouse中是分块存储(如果 partitioning ke...

  • ClickHouse kafka引擎落盘分布式表

    1.以json形式传递消息 不包含嵌套json格式,这位作者有解决clickhouse与kafka集成[https...

  • 大数据界的黑马——ClickHouse 架构概述

    ClickHouse 是一个真正的列式数据库管理系统(DBMS)。在 ClickHouse 中,数据始终是按列存储...

  • ClickHouse冷热数据备份思考

    基于ClickHouse来实现实时数仓,一般来说我们可以将热数据存储在ClickHouse中,比如:存储最近30天...

  • Kafka_核心

    kafka集群 Kafka的设计都是为了实现kafak消息队列消费数据的语义Kafka消息队列中数据消费的三种语义...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • Kafka0.8集群部署与shell命令行操作

    1、kafka简介在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。KAFKA...

  • kafka集群搭建

    1、kafka简介在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。KAFKA...

  • kafka消费者丢消息的原因分析

    场景: 通过kafka消费作业,存储到数据库中,当遇到错误信息时停止提交offset,关闭消费者,发送报警短信 项...

  • kafka知识要点

    kafka的特点: 1.消息持久化:通过0(1)的磁盘数据结构提供数据的持久化,kafka中可以存储数据,存储量决...

网友评论

      本文标题:Clickhouse消费存储Kafka中的Json数据

      本文链接:https://www.haomeiwen.com/subject/ziissdtx.html