美文网首页
clickhouse-copier

clickhouse-copier

作者: 越狱的灵感 | 来源:发表于2022-06-14 09:01 被阅读0次

    前言

    clickhouse-copier 是clickhouse系统本身带的一个集群同步工具,不需要额外的第三方中间件,使用也非常方便,由于可以同步到本地集群,也可以实现reshard(很多公司的clickhouse reshard 数据重分区 就是基于clickhouse-copier实现的)的功能。

    参考文档:
    https://clickhouse.tech/docs/zh/operations/utilities/clickhouse-copier/
    http://www.clickhouse.com.cn/topic/601fb322b06e5e0f21ba79e1
    https://altinity.com/blog/2018/8/22/clickhouse-copier-in-practice

    技术原理

    1.png

    操作

    1,创建zk文件

    <yandex>
        <logger>
            <level>trace</level>
            <size>100M</size>
            <count>3</count>
        </logger>
        <zookeeper>
           <node index="1">
                <host>zk node1</host>
                <port>12181</port>
            </node>
            <node index="2">
                <host>zk node2</host>
                <port>12181</port>
            </node>
            <node index="3">
                <host>zk node3</host>
                <port>12181</port>
            </node>
        </zookeeper>
    </yandex>
    

    zk集群可以单独建一个或者使用ck集群的zk也可以,配置文件可以参考ck节点上的 /etc/metrika.xml

    2,创建任务文件

    <yandex>
        <!-- Configuration of clusters as in an ordinary server config -->
        <remote_servers>
            <source_cluster>
                <shard>
                    <internal_replication>true</internal_replication>
                    <replica>
                        <host>ck source node1</host>
                        <port>29000</port>
                        <user>xxxx</user>
                        <password>xxxx</password>
                    </replica>
                    <replica>
                        <host>ck source node2</host>
                        <port>29000</port>
                        <user>xxxx</user>
                        <password>xxxx</password>
                    </replica>
                </shard>
                <shard>
                    <internal_replication>true</internal_replication>
                    <replica>
                        <host>ck source node3</host>
                        <port>29000</port>
                        <user>default</user>
                        <password>xxxx</password>
                    </replica>
                    <replica>
                        <host>ck source node4</host>
                        <port>29000</port>
                        <user>default</user>
                        <password>xxxx</password>
                    </replica>
                </shard>
            </source_cluster>
            <destination_cluster>
                <shard>
                    <internal_replication>true</internal_replication>
                    <replica>
                        <host>ck target node1</host>
                        <port>29000</port>
                        <user>xxxx</user>
                        <password>xxxx</password>
                    </replica>
                </shard>
            </destination_cluster>
        </remote_servers>
         
        <!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
        <!-- 工作进程 -->
        <max_workers>4</max_workers>
         
        <!-- Setting used to fetch (pull) data from source cluster tables -->
        <!-- 源数据库只读 -->
        <settings_pull>
            <readonly>1</readonly>
        </settings_pull>
         
        <!-- Setting used to insert (push) data to destination cluster tables -->
        <!-- 目标数据库可读写 -->
        <settings_push>
            <readonly>0</readonly>
        </settings_push>
         
        <!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
             They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
        <!-- 这里保持默认配置 -->
        <settings>
            <connect_timeout>3</connect_timeout>
            <!-- Sync insert is set forcibly, leave it here just in case. -->
            <insert_distributed_sync>1</insert_distributed_sync>
        </settings>
         
        <!-- Copying tasks description.
             You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
             sequentially.
        -->
        <tables>
            <!-- A table task, copies one table. -->
            <!-- 配置单表同步逻辑 -->
            <table_hits>
             
                <!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
                <!-- 源数据集群 -->
                <cluster_pull>source_cluster</cluster_pull>
                <!-- 源数据数据库 -->
                <database_pull>demo_source_db</database_pull>
                <!-- 源数据表 -->
                <table_pull>demo_source_table</table_pull>
                 
                <!-- Destination cluster name and tables in which the data should be inserted -->
                <!-- 目标数据集群 -->
                <cluster_push>destination_cluster</cluster_push>
                <!-- 目标数据数据库 -->
                <database_push>demo_target_db</database_push>
                <!-- 目标数据数据表,表如果不存在,工具会自动建,但是数据库不会 -->
                <table_push>demo_target_table</table_push>
                 
                <!-- 设置piece数量,(官方没有给出它的用途,它和cluster_pull标签同级): 用来设置piece数量,默认是10,保持默认就行,在**的时候将每个partition切分成10片,每片的数据是处理的最小任务粒度,然后将每片写入到对应的辅助临时表des_table_piece_*表中,之所以要对partition进行切片到piece的粒度,第一个优点是:可以增加并行度,多个copier可以同时处理多个pieces;第二个优点则是单个pieceTask失败后的重试粒度变小了-->
                <!--<number_of_splits></number_of_splits>-->
                 
                <!-- Engine of destination tables.
                     If destination tables have not be created, workers create them using columns definition from source tables and engine
                     definition from here.
                     NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
                     be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
                     specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of
                     system.parts table.
                -->
                <!-- 目标数据数据表引擎,默认会使用源数据表的,建议主动设置 -->
                <engine>ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster_demo0100allp_v1}/{shard_demo0100allp_v1}/demo_target_table' ,'{replica_demo0100allp_v1}') partition by toYYYYMMDD(beginTime) order by (userId,nodeCode,chargingItem,resourceType,model,resourceId)</engine>
                 
                <!-- Sharding key used to insert data to destination cluster -->
                <!-- 目标数据数据表shard逻辑,按业务来设置,我这边是随机写shard -->
                <sharding_key>rand()</sharding_key>
                 
                <!-- Optional expression that filter data while pull them from source servers -->
                <!-- 同步过滤条件,这里如果是多节点同步,建议按节点对数据分割,提高同步性能 -->
                <!--<where_condition>CounterID != 0</where_condition> -->
                 
                <!-- Optional expression that filter data while pull them from source servers -->
                <!-- This section specifies partitions that should be copied, other partition will be ignored.
                     Partition names should have the same format as
                     partition column of system.parts table (i.e. a quoted text).
                     Since partition key of source and destination cluster could be different,
                     these partition names specify destination partitions.
                     NOTE: In spite of this section is optional (if it is not specified, all partitions will be copied),
                     it is strictly recommended to specify them explicitly.
                     If you already have some ready paritions on destination cluster they
                     will be removed at the start of the copying since they will be interpeted
                     as unfinished data from the previous copying!!!
                -->
               <!--测试发现,工具在获得表的分区信息的时候,是全表扫描的,where_condition不效果,所以大表还是需要配置enabled_partitions按分区同步-->
                <!--<enabled_partitions>
                    <partition>'2018-02-26'</partition>
                    <partition>'2018-03-05'</partition>
                    ...
                </enabled_partitions>-->
            </table_hits>
            
           <!-- Next table to copy. It is not copied until previous table is copying. -->
            <!--</table_visits>
            ...
            </table_visits>-->
            
        </tables>
    </yandex>
    

    3,执行同步

    ##执行同步
    clickhouse-copier --daemon --config /data/clickhouse-copier/zookeeper.xml --task-path /clickhouse/copytasks/task/path  --task-file /data/clickhouse-copier/task/cdnlog_demo_task_1.xml --task-upload-force=true --base-dir /data/clickhouse-copier/data
     
    ##配置详解
    --daemon 后台运行
    --config zk配置文件
    --task-path zk节点上面存储配置文件的路径
    --task-file 本地同步任务配置文件
    --task-upload-force=true 强制刷新任务配置文件到zk的${task-path}/description路径下
    --base-dir  每次备份的日志和元数据信息
     
    ##zk可能需要到的命令
    #手动上传配置文件到zk
    /data/zookeeper-server/bin/zkCli.sh -server zknode:12181 create /clickhouse/copytasks/task/path/description  "`cat cdnlog_demo_task_1.xml`"
     
    #刷新配置信息
    /data/zookeeper-server/bin/zkCli.sh -server zknode:12181 set /clickhouse/copytasks/task/path/description  "`cat cdnlog_demo_task_1.xml`"
     
    #查看配置信息
    /data/zookeeper-server/bin/zkCli.sh -server zknode:12181 get /clickhouse/copytasks/task/path/description
    
    2.png 3.png

    Tips

    0,强烈建议如果是做reshard功能,停止写入流程,否则会出现数据一致性的问题。
    1,本地表数据结构需要完全一样,默认值不同都不行,分布式表需要自己建立。
    2,在同步的过程中,在源数据库,会生成 XXX.piece_0 到 XXX.piece_n 的配置文件(n由配置文件中的number_of_splits指定),千万不要删除XXX.piece_n配置文件,否则会丢失数据,且千万不能再同步过程中中断任务,否则在XXX.piece_n还没同步到目标表后,XXX.piece_n就被删除了,会出现丢失数据的情况。
    3,如果是作为reshard作用,同步到本地集群,千万不要把目标表和原表名称写成一样,否则会覆盖老的数据,切记切记!!!。
    4,同步过了的分区就不能再同步了。
    5,建议不同步物化视图,因为物化视图的逻辑是需要自己在目标库新建的,如果同步物化视图,目标表是个普通表。
    6,官网推荐在源数据表开启clickhouse-copier进程,以便减少网络流量,且建议在每个主分区上部署一个,使用where_condition做分片。
    7,数据同步过程

    A:DETACH源表
    B:文件远程同步
    C:ATTACH 到XXX.piece_n
    D:将XXX.piece_n 数据 insert into 目标表
    E:drop XXX.piece_n
    

    相关文章

      网友评论

          本文标题:clickhouse-copier

          本文链接:https://www.haomeiwen.com/subject/nrujmrtx.html