【Canal 中间件】Canal 实现 MySQL 增量数据的异步缓存更新-五、实现客户端代码

时间:2024-11-01 13:07:34

5.1 导入依赖

创建 Spring Boot 项目,并导入以下依赖。

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.7</version>
</dependency>

<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.7</version>
</dependency>

<dependency>
    <groupId>org.rocketmq.spring.boot</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

5.2 配置 application.yaml

application.yaml 的内容如下:

spring:
  application:
    name: spring-boot-canal-redis
  datasource:
    url: jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=UTC
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  data:
    redis:
      host: localhost
      port: 6379
      database: 0
      password: "123456"
      lettuce:
        pool:
          max-active: 8
          max-idle: 8
          min-idle: 0
          max-wait: -1ms

rocketmq:
  name-server: localhost:9876
  producer:
    group: my-producer_canal-test-topic
    send-message-timeout: 60000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
#  consumer:
#    group: my-consumer_canal-test-topic

server:
  port: 8089

5.3 实现Canal同步服务代码

5.3.1 Canal同步服务接口
/**
 *  Canal同步服务接口,用于处理来自Canal的数据同步请求
 *  该接口主要定义了如何处理数据变更事件,包括DDL语句执行和DML操作(插入、更新、删除)
 *
 * @author zouhu
 * @data 2024-10-31 15:16
 */
public interface CanalSyncService<T> {
    /**
     * 处理数据变更事件
     * <p>
     *     该方法用于处理来自Canal的数据变更事件,包括DDL语句执行和其他数据操作(如插入、更新和删除)
     * </p>
     *
     * @param flatMessage CanalMQ数据
     */
    void process(FlatMessage flatMessage);

    /**
     * DDL语句处理
     *
     * @param flatMessage CanalMQ数据
     */
    void ddl(FlatMessage flatMessage);

    /**
     * 插入
     *
     * @param list 新增数据
     */
    void insert(Collection<T> list);

    /**
     * 更新
     *
     * @param list 更新数据
     */
    void update(Collection<T> list);

    /**
     * 删除
     *
     * @param list 删除数据
     */
    void delete(Collection<T> list);
}

5.3.2 抽象Canal-RocketMQ通用处理服务
/**
 * 抽象Canal-RocketMQ通用处理服务
 *
 *
 * @author zouhu
 * @data 2024-10-31 15:21
 */
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractCanalRocketMqRedisService<T> implements CanalSyncService<T> {

    private final RedisTemplate<String, Object> redisTemplate;

    private Class<T> classCache;


    /**
     * 获取Model名称
     *
     * @return Model名称
     */
    protected abstract String getModelName();

    /**
     * 处理数据
     * <p>
     *     后续优化:可以使用策略模式来封装不同表的操作,不一定要统一
     * </p>
     *
     * @param flatMessage CanalMQ数据
     */
    @Override
    public void process(FlatMessage flatMessage) {

        if (flatMessage.getIsDdl()) {
            ddl(flatMessage);
            return;
        }

        Set<T> data = getData(flatMessage);

        if (SqlType.INSERT.getType().equals(flatMessage.getType())) {
            insert(data);
        }

        if (SqlType.UPDATE.getType().equals(flatMessage.getType())) {
            update(data);
        }

        if (SqlType.DELETE.getType().equals(flatMessage.getType())) {
            delete(data);
        }

    }

    /**
     * DDL语句处理
     *
     * @param flatMessage CanalMQ数据
     */
    @Override
    public void ddl(FlatMessage flatMessage) {
        //TODO : DDL需要同步,删库清空,更新字段处理
    }

    /**
     * 插入
     *
     * @param list 新增数据
     */
    @Override
    public void insert(Collection<T> list) {
        insertOrUpdate(list);
    }

    /**
     * 更新
     *
     * @param list 更新数据
     */
    @Override
    public void update(Collection<T> list) {
        insertOrUpdate(list);
    }

    /**
     * 删除
     *
     * @param list 删除数据
     */
    @Override
    public void delete(Collection<T> list) {
        Set<String> keys = Sets.newHashSetWithExpectedSize(list.size());

        for (T data : list) {
            keys.add(getWrapRedisKey(data));
        }

        redisTemplate.delete(keys);
    }

    /**
     * 插入或者更新redis
     * <p>
     *     data 对象里面还包含 getTypeArgument()的返回值,但是没有写到 Redis 里面
     * </p>
     *
     * @param list 数据
     */
    private void insertOrUpdate(Collection<T> list) {
        for (T data : list) {
            log.info("redis data:{}", data);
            String key = getWrapRedisKey(data);
            log.info("redis key:{}", key);
            redisTemplate.opsForValue().set(key, data);
        }
    }

    /**
     * 封装redis的key
     *
     * @param t 原对象
     * @return key
     */
    protected String getWrapRedisKey(T t) {
        return getModelName() + ":" + getIdValue(t);
    }

    /**
     * 获取类泛型
     *
     * @return 泛型Class
     */
    @SuppressWarnings("unchecked")
    protected Class<T> getTypeArgument() {
        if (classCache == null) {
            classCache = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
        }
        return classCache;
    }

    /**
     * 获取 Object 标有 @TableId 注解的字段值
     *
     * @param t 对象
     * @return id值
     */
    protected Object getIdValue(T t) {
        Field fieldOfId = getIdField();
        ReflectionUtils.makeAccessible(fieldOfId);
        return ReflectionUtils.getField(fieldOfId, t);
    }

    /**
     * 获取Class标有@TableId注解的字段名称
     *
     * @return id字段名称
     */
    protected Field getIdField() {
        Class<T> clz = getTypeArgument();
        Field[] fields = clz.getDeclaredFields();
        for (Field field : fields) {
            TableId annotation = field.getAnnotation(TableId.class);

            if (annotation != null) {
                return field;
            }
        }
        log.error("PO类未设置@TableId注解");
        throw new RuntimeException("PO类未设置@TableId注解");
    }

    /**
     * 转换 Canal 的 FlatMessage中的data成泛型对象
     *
     * @param flatMessage Canal发送MQ信息
     * @return 泛型对象集合
     */
    protected Set<T> getData(FlatMessage flatMessage) {
        List<Map<String, String>> sourceData = flatMessage.getData();
        Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size());
        for (Map<String, String> map : sourceData) {
            // 将Type类型的数据和T对象合并转换为泛型对象T
            T t = JSON.parseObject(JSON.toJSONString(map), getTypeArgument());
            targetData.add(t);
        }
        return targetData;
    }

}
5.3.3 具体类的同步服务实现
/**
 * User类的 Canal-RocketMQ通用处理服务实现
 *
 * @author zouhu
 * @data 2024-10-31 17:23
 */
@Component
public class UserCanalRocketMqRedisService extends AbstractCanalRocketMqRedisService<User> {
    public UserCanalRocketMqRedisService(RedisTemplate<String, Object> redisTemplate) {
        super(redisTemplate);
    }

    @Override
    protected String getModelName() {
        return "User";
    }
}

5.4 实体类

后续将根据这个实体类来进行测试。

/**
 * User 实体类
 *
 * @author zouhu
 * @data 2024-10-31 13:29
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class User extends Model<User> {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    private String name;

    private String email;
}

5.5 RocketMQ 消费者

/**
 * 监听所有表的数据修改 binlog
 * <p>
 *     目前只实现了单个表的处理逻辑, 后续可以使用策略模式实现不同表的处理逻辑
 * </p>
 *
 * @author zouhu
 * @data 2024-10-27 23:18
 */
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = "canal-test-topic",
        consumerGroup = "my-consumer_test-topic-1"
)
public class CanalCommonSyncBinlogConsumer implements RocketMQListener<FlatMessage> {

    private final UserCanalRocketMqRedisService userCanalRocketMqRedisService;

    @Override
    public void onMessage(FlatMessage flatMessage) {
        log.info("consumer message {}", flatMessage);
        try {
            userCanalRocketMqRedisService.process(flatMessage);
        } catch (Exception e) {
            log.warn(String.format("message [%s] 消费失败", flatMessage), e);
            throw new RuntimeException(e);
        }
    }
}

5.6 后续优化方案

使用策略模式实现不同表的处理策略.