0x01 Overview
A single-machine proof of concept: consume JSON-formatted data from Kafka and persist it into ClickHouse.
It covers the key capability of extracting fields from multi-level (nested) JSON.
Goal: store the following JSON:
{
"id": "123",
"timestamp": 1234567,
"payload": {
"message": "test",
"measure_string": "haha"
}
}
0x02 Creating the Kafka Topic
- Create the topic
kafka-topics --bootstrap-server localhost:9092 --topic clickhouseTestJson --create --partitions 6 --replication-factor 1
- Verify the topic
kafka-topics --bootstrap-server localhost:9092 --describe --topic clickhouseTestJson
Topic: clickhouseTestJson TopicId: PBbkM2ZhQMarwFf4x5znNQ PartitionCount: 6 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: clickhouseTestJson Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: clickhouseTestJson Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: clickhouseTestJson Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: clickhouseTestJson Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: clickhouseTestJson Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: clickhouseTestJson Partition: 5 Leader: 0 Replicas: 0 Isr: 0
0x03 Creating the ClickHouse Tables
Key points:
Three objects are used on the ClickHouse side:
- The final persistent storage table: event
- The Kafka queue table: json_queue2. Reads are destructive; once a row has been SELECTed, it is gone.
- The materialized view: json_mv2, which continuously moves rows from the queue into the storage table.
-- Create the storage table
CREATE TABLE IF NOT EXISTS event
(
timestamp UInt64 Codec(DoubleDelta, LZ4),
id Int64 Codec(Gorilla, LZ4),
message LowCardinality(String),
measure_string String Codec(ZSTD),
date Date DEFAULT toDate(timestamp, 'UTC') Codec(ZSTD),
timestamp_1min UInt64 DEFAULT (floor(timestamp/60) * 60) Codec(DoubleDelta, LZ4)
) Engine = MergeTree
PARTITION BY toStartOfMonth(date)
ORDER BY (
id,
timestamp_1min
);
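Both DEFAULT columns are derived from timestamp. For the sample timestamp 1234567 used later, the arithmetic can be checked in Python (a sketch of the same computation, not ClickHouse itself):

```python
from datetime import datetime, timezone

ts = 1234567  # sample event timestamp (seconds since epoch)

# date DEFAULT toDate(timestamp, 'UTC'): the calendar day in UTC
date = datetime.fromtimestamp(ts, tz=timezone.utc).date()

# timestamp_1min DEFAULT floor(timestamp/60) * 60: truncate to the minute
timestamp_1min = ts // 60 * 60

print(date)            # 1970-01-15
print(timestamp_1min)  # 1234560
```

Both values match the query result shown at the end of this walkthrough.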
-- Create the queue table
CREATE TABLE IF NOT EXISTS json_queue2 (
all String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'clickhouseTestJson',
kafka_group_name = 'clickhouseTestJsonGroup2',
kafka_format = 'JSONAsString',
kafka_skip_broken_messages = 10000,
kafka_max_block_size = 1048576;
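With kafka_format = 'JSONAsString', the engine does not parse the message at all: each Kafka record becomes one row whose single String column (all) holds the raw JSON text, and parsing is deferred to the materialized view. Roughly, in Python terms (a sketch of the behavior, not ClickHouse internals):

```python
import json

raw = '{"id":"123","timestamp":1234567,"payload":{"message":"test","measure_string":"haha"}}'

# What the Kafka engine table exposes: the whole message as one string column
queue_row = {"all": raw}

# The string is still valid JSON; extraction happens downstream
doc = json.loads(queue_row["all"])
print(doc["payload"]["message"])  # test
```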
-- Create the materialized view
CREATE MATERIALIZED VIEW json_mv2 TO event AS
SELECT
JSONExtract(all, 'id', 'Int64') AS id,
JSONExtract(all, 'timestamp', 'Int64') AS timestamp,
JSONExtractString(all, 'payload', 'message') AS message,
JSONExtractString(all, 'payload', 'measure_string') AS measure_string
FROM json_queue2;
In the materialized view json_mv2 above, the JSONExtractString function flattens the multi-level JSON so that nested payload fields are stored as top-level columns.
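The flattening the view performs can be mirrored in plain Python to make the column mapping explicit (a sketch of the equivalent logic, not what ClickHouse executes):

```python
import json

raw = '{"id":"123","timestamp":1234567,"payload":{"message":"test","measure_string":"haha"}}'
doc = json.loads(raw)

# Mirror of the SELECT in json_mv2: nested payload fields become top-level columns
row = {
    "id": doc["id"],
    "timestamp": doc["timestamp"],
    "message": doc["payload"]["message"],                # JSONExtractString(all, 'payload', 'message')
    "measure_string": doc["payload"]["measure_string"],  # JSONExtractString(all, 'payload', 'measure_string')
}
print(row)
```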
0x04 Submitting Test Data
- Produce a message
kafka-console-producer --bootstrap-server localhost:9092 --topic clickhouseTestJson
{"id":"123","timestamp":1234567,"payload":{"message":"test","measure_string":"haha"}}
0x05 Inspecting the Data in ClickHouse
clickhouse-client --stream_like_engine_allow_direct_select 1
use yourdatabase;
select * from event;
# Result:
SELECT *
FROM event
Query id: f91cb37d-a54a-49a4-800d-1c27e48a9018
┌─timestamp─┬─id─┬─message─┬─measure_string─┬───────date─┬─timestamp_1min─┐
│ 1234567 │ 0 │ test │ haha │ 1970-01-15 │ 1234560 │
└───────────┴────┴─────────┴────────────────┴────────────┴────────────────┘
This completes the ingestion path; aggregation work can now be built on top of the event table. Note that id comes back as 0: the sample message carries id as the JSON string "123", whereas JSONExtract(all, 'id', 'Int64') expects a JSON number. Extracting it with JSONExtractString and casting (e.g. toInt64) would preserve the value.
0x06 Adding Fields
- A materialized view does not support adding columns; it has to be dropped and recreated.
- Table columns can only be added, not dropped.
- Give numeric fields a numeric type up front; otherwise aggregate functions such as sum() will fail at query time.
alter table app_event_raw
add column cleanTimes Int32 after city;
drop view app_event_view;
CREATE MATERIALIZED VIEW app_event_view TO app_event_raw AS
SELECT JSONExtract(all, 'apptype_name', 'String') AS apptype_name,
JSONExtract(all, 'baseplate_version', 'String') AS baseplate_version,
JSONExtract(all, 'bd_code', 'String') AS bd_code,
JSONExtract(all, 'bd_name', 'String') AS bd_name
FROM app_event_queue;