文章目录
- 背景
- 环境
- 工具选型
- 实操
- MM1
- MM2
- 以MM2集群运行
- 以Standalone模式运行
- 验证
- 附录
- MM2配置表
- 其他
- TODO
背景
一个测试环境的kafka集群,Topic有360+,Partition有2000+,部署在虚拟机上,由于多方面原因,要求迁移至k8s容器内(全量迁移),正好可以拿来练一下手。本文主要记录对MM1和MM2的实际操作过程,以及使用过程中遇到的问题及解决方案。
环境
source集群:kafka-2.6.0、2个broker、虚拟机
target集群:kafka-2.6.0、3个broker、k8s
工具:MM1()、MM2()
需求:Topic名称不能改变、数据完整
条件:target集群需要开启自动创建Topic:=true
工具选型
本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。
并且在MM1多年的使用过程中发现了以下局限性:
- 静态的黑名单和白名单
- Topic信息不能同步,所有Topic同步到目标端都只有一个Partition
- 必须通过手动配置来解决active-active场景下的循环同步问题(MM2为解决这个问题,也做了体验很不好的改动)
- rebalance导致的性能问题
- 缺乏监控手段
- 无法保证Exactly Once
- 无法提供容灾恢复
- 无法同步Topic列表,只能同步有数据的Topic
MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,可以支持同步以下数据:
- 完整的Topic列表
- Topic配置
- ACL信息(如果有)
- consumer group和offset(kafka2.7.0之后版本才行)
- 其他功能:
- 支持循环同步检测
- 多集群自定义同步(同一个任务中,可以多集群同步:A->B、B->C、B->D)
- 提供可监控Metrics
- 可通过配置保证Exactly Once
- …
实操
秉着实操前先演练的原则,我自己搭建了一个和目标集群相同配置的集群,用于验证不同工具的操作结果。有足够把握之后,再对目标集群实际操作。
MM1
执行 --help 查看参数选项:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./ --help
This tool helps to continuously copy data between two Kafka clusters.
Option Description
------ -----------
-- <String: Stop Configure the mirror maker to exit on
the entire mirror maker when a send a failed send. (default: true)
failure occurs>
-- <String: config file> Embedded consumer config for consuming
from the source cluster.
-- <String: The consumer rebalance listener to use
A custom rebalance listener of type for mirror maker consumer.
ConsumerRebalanceListener>
--help Print usage information.
-- <String: A custom Message handler which will process
message handler of type every record in-between consumer and
MirrorMakerMessageHandler> producer.
-- <String: Arguments used by custom message
Arguments passed to message handler handler for mirror maker.
constructor.>
-- DEPRECATED Use new consumer in mirror
maker (this is the default so this
option will be removed in a future
version).
-- <Integer: Number of Number of consumption streams.
threads> (default: 1)
-- <Integer: Offset commit interval in ms.
offset commit interval in (default: 60000)
millisecond>
-- <String: config file> Embedded producer config.
-- <String: Arguments used by custom rebalance
Arguments passed to custom rebalance listener for mirror maker consumer.
listener constructor as a string.>
--version Display Kafka version.
--whitelist <String: Java regex Whitelist of topics to mirror.
(String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
核心参数就两个:消费者和生产者的配置文件:
:(消费source集群)
=source:9092
=earliest
=
=SASL_PLAINTEXT
=PLAIN
=mm1-consumer
= required username="admin" password="admin-pwd";
:(发送消息至目标集群)
= target:29092
=SASL_PLAINTEXT
=PLAIN
= required username="Admin" password="hMOPbmZE";
acks=-1
=10
=10000
retries=3
执行脚本:
./ -- ./ -- ./ -- 5000 -- 2 --whitelist "projects.*"
MM1比较简单,只要两个配置文件没问题,sasl配置正确,基本就OK了,适合简单的数据同步,比如指定topic进行同步。
MM2
有四种运行MM2的方法:
- As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
- As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
- As a standalone Connect worker.(作为独立的Connect工作者)
- In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)
本文介绍第一种和第三种:作为专用的MirrorMaker群集、作为独立的Connect工作者,第二种需要搭建connect集群,操作比较复杂。
以MM2集群运行
这种模式是最简单的,只需要提供一个配置文件即可,配置文件定制化程度比较高,根据业务需求配置即可
老样子,执行 --help 看看使用说明:
[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./ --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]]
MirrorMaker 2.0 driver
positional arguments:
MM2 configuration file.
optional arguments:
-h, --help show this help message and exit
--clusters CLUSTER [CLUSTER ...]
Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
可以看到,参数简单了许多,核心参数就一个配置文件。
:
name = event-center-connector
=
= 2
# 定义集群别名
clusters = event-center, event-center-new
# 设置event-center集群的kafka地址列表
= source:9193
=SASL_PLAINTEXT
=PLAIN
= required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
= target:29092
=SASL_PLAINTEXT
=PLAIN
= required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center-> = true
# 允许同步topic的正则
event-center-> = projects.*
event-center-> = .*
# MM2内部同步机制使用的topic,replication数量设置
=1
=1
=1
=1
=1
=1
# 自定义参数
# 是否同步源topic配置
=true
# 是否同步源event-centerCL信息
=true
=true
# 连接器是否发送心跳
=true
# 心跳间隔
=5
# 是否发送检查点
=true
# 是否刷新topic列表
=true
# 刷新间隔
=60
# 是否刷新消费者组id
=true
# 刷新间隔
=60
# DefaultReplicationPolicy / CustomReplicationPolicy
=
# 远端创建新topic的replication数量设置
=3
需要注意的是: 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。
官方也给出了解释:
这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“”使用自定义复制策略类来完成此操作。
针对如何自定义策略及使用方法,见我的另一篇文章:
为了保证脚本后台运行,写一个脚本包装一下:
:
#!/bin/bash
exec ./ >log/ 2>&1 &
之后执行脚本即可。
以Standalone模式运行
这种模式会麻烦点,需要提供一个kafka,作为worker节点来同步数据,使用的脚本为:
–help看看如何使用:
./ --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone [ ...] (:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]#
需要两个配置文件,一个是作为worker的kafka集群信息(),另一个是同步数据的配置()
:
=worker:29092
=PLAINTEXT
=PLAIN
=
=
=/tmp/
=10000
:
name = MirrorSourceConnector
topics = projects.*
groups = *
=
= 1
# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
= old
= source:9193
=SASL_PLAINTEXT
=PLAIN
= required username="admin" password="admin-pwd";
# target
= new
= target:29092
=SASL_PLAINTEXT
=PLAIN
= required username="Admin" password="hMOPbmZE";
# 是否同步源topic配置信息
=true
# 是否同步源ACL信息
=true
=true
# 连接器是否发送心跳
=true
# 心跳间隔
=5
# 是否发送检查点
=true
# 是否刷新topic列表
=true
# 刷新间隔
=30
# 是否刷新消费者组id
=true
# 刷新间隔
=30
# 连接器消费者预读队列大小
# =500
# 使用自定义策略
=
= 3
执行:
./
这种方式做一个简单的介绍,我最后采用的是上一种方式,比较简单直接
验证
验证:
-
消息数量 OK
使用kafka-tool工具连接上两个集群进行比对
-
Topic数量 OK
- source:
./ --bootstrap-server source:9193 --command-config --list >
- sink
./ --bootstrap-server sink:29092 --command-config --list >
- 示例:
= SASL_PLAINTEXT = PLAIN = required username="admin" password="admin-pwd";
-
新消息是否同步 OK
-
新Topic是否同步 OK
-
Consumer是否同步 NO
./ --bootstrap-server source:9193 --command-config --list >
如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils
-
consumer offset是否同步 NO
-
ACL是否同步 OK
通过或者客户端工具kafka-tool可以查看
附录
MM2配置表
property | default value | description |
---|---|---|
name | required | name of the connector, . “us-west->us-east” |
topics | empty string | regex of topics to replicate, . “topic1|topic2|topic3”. Comma-separated lists are also supported. |
“..internal, ..replica, __consumer_offsets” or similar | topics to exclude from replication | |
groups | empty string | regex of groups to replicate, . “.*” |
empty string | groups to exclude from replication | |
required | name of the cluster being replicated | |
required | name of the downstream Kafka cluster | |
required | upstream cluster to replicate | |
required | downstream cluster | |
true | whether or not to monitor source cluster for configuration changes | |
true | whether to monitor source cluster ACLs for changes | |
true | connector should periodically emit heartbeats | |
5 (seconds) | frequency of heartbeats | |
true | connector should periodically emit consumer offset information | |
5 (seconds) | frequency of checkpoints | |
true | connector should periodically check for new topics | |
5 (seconds) | frequency to check source cluster for new topics | |
true | connector should periodically check for new consumer groups | |
5 (seconds) | frequency to check source cluster for new consumer groups | |
500 (records) | number of records to let consumer get ahead of producer | |
use LegacyReplicationPolicy to mimic legacy MirrorMaker | ||
1 day | used when creating heartbeat topics for the first time | |
1 day | used when creating checkpoint topics for the first time | |
max long | used when creating offset sync topic for the first time | |
2 | used when creating remote topics |
其他
参考:
/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0
/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new
/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf
/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide
TODO
后续验证发现一个问题:
从旧集群生产消息,会复制3份到新集群