美文网首页clickhouse
Clickhouse-copier简介

Clickhouse-copier简介

作者: 金科_ | 来源:发表于2019-01-22 00:10 被阅读0次

    Clickhouse-copier 是一个将数据从某个clickhouse环境迁至另一个clickhouse环境的工具。

    这个工具是标准发布的clickhouse server的一部分,它可以在完全并行的模式下工作, 并以最有效的方式分发数据。

    本文将介绍几个可以Clickhouse-copier 的使用实例。以下简称copier

    要使用copier,我们首先要准备一个XML-schema配置, 在这个配置里, 我们需要指定源群集配置和目标群集配置, 以及要复制的表信息。

    关于copier 配置格式可参考: clickhouse-copier config 。

    完成XML-schema配置后,需要将此配置上载至 Zookeeper 节点的特定路径下 (/<task-path>/description)。(下文会提及)

    clickhouse 复印机配置格式在文档中进行了描述: https://clickhouse.yandex/docs/en/operations/utils/clickhouse-copier/配置 xml 文件, 然后需要上载到位于特定路径下的 Zookeeper 节点 (/描述)。

    配置完zk任务后, 就可以指定参数执行 copier工具, 命令行如下所示:

    clickhouse-copier --config config-copier.xml --task-path /task/path --base-dir /path/to/dir --logs-dir /path/to/logs

    参数说明:

    1.config —  config-copier.xml 文件路径,此xml文件中指定zk节点信息及copier的log配置信息

    2.task-path — 到 zookeeper 节点的路径。此节点用于同步copier进程和存储任务。任务存储于 $task-path/description.

    3.base-dir — 辅助文件的路径。设置这个参数后,clickhouse-copier会在$ base-dir中创建clickhouse-copier_YYYYMMHHSS_ <PID>子目录。如果省略此参数,则会在启动clickhouse-copier的目录中创建目录。

    4.logs-dir  - 日志文件路径。当工具启动时,它会创建日志文件并输出所有内容。注意:clickhouse-copier在此过程中不会向stdout写入任何内容。


    用例 1: 将数据从一个群集传输到另一个群集

    将数据从一个ch集群导入到另一个ch集群,有很多种办法

    1.从源集群导出数据到文件中,将文件dump到另一个集群里

    2. insert into … select from remote(...)

    3. 分离或冻结源群集上的分区, 手动将其移动到目标分区, 然后将其附加到目标分区

    4.clickhouse-copier

    前两个选项适用于小型表和单节点系统。但是,如果表很大,则会出现问题。因此,移动数据文件可能是一种更好的方法,但它需要大量的手动工作。特别是如果源和目标集群的配置不同的情况下,移动数据文件将愈加复杂。这就是clickhouse-copier的设计初衷。在程序内部,它可以使用 insert into … select from remote(...)方法 ,同时也兼顾了源和目标集群拓扑因素。

    在此示例中,我们将展示如何将具有两个分片节点的群集中的数据传输到具有两个复制节点的群集。

    解决方案:

    假设我们想拷贝源集群下的 source_database.test_table. (库:source_database, 表:test_table)

    源集群配置定义如下:

    <remote_servers>

    <source_cluster>

      <shard>

        <weight>1</weight>

        <replica>

          <host>192.168.11.1</host>

          <port>9000</port>

        </replica>

      </shard>

      <shard>

        <weight>1</weight>

        <replica>

          <host>192.168.11.2</host>

          <port>9000</port>

        </replica>

      </shard>

    </source_cluster>

    目标集群配置定义如下:

    <yandex>

    <remote_servers>

    <target_cluster>

      <shard>

        <weight>1</weight>

        <replica>

          <host>192.168.11.3</host>

          <port>9000</port>

        </replica>

        <replica>

          <host>192.168.11.4</host>

          <port>9000</port>

        </replica>

      </shard>

    </target_cluster>

    </remote_servers>

    </yandex>

    完整的 clickhouse-copier 任务配置(schema.xml) :

    <yandex>

    <remote_servers>

        <source_cluster>

            <shard>

                <weight>1</weight>

                <replica>

                    <host>192.168.11.1</host>

                    <port>9000</port>

                </replica>

            </shard>

            <shard>

                <weight>1</weight>

                <replica>

                    <host>192.168.11.2</host>

                    <port>9000</port>

                </replica>

            </shard>

        </source_cluster>

        <target_cluster>

            <shard>

                <weight>1</weight>

                <replica>

                    <host>192.168.11.3</host>

                    <port>9000</port>

                </replica>

                <replica>

                    <host>192.168.11.4</host>

                    <port>9000</port>

                </replica>

            </shard>

        </target_cluster>

    </remote_servers>

    <max_workers>1</max_workers>

    <tables>

        <table_events>

            <cluster_pull>source_cluster</cluster_pull>

            <database_pull>source_database</database_pull>

            <table_pull>test_table</table_pull>

            <cluster_push>target_cluster</cluster_push>

            <database_push>target_database</database_push>

            <table_push>test_table</table_push>

            <engine>Engine=MergeTree(...)</engine>

            <sharding_key>rand()</sharding_key>

        </table_events>

    </tables>

    </yandex>

    使用以下命令将其上传到Zookeeper:

    ./zkCli create /clickhouse/copytasks “”

    ./zkCli create /clickhouse/copytasks/task2 “”

    ./zkCli create /clickhouse/copytasks/task2/description “`cat schema.xml`”


    另外,我们需要指定copier的配置文件,config-copier.xml

    示例如下:

     config-copier.xml

    运行clickhouse-copier工具:

    clickhouse-copier --config = config-copier.xml --task-path = /clickhouse/copytasks/task2

    工具启动后,将需要一段时间才能完成任务,具体取决于要复制的表的大小。此外,在完成此过程之前,它不会显示任何内容。之后,您应该检查日志文件以确认没有错误。

    之后我们可以验证数据是否一致,如果OK,根据情况下掉旧库。


    用例 2: 将单个节点服务器扩展到多个节点群集

    在本例中, 我们从单个 clickhouse 服务器开始。目标是将此服务器扩展到两个节点群集中, 数据在节点之间平均分布。对于新群集, 我们使用现有服务器和新服务器。

    解决方案:

    Clickhouse-copier不适用于此用例,因为它无法将表复制到同一服务器上的同一数据库中。由于我们要重用一台服务器,我们需要创建一个新的数据库。所以计划如下:

    1.为分布式表创建新数据库

    2.使用clickhouse-copier将数据复制到新数据库和新表中

    3.在两台服务器上重新创建旧表

    4.从新表中分离分区并将它们附加到旧表

    步骤3和4通常是可选的, 但如果要保留原始表和数据库名称, 则需要执行步骤。

    让我们详细地回顾一下这个过程。

    假设我们有source_database.test_table需要重新分配数据。在现有服务器和新服务器上创建具有相同表结构的单独数据库sharded_database和表test_table。考虑到所有这些因素,我们需要将两台服务器配置为在集群模式下工作,因此我们需要首先更新其配置。

    在我们的服务器上添加<clickhouse_remove_servers>部分到clickhouse配置文件中

    <yandex>

    <remote_servers>

    <test_cluster>

      <shard>

        <weight>1</weight>

        <replica>

          <host>192.168.11.1</host>

          <port>9000</port>

        </replica>

      </shard>

      <shard>

        <weight>1</weight>

        <replica>

          <host>192.168.11.2</host>

          <port>9000</port>

        </replica>

      </shard>

    </test_cluster>

    </remote_servers>

    <macros>

      <shard>shard1</shard>

      <replica>replica1</replica>

    </macros>

    </yandex>

    注意:在每台服务器上分别使用<shard>标记内的分片编号。

    之后不需要重新启动ClickHouse,因此我们可以继续在位于两个节点上的test_table分片上创建分布式表。这是这种表的一个例子

    CREATE TABLE `sharded_database`.`distrib_table` (...)

    ENGINE = Distributed(`test_cluster`, `sharded_database`, `test_table`, RAND())

    该表将帮助我们从新的分布式模式中进行选择查询。

    所以从现在开始,我们有一个基于两个节点的空分布式表,我们需要用clickhouse-copier填充数据。要开始这个过程,我们需要书写配置项:

    1.源集群配置(这将是我们的单个服务器)

    2.目标集群(这将是我们的新集群)

    3.从中提取数据的表

    4.用于将数据提取到的表

    配置计划如下:

    <yandex>

    <remote_servers>

        <source_cluster>

            <shard>

                <weight>1</weight>

                <replica>

                    <host>192.168.11.1</host>

                    <port>9000</port>

                </replica>

            </shard>

        </source_cluster>

        <target_cluster>

            <shard>

                <weight>1</weight>

                <replica>

                    <host>192.168.11.1</host>

                    <port>9000</port>

                </replica>

            </shard>

            <shard>

                <weight>1</weight>

                <replica>

                    <host>192.168.11.2</host>

                    <port>9000</port>

                </replica>

            </shard>

        </target_cluster>

    </remote_servers>

    <max_workers>1</max_workers>

    <tables>

        <table_events>

            <cluster_pull>source_cluster</cluster_pull>

            <database_pull>source_database</database_pull>

            <table_pull>test_table</table_pull>

            <cluster_push>target_cluster</cluster_push>

            <database_push>sharded_database</database_push>

            <table_push>test_table</table_push>

            <engine>Engine=MergeTree(...)</engine>

            <sharding_key>rand()</sharding_key>

        </table_events>

    </tables>

    </yandex>

    将其上传到Zookeeper中,如下所示:

    ./zkCli.sh create /clickhouse/copytasks ""

    ./zkCli.sh create /clickhouse/copytasks/task1 ""

    ./zkCli.sh create /clickhouse/copytasks/task1/description "`cat schema.xml`"

    最后一步之前是为Zookeeper连接创建XML配置,如下所示:

    zookeeper.xml 

    运行copier:

    clickhouse-copier --config-file=zookeeper.xml --task-path=/clickhouse/copytasks/task1

    最后,我们可以验证新旧表中的数据是否一致:

    SELECT COUNT(*) FROM source_database.test_table;

    SELECT COUNT(*) FROM sharded_database.distrib_table;

    如果需要我们的表驻留在初始的'source_database'中,可以使用DETACH / ATTACH PARTITION技术。首先从目标表中分离所有分区,命令如下所示:

    ALTER TABLE sharded_dataabase.test_table DETACH PARTITION '<name>'

    然后将文件从/var/lib/clickhouse/data/sharded_database移动到相关表在每个服务器上的/ /var/lib/clickhouse/data/source_database目录下,然后附加分区,命令如下所示:

    ALTER TABLE source_database.test_table ATTACH PARTITION '<name>'

    有关ATTACH和DETACH分区的更多信息,请参见此处:

    https://clickhouse.yandex/docs/en/query_language/alter/#manipulations-with-partitions-and-parts

    一旦我们reattach所有分区,就不再需要sharded_database,可以将其删除。

    性能说明:

    在准备上述用例时,我们对clickhouse-copier性能感到好奇。现在让我们做一些基准测试。我们还可以将这些结果与select … insert进行比较。

    对于基准测试,我们使用了以下测试:

    1.将数据从具有单个节点的一个群集复制到具有一个节点的不同群集

    2.将数据从具有单个节点的一个群集复制到具有两个节点的群集

    3.将具有两个节点的一个群集复制到另一个具有两个节点的集群

    clickhouse-copier基于正确配置的XML模式完成工作的前提下,我们简单的调用了clickhouse-client完成了测试,如下所示:

    clickhouse-client --host <source> -q “SELECT FROM <source_distributed_table> FORMAT CSV” | clickhouse-client --host <target> -q “INSERT INTO <target_distributed_table> FORMAT CSV”

    在我们得到结果之前,让我们定义我们的数据库。对于所有这些用例,我们将使用基于美国航空公司数据库的公共测试数据集之一,其中包含大约1.5亿条记录。

    以下是结果表:

    result table

    作为结论,我们可以看到clickhouse-copier的性能类似于SELECT ... INSERT语句,但是当它到达分布式数据库时,它变得更有效,因为它使用了大部分并行资源。由于clickhouse-copier最初设计用于复制500个节点的Yandex集群,因此它在大型系统中非常流行。

    结论:

    Clickhouse-copier是一种在任何拓扑的ClickHouse集群之间迁移数据的强大工具。它还可以用于重新分片或更改表的主键。在一个简单的环境中,它的性能与'insert ... select'相同,但当它应用于大型ClickHouse集群时,它的性能极大提升,并且规避了并行复制任务带来的诸多问题。

    相关文章

      网友评论

        本文标题:Clickhouse-copier简介

        本文链接:https://www.haomeiwen.com/subject/hefkjqtx.html