美文网首页ClickHouse
ClickHouse复制表、分布式表机制与使用方法

ClickHouse复制表、分布式表机制与使用方法

作者: LittleMagic | 来源:发表于2020-04-29 22:05 被阅读0次

    Replication & Sharding

    在ClickHouse文集的第一篇文章中,笔者介绍了ClickHouse高可用集群的配置方法,并且提到:分布式存储要保证高可用,就必须有数据冗余——即副本(replica)。ClickHouse依靠ReplicatedMergeTree引擎族与ZooKeeper实现了复制表机制,成为其高可用的基础。

    另外,笔者也提到,ClickHouse像ElasticSearch一样具有数据分片(shard)的概念,这也是分布式存储的特点之一,即通过并行读写提高效率。ClickHouse依靠Distributed引擎实现了分布式表机制,在所有分片(本地表)上建立视图进行分布式查询,使用很方便。

    在实际操作中,为了最大化性能与稳定性,分片和副本几乎总是一同使用。

    本文就对ClickHouse的复制表、分布式表机制和用法作个介绍。

    Replicated Table & ReplicatedMergeTree Engines

    ClickHouse的副本机制之所以叫“复制表”,是因为它工作在表级别,而不是集群级别(如HDFS)。也就是说,用户在创建表时可以通过指定引擎选择该表是否高可用,每张表的分片与副本都是互相独立的。

    目前支持复制表的引擎是ReplicatedMergeTree引擎族,它与平时最常用的MergeTree引擎族是正交的,如下图所示。

    下面给出ReplicatedMergeTree引擎的完整建表DDL语句。

    CREATE TABLE IF NOT EXISTS test.events_local ON CLUSTER '{cluster}' (
      ts_date Date,
      ts_date_time DateTime,
      user_id Int64,
      event_type String,
      site_id Int64,
      groupon_id Int64,
      category_id Int64,
      merchandise_id Int64,
      search_text String
      -- A lot more columns...
    )
    ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test/events_local','{replica}')
    PARTITION BY ts_date
    ORDER BY (ts_date,toStartOfHour(ts_date_time),site_id,event_type)
    SETTINGS index_granularity = 8192;
    

    其中,ON CLUSTER语法表示分布式DDL,即执行一次就可在集群所有实例上创建同样的本地表。集群标识符{cluster}、分片标识符{shard}和副本标识符{replica}来自之前提到过的复制表宏配置,即config.xml中<macros>一节的内容,配合ON CLUSTER语法一同使用,可以避免建表时在每个实例上反复修改这些值。

    ReplicatedMergeTree引擎族接收两个参数:

    • ZK中该表相关数据的存储路径,ClickHouse官方建议规范化,如上面的格式/clickhouse/tables/{shard}/[database_name]/[table_name]
    • 副本名称,一般用{replica}即可。

    观察一下上述ZK路径下的znode结构与内容。

    [zk: localhost:2181(CONNECTED) 0] ls /clickhouse/tables/01/test/events_local
    [metadata, temp, mutations, log, leader_election, columns, blocks, nonincrement_block_numbers, replicas, quorum, block_numbers]
    
    [zk: localhost:2181(CONNECTED) 1] get /clickhouse/tables/04/test/events_local/columns
    columns format version: 1
    9 columns:
    `ts_date` Date
    `ts_date_time` DateTime
    `user_id` Int64
    `event_type` String
    `site_id` Int64
    `groupon_id` Int64
    `category_id` Int64
    `merchandise_id` Int64
    `search_text` String
    # ...................
    
    [zk: localhost:2181(CONNECTED) 2] get /clickhouse/tables/07/test/events_local/metadata
    metadata format version: 1
    date column: 
    sampling expression: 
    index granularity: 8192
    mode: 0
    sign column: 
    primary key: ts_date, toStartOfHour(ts_date_time), site_id, event_type
    data format version: 1
    partition key: ts_date
    granularity bytes: 10485760
    # ...................
    

    ReplicatedMergeTree引擎族在ZK中存储大量数据,包括且不限于表结构信息、元数据、操作日志、副本状态、数据块校验值、数据part merge过程中的选主信息等等。可见,ZK在复制表机制下扮演了元数据存储、日志框架、分布式协调服务三重角色,任务很重,所以需要额外保证ZK集群的可用性以及资源(尤其是硬盘资源)。

    下图大致示出复制表执行插入操作时的流程(internal_replication配置项为true)。即先写入一个副本,再通过config.xml中配置的interserver HTTP port端口(默认是9009)将数据复制到其他实例上去,同时更新ZK集群上记录的信息。

    Distributed Table & Distributed Engine

    ClickHouse分布式表的本质并不是一张表,而是一些本地物理表(分片)的分布式视图,本身并不存储数据。

    支持分布式表的引擎是Distributed,建表DDL语句示例如下,_all只是分布式表名比较通用的后缀而已。

    CREATE TABLE IF NOT EXISTS test.events_all ON CLUSTER sht_ck_cluster_1
    AS test.events_local
    ENGINE = Distributed(sht_ck_cluster_1,test,events_local,rand());
    

    Distributed引擎需要以下几个参数:

    • 集群标识符
      注意不是复制表宏中的标识符,而是<remote_servers>中指定的那个。
    • 本地表所在的数据库名称
    • 本地表名称
    • (可选的)分片键(sharding key)
      该键与config.xml中配置的分片权重(weight)一同决定写入分布式表时的路由,即数据最终落到哪个物理表上。它可以是表中一列的原始数据(如site_id),也可以是函数调用的结果,如上面的SQL语句采用了随机值rand()。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)

    在分布式表上执行查询的流程简图如下所示。发出查询后,各个实例之间会交换自己持有的分片的表数据,最终汇总到同一个实例上返回给用户。

    而在写入时,我们有两种选择:一是写分布式表,二是写underlying的本地表。孰优孰劣呢?

    直接写分布式表的优点自然是可以让ClickHouse控制数据到分片的路由,缺点就多一些:

    • 数据是先写到一个分布式表的实例中并缓存起来,再逐渐分发到各个分片上去,实际是双写了数据(写入放大),浪费资源;
    • 数据写入默认是异步的,短时间内可能造成不一致;
    • 目标表中会产生较多的小parts,使merge(即compaction)过程压力增大。

    相对而言,直接写本地表是同步操作,更快,parts的大小也比较合适,但是就要求应用层额外实现sharding和路由逻辑,如轮询或者随机等。

    应用层路由并不是什么难事,所以如果条件允许,在生产环境中总是推荐写本地表、读分布式表。举个例子,在笔者最近引入的Flink-ClickHouse Sink连接器中,就采用了随机路由,部分代码如下。

        private Request buildRequest(ClickhouseRequestBlank requestBlank) {
            String resultCSV = String.join(" , ", requestBlank.getValues());
            String query = String.format("INSERT INTO %s VALUES %s", requestBlank.getTargetTable(), resultCSV);
            String host = sinkSettings.getClickhouseClusterSettings().getRandomHostUrl();
    
            BoundRequestBuilder builder = asyncHttpClient
                    .preparePost(host)
                    .setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=utf-8")
                    .setBody(query);
    
            if (sinkSettings.getClickhouseClusterSettings().isAuthorizationRequired()) {
                builder.setHeader(HttpHeaders.Names.AUTHORIZATION, "Basic " + sinkSettings.getClickhouseClusterSettings().getCredentials());
            }
    
            return builder.build();
        }
    
        public String getRandomHostUrl() {
            currentHostId = ThreadLocalRandom.current().nextInt(hostsWithPorts.size());
            return hostsWithPorts.get(currentHostId);
        }
    

    The End

    ClickHouse确实是一个设计很精妙的OLAP数据库,还有很多细节等着我们在实际应用中去发掘。

    民那晚安晚安。

    相关文章

      网友评论

        本文标题:ClickHouse复制表、分布式表机制与使用方法

        本文链接:https://www.haomeiwen.com/subject/caigwhtx.html