美文网首页Apache Kafka
kafka集群扩容后的数据迁移

kafka集群扩容后的数据迁移

作者: yesAnd_ | 来源:发表于2018-10-16 17:39 被阅读0次

最近我们生产环境的kafka集群有增加节点的需求,然而kafka在新增节点后并不会像elasticsearch那样感知到新节点加入后自动将数据reblance到新集群中,因此这个过程需要我们手动分配。一番折腾之后,实现了增加kafka集群节点并将原有数据均匀分配到扩容后的集群。下面结合一个例子谈一下整个过程。

一、环境说明

1.集群状况

假定当前的cluster中只有(101,102,103)三个kafka节点,有一个名为think_tank的topic,该topic有2个replica,均匀分布在三个节点上.

2.目的

我们要做的是在cluster中新增两个节点(记为104,105)后,将的数据均匀分到新集群中的5个节点上。

二、操作步骤

新增kafka节点的部署不是本文重点,就不在此赘述。

其实官方文档的这一小节关于集群扩容讲解很详细:Expanding your cluster ,整个过程需要分为三个步骤:获取kafka给出的建议分配方案、按照给出的分配方案执行分配、查看分配的进度以及状态。这三个步骤对应了kafka脚本提供的三个partition reassigment工具。

--generate: In this mode, given a list of topics and a list of brokers, the tool generates a candidate reassignment to move all partitions of the specified topics to the new brokers. This option merely provides a convenient way to generate a partition reassignment plan given a list of topics and target brokers.

--execute: In this mode, the tool kicks off the reassignment of partitions based on the user provided reassignment plan. (using the --reassignment-json-file option). This can either be a custom reassignment plan hand crafted by the admin or provided by using the --generate option

--verify: In this mode, the tool verifies the status of the reassignment for all partitions listed during the last --execute. The status can be either of successfully completed, failed or in progress

结合例子具体说明:

1、生成重新分配topic的方案

脚本的参数是以json文件的形式传入的,首先要新建一个json文件并设置需要分配哪些topic,think_tank-to-move.json:

{
    "topics":[
        {
            "topic":"think_tank"
        },
        {
            "topic":"这里可以同时指定多个..."
        }
    ],
    "version":1
}

使用/bin目录中提供的kafka-reassign-partitions.sh的脚本请求获取生成分配方案:

./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --topics-to-move-json-file think_tank-to-move.json --broker-list "101,102,103,104,105" --generate

--broker-lsit 的参数 "101,102,103,104,105"是指集群中每个broker的id,由于我们是需要将所有topic均匀分配到扩完结点的5台机器上,所以要指定。同理,当业务改变为将原来的所有数据从旧节点(01,102,103)迁移到新节点(104,105)实现数据平滑迁移,这时的参数应"104,105".

脚本执行后返回的结果如下:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[101,102]},{"topic":"think_tank","partition":4,"replicas":[103,102]},{"topic":"think_tank","partition":3,"replicas":[102,101]},{"topic":"think_tank","partition":0,"replicas":[102,103]},{"topic":"think_tank","partition":1,"replicas":[103,101]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[103,101]},{"topic":"think_tank","partition":4,"replicas":[105,103]},{"topic":"think_tank","partition":3,"replicas":[104,102]},{"topic":"think_tank","partition":0,"replicas":[101,104]},{"topic":"think_tank","partition":1,"replicas":[102,105]}]}

可以看出当前正在运行的方案中,think_tank的replica都是分布在101,102,103这3个节点,新给出的建议方案中replica均匀分布在扩容后的5个节点中。

2.执行分配方案

将上一个步骤中生成的建议方案复制到新建的think_tank_reassignment.json中:

{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[103,101]},{"topic":"think_tank","partition":4,"replicas":[105,103]},{"topic":"think_tank","partition":3,"replicas":[104,102]},{"topic":"think_tank","partition":0,"replicas":[101,104]},{"topic":"think_tank","partition":1,"replicas":[102,105]}]}

使用脚本执行:

 ./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file think_tank_reassignment.json --execute

脚本执行,返回内容:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"think_tank","partition":2,"replicas":[101,102]},{"topic":"think_tank","partition":4,"replicas":[103,102]},{"topic":"think_tank","partition":3,"replicas":[102,101]},{"topic":"think_tank","partition":0,"replicas":[102,103]},{"topic":"think_tank","partition":1,"replicas":[103,101]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

如上,成功开始执行分配数据,同时提示你如果有需要将之前的分配方案备份便于回滚到原方案。

3.查看配过程进

查看脚本的方法如下,注意这次的json文件要和执行步骤中的json是同一个文件:

 ./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file think_tank_reassignment.json --verify

返回结果:

Reassignment of partition [think_tank,2] completed successfully
Reassignment of partition [think_tank,1] completed successfully
Reassignment of partition [think_tank,3] is still in progress
Reassignment of partition [think_tank,4] completed successfully
Reassignment of partition [think_tank,0] is still in progress

is still in progress表示还在处理中,全部迁移成功后每个partition都会显示 completed successfully.注意如果topic数据量大,这个过程可能会时间长一些,不要轻易重启节点!可能会导致数据不一致!!!

三、其它

这个partion reassignment工具同样可以按需手动地将某个特定的topic指定到特定的broker上,所要做的就是按照步骤一给定的格式关联partition到borker即可,如,将think_tank的partition0指定到101、102两节点上:

{
    "version":1,
    "partitions":[
        {
            "topic":"think_tank",
            "partition":0,
            "replicas":[
                101,
                105
            ]
        }
    ]
}

另外,如果有增加replica的个数的需求,同样可以使用这个脚本,可以翻一下官网文档。

One more thing

一点儿感触,在确定问题所在后,官方的文档应该作为我们优先考虑的一个重要资料源,网上的资料由于时间较早、版本不同的原因,解决方式可能需要细微的改动才能达到目的,这些坑在官方的一手资料上其实是可以规避的。

欢迎拍砖,欢迎交流~

注:转载请注明出处

相关文章

  • kafka集群扩容后的数据迁移

    最近我们生产环境的kafka集群有增加节点的需求,然而kafka在新增节点后并不会像elasticsearch那样...

  • kafka集群维护

    【kafka集群维护】 【kafka集群分区日志迁移】(热部署)迁移topic数据到其他broker,请遵循下面四步:

  • Kafka broker 存储不均衡问题排查记录

    背景 Kafka原本集群磁盘扩容后,新建topic的业务量较大,存在数据积压。一段时间后收到集群节点报警,部分磁盘...

  • ceph:如何计算集群数据恢复时间

    1、前言 很多场景会导致集群数据进行迁移、恢复,比如磁盘损坏后换盘操作、集群扩容操作、机器下电维护后再次上电等等。...

  • Elasticsearch非停机状态下线节点

    对于集群的扩容, 我们得心应手; 但遇到需要缩减集群时, 就有点难受 节点的数据如何迁移 (虽然有复本, 但不同索...

  • hdfs文件迁移

    hadoop跨集群之间迁移HDFS数据 不同hadoop集群之间迁移hive数据 hadoop跨集群之间迁移hiv...

  • graph扩容历史数据自动迁移

    当graph集群扩容时,数据会自动迁移达到rebalance的目的。具体的操作步骤如下: 假设1:旧的graph集...

  • RabbitMQ 集群迁移

    对于 RabbitMQ 运维层面来说,扩容和迁移是必不可少的。扩容比较简单,一般向集群中加入新的集群节点即可,不过...

  • 【kafka】 Kafka 集群快速扩容方案

    Kafka扩容后如何将Partitions分区分配到新brokers节点上?kafka数据倾斜问题如何解决? 什么...

  • 一篇文章教你自建hadoop集群迁移到EMR

    自建集群要迁移到EMR集群,往往需要迁移已有数据。本文主要介绍hdfs数据和hive meta数据如何迁移。 前置...

网友评论

    本文标题:kafka集群扩容后的数据迁移

    本文链接:https://www.haomeiwen.com/subject/yhohzftx.html