5.1 导入依赖
创建 Spring Boot 项目,并导入以下依赖。
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.7</version>
</dependency>
<dependency>
<groupId>org.rocketmq.spring.boot</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
5.2 配置 application.yaml
application.yaml
的内容如下:
spring:
application:
name: spring-boot-canal-redis
datasource:
url: jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
data:
redis:
host: localhost
port: 6379
database: 0
password: "123456"
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: -1ms
rocketmq:
name-server: localhost:9876
producer:
group: my-producer_canal-test-topic
send-message-timeout: 60000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
# consumer:
# group: my-consumer_canal-test-topic
server:
port: 8089
5.3 实现Canal同步服务代码
5.3.1 Canal同步服务接口
/**
* Canal同步服务接口,用于处理来自Canal的数据同步请求
* 该接口主要定义了如何处理数据变更事件,包括DDL语句执行和DML操作(插入、更新、删除)
*
* @author zouhu
* @data 2024-10-31 15:16
*/
public interface CanalSyncService<T> {
/**
* 处理数据变更事件
* <p>
* 该方法用于处理来自Canal的数据变更事件,包括DDL语句执行和其他数据操作(如插入、更新和删除)
* </p>
*
* @param flatMessage CanalMQ数据
*/
void process(FlatMessage flatMessage);
/**
* DDL语句处理
*
* @param flatMessage CanalMQ数据
*/
void ddl(FlatMessage flatMessage);
/**
* 插入
*
* @param list 新增数据
*/
void insert(Collection<T> list);
/**
* 更新
*
* @param list 更新数据
*/
void update(Collection<T> list);
/**
* 删除
*
* @param list 删除数据
*/
void delete(Collection<T> list);
}
5.3.2 抽象Canal-RocketMQ通用处理服务
/**
* 抽象Canal-RocketMQ通用处理服务
*
*
* @author zouhu
* @data 2024-10-31 15:21
*/
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractCanalRocketMqRedisService<T> implements CanalSyncService<T> {
private final RedisTemplate<String, Object> redisTemplate;
private Class<T> classCache;
/**
* 获取Model名称
*
* @return Model名称
*/
protected abstract String getModelName();
/**
* 处理数据
* <p>
* 后续优化:可以使用策略模式来封装不同表的操作,不一定要统一
* </p>
*
* @param flatMessage CanalMQ数据
*/
@Override
public void process(FlatMessage flatMessage) {
if (flatMessage.getIsDdl()) {
ddl(flatMessage);
return;
}
Set<T> data = getData(flatMessage);
if (SqlType.INSERT.getType().equals(flatMessage.getType())) {
insert(data);
}
if (SqlType.UPDATE.getType().equals(flatMessage.getType())) {
update(data);
}
if (SqlType.DELETE.getType().equals(flatMessage.getType())) {
delete(data);
}
}
/**
* DDL语句处理
*
* @param flatMessage CanalMQ数据
*/
@Override
public void ddl(FlatMessage flatMessage) {
//TODO : DDL需要同步,删库清空,更新字段处理
}
/**
* 插入
*
* @param list 新增数据
*/
@Override
public void insert(Collection<T> list) {
insertOrUpdate(list);
}
/**
* 更新
*
* @param list 更新数据
*/
@Override
public void update(Collection<T> list) {
insertOrUpdate(list);
}
/**
* 删除
*
* @param list 删除数据
*/
@Override
public void delete(Collection<T> list) {
Set<String> keys = Sets.newHashSetWithExpectedSize(list.size());
for (T data : list) {
keys.add(getWrapRedisKey(data));
}
redisTemplate.delete(keys);
}
/**
* 插入或者更新redis
* <p>
* data 对象里面还包含 getTypeArgument()的返回值,但是没有写到 Redis 里面
* </p>
*
* @param list 数据
*/
private void insertOrUpdate(Collection<T> list) {
for (T data : list) {
log.info("redis data:{}", data);
String key = getWrapRedisKey(data);
log.info("redis key:{}", key);
redisTemplate.opsForValue().set(key, data);
}
}
/**
* 封装redis的key
*
* @param t 原对象
* @return key
*/
protected String getWrapRedisKey(T t) {
return getModelName() + ":" + getIdValue(t);
}
/**
* 获取类泛型
*
* @return 泛型Class
*/
@SuppressWarnings("unchecked")
protected Class<T> getTypeArgument() {
if (classCache == null) {
classCache = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
}
return classCache;
}
/**
* 获取 Object 标有 @TableId 注解的字段值
*
* @param t 对象
* @return id值
*/
protected Object getIdValue(T t) {
Field fieldOfId = getIdField();
ReflectionUtils.makeAccessible(fieldOfId);
return ReflectionUtils.getField(fieldOfId, t);
}
/**
* 获取Class标有@TableId注解的字段名称
*
* @return id字段名称
*/
protected Field getIdField() {
Class<T> clz = getTypeArgument();
Field[] fields = clz.getDeclaredFields();
for (Field field : fields) {
TableId annotation = field.getAnnotation(TableId.class);
if (annotation != null) {
return field;
}
}
log.error("PO类未设置@TableId注解");
throw new RuntimeException("PO类未设置@TableId注解");
}
/**
* 转换 Canal 的 FlatMessage中的data成泛型对象
*
* @param flatMessage Canal发送MQ信息
* @return 泛型对象集合
*/
protected Set<T> getData(FlatMessage flatMessage) {
List<Map<String, String>> sourceData = flatMessage.getData();
Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size());
for (Map<String, String> map : sourceData) {
// 将Type类型的数据和T对象合并转换为泛型对象T
T t = JSON.parseObject(JSON.toJSONString(map), getTypeArgument());
targetData.add(t);
}
return targetData;
}
}
5.3.3 具体类的同步服务实现
/**
* User类的 Canal-RocketMQ通用处理服务实现
*
* @author zouhu
* @data 2024-10-31 17:23
*/
@Component
public class UserCanalRocketMqRedisService extends AbstractCanalRocketMqRedisService<User> {
public UserCanalRocketMqRedisService(RedisTemplate<String, Object> redisTemplate) {
super(redisTemplate);
}
@Override
protected String getModelName() {
return "User";
}
}
5.4 实体类
后续将根据这个实体类来进行测试。
/**
* User 实体类
*
* @author zouhu
* @data 2024-10-31 13:29
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class User extends Model<User> {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
private String name;
private String email;
}
5.5 RocketMQ 消费者
/**
* 监听所有表的数据修改 binlog
* <p>
* 目前只实现了单个表的处理逻辑, 后续可以使用策略模式实现不同表的处理逻辑
* </p>
*
* @author zouhu
* @data 2024-10-27 23:18
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "canal-test-topic",
consumerGroup = "my-consumer_test-topic-1"
)
public class CanalCommonSyncBinlogConsumer implements RocketMQListener<FlatMessage> {
private final UserCanalRocketMqRedisService userCanalRocketMqRedisService;
@Override
public void onMessage(FlatMessage flatMessage) {
log.info("consumer message {}", flatMessage);
try {
userCanalRocketMqRedisService.process(flatMessage);
} catch (Exception e) {
log.warn(String.format("message [%s] 消费失败", flatMessage), e);
throw new RuntimeException(e);
}
}
}
5.6 后续优化方案
使用策略模式实现不同表的处理策略.