实时数据同步之Maxwell和Canal-二、Maxwell 使用

时间:2024-04-16 10:24:22

1、Maxwell 安装部署

1.1 下载安装

# 因为1.30开始不支持jdk8,所以用这个
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/

1.2 初始化 Maxwell 元数据库

# 在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据
mysql -uroot -p123456
CREATE DATABASE maxwell;
# 设置 mysql 用户密码安全级别
set global validate_password_length=4;
set global validate_password_policy=0;
# 分配一个账号可以操作该数据库
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
# 分配这个账号可以监控其他数据库的权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO maxwell@'%';
# 刷新 mysql 表权限
flush privileges;

1.3 Maxwell 进程启动

# ============Maxwell 进程启动方式有如下两种========
# 1、使用命令行参数启动 Maxwell 进程
cd /opt/module/maxwell-1.29.2/
# --user  连接 mysql 的用户
# --password  连接 mysql 的用户的密码
# --host mysql 安装的主机名
# --producer  生产者模式(stdout:控制台 kafka:kafka 集群)
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
# 2、修改配置文件,定制化启动 Maxwell 进程 
cp config.properties.example config.properties
vim config.properties
# 修改完成后
bin/maxwell --config ./config.properties

2、Maxwell入门案例

2.1 监控 Mysql 数据并在控制台打印

# 运行 maxwell 来监控 mysql 数据更新
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

创建对应的mysql语句,查看控制台

# 向 mysql 的 test_maxwell 库的 test 表插入一条数据,查看 maxwell 的控制台输出
insert into test  values(1,'aaa');
{
    "database":"test_maxwell", --库名
    "table":"test", --表名
    "type":"insert", --数据更新类型
    "ts":1683960319, --操作时间
    "xid":4335, --操作 id
    "commit":true, --提交成功
    "data":{ --数据
        "id":1,
        "name":"aaa"
    }
}
# 向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据,控制台出现了 3 条 json日志,说明 maxwell 是以数据行为单位进行日志的采集的
INSERT INTO  test VALUES(2,'bbb'),(3,'ccc'),(4,'ddd');
{"database":"test_maxwell","table":"test","type":"insert","ts":1683960373,"xid":4666,"xoffset":0,"data":{"id":2,"name":"bbb"}}
{"database":"test_maxwell","table":"test","type":"insert","ts":1683960373,"xid":4666,"xoffset":1,"data":{"id":3,"name":"ccc"}}
{"database":"test_maxwell","table":"test","type":"insert","ts":1683960373,"xid":4666,"commit":true,"data":{"id":4,"name":"ddd"}}
update test set name='shawn' where id=1;
{
    "database":"test_maxwell",
    "table":"test",
    "type":"update",
    "ts":1683960396,
    "xid":4737,
    "commit":true,
    "data":{
        "id":1,
        "name":"shawn" --修改后的数据
    },
    "old":{ --修改前的数据
        "name":"aaa"
    }
}
# 删除 test_maxwell 库的 test 表的一条数据,查看 maxwell 的控制台输出
DELETE FROM test WHERE id=1;
{
    "database":"test_maxwell",
    "table":"test",
    "type":"delete",
    "ts":1683960501,
    "xid":5085,
    "commit":true,
    "data":{
        "id":1,
        "name":"shawn"
    }
}

2.2 监控 Mysql 数据输出到 kafka

简单接入

# 启动 zookeeper 和 kafka
jpsall
# windows有个可视化工具,叫做kafka Tool
# https://www.kafkatool.com/download.html

# 启动 Maxwell 监控 binlog
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092   --kafka_topic=maxwell
# 打开 kafka 的控制台的消费者消费 maxwell 主题
# 如果要读取历史数据,需要加上--from-begining
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell

然后就是kafka 主题数据的分区控制,在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的,为了提高并发度。那么如何控制这些数据的分区问题,就变得至关重要,实现步骤如下:在公司生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 kafka 的一个主题 Topic,并且这个主题也肯定是多分区的

# 修改 maxwell 的配置文件,定制化启动 maxwell 进程
vim config.properties

#   tl;dr   config
log_level=info
# #Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
kafka.bootstrap.servers=hadoop102:9092
 
#   mysql   login   info
host=hadoop102
user=maxwell
password=123456
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
#  ***   kafka   ***
#   list   of   kafka   brokers
#kafka.bootstrap.servers=hosta:9092,hostb:9092
#   kafka   topic   to   write   to
#   this   can   be   static,   e.g.   'maxwell',   or   dynamic,   e.g.namespace_%{database}_%{table}
#   in   the   latter   case   'database'   and   'table'   will   be   replacedwith   the   values   for   the   row   being   processed
# #目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=maxwell3

#  ***   partitioning   ***
# 一般都是库名或表分区
#   What   part   of   the   data   do   we   partition   by?
#producer_partition_by=database   #   [database,   table,primary_key,   transaction_id,   column]
producer_partition_by=database

#   控制数据分区模式,可选模式有  库名,表名,主键,列名
#   specify   what   fields   to   partition   by   when   using producer_partition_by=column
#   column   separated   list.
#producer_partition_columns=name
 
#   when   using   producer_partition_by=column,   partition   by   this when
#   the   specified   column(s)   don't   exist.
#producer_partition_by_fallback=database

# 手动创建一个 3 个分区的 topic,名字就叫做 maxwell3
kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 2 --partitions 3 --topic maxwell3
# 利用配置文件启动 Maxwell 进程
bin/maxwell --config ./config.properties
/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon

ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

# 消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell3

# 向 test_maxwell 库的 test 表再次插入一条数据
insert into test_maxwell.test values (6,'fff');
# 通过 kafka tool 工具查看,此条数据进入了 maxwell3 主题的 1 号分区
# 向 test_maxwell2 库的 aaa 表插入一条数据
# 注意binlog配置文件要监听这个库才行
insert into test_maxwell2.test values (23,'dd');
# 通过 kafka  tool 工具查看,此条数据进入了 maxwell3 主题的 0 号分区,说明库名会对数据进入的分区造成影响

2.3 监控 Mysql 指定表数据输出控制台

运行 maxwell 来监控 mysql 指定表数据更新

# 运行 maxwell 来监控 mysql 指定表数据更新
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --filter 'exclude: *.*, include:test_maxwell.test' --producer=stdout
# 注:还可以设置 include:test_maxwell.*,通过此种方式来监控 mysql 某个库的所有表,也就是说过滤整个库

2.4 监控 Mysql 指定表全量数据输出控制台,数据初始化

https://maxwells-daemon.io/bootstrapping/

Maxwell 进程默认只能监控 mysql 的 binlog日志的新增及变化的数据,但是Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据,来对 MySQL 的某张表进行数据初始化,也就是我们常说的全量同步。具体操作步骤如下:需求:将 test_maxwell 库下的 test2 表的四条数据,全量导入到 maxwell 控制台进行打印

# 修改Maxwell的元数据,触发数据初始化机制,在 mysql 的 maxwell 库中 bootstrap表中插入一条数据,写明需要全量数据的库名和表名
insert into maxwell.bootstrap(database_name,table_name) values('test_maxwell','test2');

# 启动 maxwell 进程,此时初始化程序会直接打印 test2 表的所有数据
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout

# 当数据全部初始化完成以后,Maxwell 的元数据会变化
# is_complete  字段从 0 变为 1
# start_at  字段从 null 变为具体时间(数据同步开始时间)
# complete_at  字段从 null 变为具体时间(数据同步结束时间)

还有一个方法是使用maxwell-bootstrap脚本,前提是已经启动了maxwell,否则会被阻塞

/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties

# 第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
# 一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间

# 采用bootstrap方式同步的输出数据格式如下
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-start",
    "ts": 1450557744,
    "data": {}
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "hello"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-insert",
    "ts": 1450557744,
    "data": {
        "txt": "bootstrap!"
    }
}
{
    "database": "fooDB",
    "table": "barTable",
    "type": "bootstrap-complete",
    "ts": 1450557744,
    "data": {}
}


2.5 群起脚本

一个启动脚本,可以参考

#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell

status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}


start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}


stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}


case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac