【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程

时间:2025-03-04 14:07:29

文章目录

  • 背景
  • 环境
  • 工具选型
  • 实操
    • MM1
    • MM2
      • 以MM2集群运行
      • 以Standalone模式运行
  • 验证
  • 附录
    • MM2配置表
    • 其他
    • TODO

背景

一个测试环境的kafka集群,Topic有360+,Partition有2000+,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内(全量迁移),正好可以拿来练一下手。本文主要记录对MM1和MM2的实际操作过程,以及使用过程中遇到的问题及解决方案。

环境

source集群:kafka-2.6.0、2个broker、虚拟机

target集群:kafka-2.6.0、3个broker、k8s

工具:MM1()、MM2()

需求:Topic名称不能改变、数据完整

条件:target集群需要开启自动创建Topic:=true

工具选型

本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。

并且在MM1多年的使用过程中发现了以下局限性:

  1. 静态的黑名单和白名单
  2. Topic信息不能同步,所有Topic同步到目标端都只有一个Partition
  3. 必须通过手动配置来解决active-active场景下的循环同步问题(MM2为解决这个问题,也做了体验很不好的改动)
  4. rebalance导致的性能问题
  5. 缺乏监控手段
  6. 无法保证Exactly Once
  7. 无法提供容灾恢复
  8. 无法同步Topic列表,只能同步有数据的Topic

MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,可以支持同步以下数据:

  1. 完整的Topic列表
  2. Topic配置
  3. ACL信息(如果有)
  4. consumer group和offset(kafka2.7.0之后版本才行)
  5. 其他功能:
    • 支持循环同步检测
    • 多集群自定义同步(同一个任务中,可以多集群同步:A->B、B->C、B->D)
    • 提供可监控Metrics
    • 可通过配置保证Exactly Once

实操

秉着实操前先演练的原则,我自己搭建了一个和目标集群相同配置的集群,用于验证不同工具的操作结果。有足够把握之后,再对目标集群实际操作。

MM1

执行 --help 查看参数选项:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./ --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
-- <String: Stop    Configure the mirror maker to exit on
  the entire mirror maker when a send      a failed send. (default: true)
  failure occurs>
-- <String: config file>  Embedded consumer config for consuming
                                           from the source cluster.
-- <String:   The consumer rebalance listener to use
  A custom rebalance listener of type      for mirror maker consumer.
  ConsumerRebalanceListener>
--help                                   Print usage information.
-- <String: A custom      Message handler which will process
  message handler of type                  every record in-between consumer and
  MirrorMakerMessageHandler>               producer.
-- <String:          Arguments used by custom message
  Arguments passed to message handler      handler for mirror maker.
  constructor.>
--                           DEPRECATED Use new consumer in mirror
                                           maker (this is the default so this
                                           option will be removed in a future
                                           version).
-- <Integer: Number of        Number of consumption streams.
  threads>                                 (default: 1)
-- <Integer:    Offset commit interval in ms.
  offset commit interval in                (default: 60000)
  millisecond>
-- <String: config file>  Embedded producer config.
-- <String:       Arguments used by custom rebalance
  Arguments passed to custom rebalance     listener for mirror maker consumer.
  listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.
  (String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#         

核心参数就两个:消费者和生产者的配置文件:

:(消费source集群)

=source:9092
=earliest
=
=SASL_PLAINTEXT
=PLAIN
=mm1-consumer
= required username="admin" password="admin-pwd";

:(发送消息至目标集群)

= target:29092
=SASL_PLAINTEXT
=PLAIN
= required username="Admin" password="hMOPbmZE";
acks=-1
=10
=10000
retries=3

执行脚本:

./ -- ./ -- ./ -- 5000 -- 2 --whitelist "projects.*"

MM1比较简单,只要两个配置文件没问题,sasl配置正确,基本就OK了,适合简单的数据同步,比如指定topic进行同步。

MM2

有四种运行MM2的方法:

  • As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
  • As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
  • As a standalone Connect worker.(作为独立的Connect工作者)
  • In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

本文介绍第一种和第三种:作为专用的MirrorMaker群集、作为独立的Connect工作者,第二种需要搭建connect集群,操作比较复杂。

以MM2集群运行

这种模式是最简单的,只需要提供一个配置文件即可,配置文件定制化程度比较高,根据业务需求配置即可

老样子,执行 --help 看看使用说明:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./ --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] 

MirrorMaker 2.0 driver

positional arguments:
           MM2 configuration file.

optional arguments:
  -h, --help             show this help message and exit
  --clusters CLUSTER [CLUSTER ...]
                         Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#  

可以看到,参数简单了许多,核心参数就一个配置文件。

name = event-center-connector
 = 
 = 2

# 定义集群别名
clusters = event-center, event-center-new

# 设置event-center集群的kafka地址列表
 = source:9193
=SASL_PLAINTEXT
=PLAIN
= required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
 = target:29092
=SASL_PLAINTEXT
=PLAIN
= required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center-> = true
# 允许同步topic的正则
event-center-> = projects.*
event-center-> = .*

# MM2内部同步机制使用的topic,replication数量设置
=1
=1
=1
=1
=1
=1

# 自定义参数
# 是否同步源topic配置
=true
# 是否同步源event-centerCL信息
=true
=true
# 连接器是否发送心跳
=true
# 心跳间隔
=5
# 是否发送检查点
=true
# 是否刷新topic列表
=true
# 刷新间隔
=60
# 是否刷新消费者组id
=true
# 刷新间隔
=60
# DefaultReplicationPolicy / CustomReplicationPolicy
=
# 远端创建新topic的replication数量设置
=3

需要注意的是: 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“”使用自定义复制策略类来完成此操作。

针对如何自定义策略及使用方法,见我的另一篇文章:

为了保证脚本后台运行,写一个脚本包装一下:

#!/bin/bash

exec ./  >log/ 2>&1 &

之后执行脚本即可。

以Standalone模式运行

这种模式会麻烦点,需要提供一个kafka,作为worker节点来同步数据,使用的脚本为:

–help看看如何使用:

./ --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone   [ ...] (:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]# 

需要两个配置文件,一个是作为worker的kafka集群信息(),另一个是同步数据的配置()

=worker:29092
=PLAINTEXT
=PLAIN

 = 
 = 

=/tmp/
=10000

name = MirrorSourceConnector
topics = projects.*
groups = *
 = 
 = 1

# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
 = old
 = source:9193
=SASL_PLAINTEXT
=PLAIN
= required username="admin" password="admin-pwd";
# target
 = new
 = target:29092
=SASL_PLAINTEXT
=PLAIN
= required username="Admin" password="hMOPbmZE";

# 是否同步源topic配置信息
=true
# 是否同步源ACL信息
=true
=true
# 连接器是否发送心跳
=true
# 心跳间隔
=5
# 是否发送检查点
=true
# 是否刷新topic列表
=true
# 刷新间隔
=30
# 是否刷新消费者组id
=true
# 刷新间隔
=30
# 连接器消费者预读队列大小
# =500
# 使用自定义策略
=
 = 3

执行:

./  

这种方式做一个简单的介绍,我最后采用的是上一种方式,比较简单直接

验证

验证:

  • 消息数量 OK

    使用kafka-tool工具连接上两个集群进行比对

  • Topic数量 OK

    • source:
    ./ --bootstrap-server source:9193 --command-config  --list >  
    
    • sink
    ./ --bootstrap-server sink:29092 --command-config  --list >  
    
    • 示例:
     = SASL_PLAINTEXT
     = PLAIN
    = required username="admin" password="admin-pwd";
    
  • 新消息是否同步 OK

  • 新Topic是否同步 OK

  • Consumer是否同步 NO

./ --bootstrap-server source:9193 --command-config  --list >  

​ 如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils

  • consumer offset是否同步 NO

  • ACL是否同步 OK
    通过或者客户端工具kafka-tool可以查看

附录

MM2配置表

property default value description
name required name of the connector, . “us-west->us-east”
topics empty string regex of topics to replicate, . “topic1|topic2|topic3”. Comma-separated lists are also supported.
“..internal, ..replica, __consumer_offsets” or similar topics to exclude from replication
groups empty string regex of groups to replicate, . “.*”
empty string groups to exclude from replication
required name of the cluster being replicated
required name of the downstream Kafka cluster
required upstream cluster to replicate
required downstream cluster
true whether or not to monitor source cluster for configuration changes
true whether to monitor source cluster ACLs for changes
true connector should periodically emit heartbeats
5 (seconds) frequency of heartbeats
true connector should periodically emit consumer offset information
5 (seconds) frequency of checkpoints
true connector should periodically check for new topics
5 (seconds) frequency to check source cluster for new topics
true connector should periodically check for new consumer groups
5 (seconds) frequency to check source cluster for new consumer groups
500 (records) number of records to let consumer get ahead of producer
use LegacyReplicationPolicy to mimic legacy MirrorMaker
1 day used when creating heartbeat topics for the first time
1 day used when creating checkpoint topics for the first time
max long used when creating offset sync topic for the first time
2 used when creating remote topics

其他

参考:

/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0

/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new

/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf

/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide

TODO

后续验证发现一个问题:
从旧集群生产消息,会复制3份到新集群