SpringBoot MySQL BinLog 监听数据变化(多库多表)

时间:2025-03-30 19:29:17
  • /**
  • * @Description BinLog data utilities: loads the column metadata of a table and turns
  • * binlog row data (insert / delete / update) into BinLog items.
  • * NOTE(review): in this listing every dotted expression (method-call receivers, field
  • * accesses, qualified type names) is missing, so many statements below are incomplete
  • * as shown. Restore them from the original source before compiling.
  • * @Author WangKun
  • * @Date 2024/8/12 16:40
  • * @Version
  • */
  • @Slf4j
  • public class BinLogDataUtils {
  • /**
  • * @param db schema name; presumably bound to the first placeholder (TABLE_SCHEMA) of the query
  • * @param table table name; presumably bound to the second placeholder (TABLE_NAME) of the query
  • * @Description Get the columns collection of one table. The query reads TABLE_SCHEMA,
  • * TABLE_NAME, COLUMN_NAME, DATA_TYPE and ORDINAL_POSITION from INFORMATION_SCHEMA.COLUMNS;
  • * every row with a non-null column name and a position >= 1 produces a Field that is
  • * given the zero-based position (idx - 1).
  • * @Throws nothing is rethrown; a SQLException is caught inside the method
  • * @Return Map<String, Field> (presumably keyed by column name), or null when a SQLException was caught
  • * @Date 2024-08-12 16:10:08
  • * @Author WangKun
  • **/
  • public static Map<String, Field> getColumnsMap(String db, String table) {
  • PreparedStatement ps = null;
  • ResultSet rs = null;
  • Connection connection = null;
  • try {
  • // Obtain the data source. NOTE(review): the lookup expression is missing in this listing.
  • DataSource dataSource = ();
  • connection = ();
  • // Run the SQL that fetches the table's column metadata
  • String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
  • ps = (preSql);
  • (1, db);
  • (2, table);
  • rs = ();
  • Map<String, Field> map = new HashMap<>(()); // NOTE(review): the initial-capacity expression is missing here
  • while (()) {
  • String column = ("COLUMN_NAME");
  • int idx = ("ORDINAL_POSITION");
  • if (column != null && idx >= 1) {
  • // ORDINAL_POSITION in SQL starts at 1, hence idx - 1 for a zero-based index
  • (column, new Field(("TABLE_SCHEMA"), ("TABLE_NAME"), idx - 1, column, ("DATA_TYPE")));
  • }
  • }
  • (); // NOTE(review): these three calls presumably close ps / rs / connection; the finally block below repeats them — confirm
  • ();
  • ();
  • return map;
  • } catch (SQLException e) {
  • ("加载BinLog监控配置库.表字段错误, db_table={}.{} ", db, table, e);
  • } finally {
  • try {
  • if (ps != null) {
  • ();
  • }
  • if (rs != null) {
  • ();
  • }
  • if (connection != null) {
  • ();
  • }
  • } catch (SQLException e) {
  • ("加载BinLog监控配置库.表字段错误关闭连接失败, db_table={}.{} ", db, table, e);
  • }
  • }
  • // Reached only after the SQLException catch above: callers must handle a null result
  • return null;
  • }
  • /**
  • * @param row values of the affected row, indexed per column
  • * @param dbTable not referenced in the method body shown here
  • * @param columMap column metadata; presumably iterated (key, Field) to fill the value map
  • * @param eventType event type, tested with isWrite / isDelete to choose the target slot
  • * @Description Format the data of an insert or delete operation into a BinLog item
  • * @Throws
  • * @Return BinLog item, or null when row or columMap is null or the third guard condition holds
  • * @Date 2024-08-12 16:53:07
  • * @Author WangKun
  • **/
  • private static BinLog insertOrDeletedColum(Serializable[] row, String dbTable, Map<String, Field> columMap, EventType eventType) {
  • if (null == row || null == columMap || != ()) { // NOTE(review): third operand is incomplete in this listing; presumably a row-length vs. column-count check
  • return null;
  • }
  • // Initialise the item
  • BinLog item = new BinLog();
  • (eventType);
  • (columMap);
  • Map<String, Serializable> beOrAf = new HashMap<>();
  • ((key, colum) -> {
  • Serializable serializable = row[]; // NOTE(review): the index expression is missing; presumably the column's position from Field
  • if (serializable instanceof byte[]) {
  • (key, new String((byte[]) serializable)); // byte[] values become Strings; no charset is given, so the default charset is used
  • } else {
  • (key, serializable);
  • }
  • });
  • // Write events store the values as "after", delete events store them as "before"
  • if (isWrite(eventType)) {
  • (beOrAf);
  • }
  • if (isDelete(eventType)) {
  • (beOrAf);
  • }
  • return item;
  • }
  • /**
  • * @param mapEntry pair of Serializable[] arrays; NOTE(review): presumably key = row before
  • * the update and value = row after it — confirm
  • * @param columMap column metadata; presumably iterated (key, Field) to fill both value maps
  • * @param eventType event type stored on the item
  • * @Description Format the data of an update operation into a BinLog item. Two maps are
  • * built (be / af); unlike insertOrDeletedColum, no array-length guard is visible here.
  • * @Throws
  • * @Return BinLog item, or null when mapEntry or columMap is null
  • * @Date 2024-08-12 16:52:46
  • * @Author WangKun
  • **/
  • private static BinLog updateColum(<Serializable[], Serializable[]> mapEntry, Map<String, Field> columMap, EventType eventType) {
  • if (null == mapEntry || null == columMap) {
  • return null;
  • }
  • BinLog item = new BinLog();
  • (eventType);
  • (columMap);
  • Map<String, Serializable> be = new HashMap<>();
  • Map<String, Serializable> af = new HashMap<>();
  • ((key, colum) -> {
  • Serializable serializableKey = ()[]; // NOTE(review): accessor and index expressions are missing in this listing
  • Serializable serializableValue = ()[];
  • if (serializableKey instanceof byte[]) {
  • (key, new String((byte[]) serializableKey)); // default charset, as no charset is given
  • } else {
  • (key, serializableKey);
  • }
  • if (serializableValue instanceof byte[]) {
  • (key, new String((byte[]) serializableValue)); // default charset, as no charset is given
  • } else {
  • (key, serializableValue);
  • }
  • });
  • (be);
  • (af);
  • return item;
  • }
  • /**
  • * @param data update-rows event payload whose row entries are iterated
  • * @param dbTableIdCols map from table id to a String; NOTE(review): presumably the
  • * "db.table" name used as key into dbTableCols — confirm
  • * @param dbTableCols map from that String key to the table's column map passed to updateColum
  • * @param eventType event type forwarded to updateColum
  • * @param queue blocking queue of BinLog items; presumably receives each built item
  • * @Description Update data: builds one BinLog item per row entry via updateColum.
  • * NOTE(review): updateColum can return null and no null check on item is visible before
  • * it is used; the InterruptedException catch body shows no call restoring the thread's
  • * interrupt flag — confirm both against the original source.
  • * @Throws
  • * @Return void
  • * @Date 2024-08-14 17:35:49
  • * @Author WangKun
  • **/
  • public static void updateData(UpdateRowsEventData data, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, EventType eventType, BlockingQueue<BinLog> queue) {
  • for (<Serializable[], Serializable[]> row : ()) {
  • if ((())) { // NOTE(review): presumably checks that the event's table id is a monitored one
  • String dbTable = (());
  • BinLog item = updateColum(row, (dbTable), eventType);
  • (dbTable);
  • try {
  • (item);
  • } catch (InterruptedException e) {
  • ("BinLog 更新数据添加阻塞队列异常:{}", (), e);
  • }
  • }
  • }
  • }
  • /**
  • * @param eventType event type forwarded to insertOrDeletedColum
  • * @param rows rows of the insert or delete event
  • * @param tableId table id of the event
  • * @param dbTableIdCols map from table id to a String; NOTE(review): presumably the
  • * "db.table" name used as key into dbTableCols — confirm
  • * @param dbTableCols map from that String key to the table's column map
  • * @param queue blocking queue of BinLog items; presumably receives each built item
  • * @Description Insert and delete data: builds one BinLog item per row via insertOrDeletedColum.
  • * NOTE(review): insertOrDeletedColum can return null and no null check on item is visible
  • * before it is used; the InterruptedException catch body shows no call restoring the
  • * thread's interrupt flag — confirm both against the original source.
  • * @Throws
  • * @Return void
  • * @Date 2024-08-13 17:30:30
  • * @Author WangKun
  • **/
  • public static void insertOrDeletedData(EventType eventType, List<Serializable[]> rows, long tableId, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, BlockingQueue<BinLog> queue) {
  • for (Serializable[] row : rows) {
  • if ((tableId)) { // NOTE(review): presumably checks that tableId is a monitored table; the check does not depend on row
  • String dbTable = (tableId);
  • BinLog item = insertOrDeletedColum(row, dbTable, (dbTable), eventType);
  • (dbTable);
  • try {
  • (item);
  • } catch (InterruptedException e) {
  • ("BinLog 新增或者删除数据添加阻塞队列异常:{}", (), e);
  • }
  • }
  • }
  • }
  • }