背景说明
Kakfa MirrorMaker是Kafka 官方提供的跨数据中心的流数据同步方案,其实现原理是通过从Source Cluster消费消息,然后将消息生产到Target Cluster,即普通的消息生产和消费。用户只要通过简单的consumer配置和producer配置,启动Mirror,就可以实现准实时的数据同步。

简单的说,这个工具就是启动consumer消费旧集群中topic的数据,然后启动producer直接发送到新集群中的topic。
问题说明
MirrorMaker同步队列时要求topic的名字必须一样。
解决方案
经过在github中进行搜索,发现开源项目:mirrormaker_topic_rename已经满足项目需求https://github.com/opencore/mirrormaker_topic_rename
源码编译
mvn clean package

构建上传
上传mmchangetopic-1.0-SNAPSHOT.jar至目录:/kafka/libs

配置说明
配置文件目录:/kafka/config
相关配置文件:consumer.properties和producer.properties
consumer.properties => 源集群配置[A]
producer.properties => 目标集群配置[B]
消息流向:消息写入A集群后会被复制到目标集群B中。
启动参数

脚本启动
前台启动
./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties --producer.config ../config/producer.properties --whitelist .* --message.handler com.opencore.RenameTopicHandler --message.handler.args 'a,b'
后台启动
nohup ./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties --producer.config ../config/producer.properties --whitelist .* --message.handler com.opencore.RenameTopicHandler --message.handler.args 'a,b' &
参考命令
nohup bin/kafka-mirror-maker.sh --consumer.config MirrorMaker/consumer.properties --producer.config MirrorMaker/producer.properties --whitelist ' test' &
nohup ./bin/kafka-mirror-maker.sh --new.consumer --consumer.config config/mirror-consumer.properties --num.streams 40 --producer.config config/mirror-producer.properties --whitelist 'ABTestMsg|AppColdStartMsg|BackPayMsg|WebMsg|GoldOpenMsg|BoCaiMsg' &
kafka-mirror-maker --consumer.config consumer.properties --producer.config producer.properties --whitelist test_.* --message.handler com.opencore.RenameTopicHandler --message.handler.args test_source,test_target;test_source2,test_target2
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist “my-topic1,my-topic2”
日志调试
如需要调试,查看debug日志 可以修改/kafka/config/tools-log4j.properties
参考文档
https://blog.csdn.net/gdutliuyun827/article/details/55263245
附录
consumer.properties
bootstrap.servers=10.255.30.152:9092
group.id=mirrorGroupTest
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
request.timeout.ms=50000
heartbeat.interval.ms=30000
session.timeout.ms=40000
max.poll.records=20000
receive.buffer.bytes=524288
max.partition.fetch.bytes=5248576
producer.properties
bootstrap.servers=10.255.30.132:9092
compression.type=none
batch.size=16384
retries=3
acks=1
producer.type=sync
网友评论