/**
* @Description BinLog数据工具
* @Author WangKun
* @Date 2024/8/12 16:40
* @Version
*/
public class BinLogDataUtils {
/**
* @param db
* @param table
* @Description 获取columns集合
* @Throws
* @Return <, >
* @Date 2024-08-12 16:10:08
* @Author WangKun
**/
public static Map<String, Field> getColumnsMap(String db, String table) {
PreparedStatement ps = null;
ResultSet rs = null;
Connection connection = null;
try {
//获取数据源
DataSource dataSource = ();
connection = ();
// 执行sql获取表数据
String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
ps = (preSql);
(1, db);
(2, table);
rs = ();
Map<String, Field> map = new HashMap<>(());
while (()) {
String column = ("COLUMN_NAME");
int idx = ("ORDINAL_POSITION");
if (column != null && idx >= 1) {
// sql的位置从1开始
(column, new Field(("TABLE_SCHEMA"), ("TABLE_NAME"), idx - 1, column, ("DATA_TYPE")));
}
}
();
();
();
return map;
} catch (SQLException e) {
("加载BinLog监控配置库.表字段错误, db_table={}.{} ", db, table, e);
} finally {
try {
if (ps != null) {
();
}
if (rs != null) {
();
}
if (connection != null) {
();
}
} catch (SQLException e) {
("加载BinLog监控配置库.表字段错误关闭连接失败, db_table={}.{} ", db, table, e);
}
}
return null;
}
/**
* @param row
* @param dbTable
* @param columMap
* @param eventType
* @Description 新增或删除操作数据格式化
* @Throws
* @Return
* @Date 2024-08-12 16:53:07
* @Author WangKun
**/
private static BinLog insertOrDeletedColum(Serializable[] row, String dbTable, Map<String, Field> columMap, EventType eventType) {
if (null == row || null == columMap || != ()) {
return null;
}
// 初始化Item
BinLog item = new BinLog();
(eventType);
(columMap);
Map<String, Serializable> beOrAf = new HashMap<>();
((key, colum) -> {
Serializable serializable = row[];
if (serializable instanceof byte[]) {
(key, new String((byte[]) serializable));
} else {
(key, serializable);
}
});
// 写操作放after,删操作放before
if (isWrite(eventType)) {
(beOrAf);
}
if (isDelete(eventType)) {
(beOrAf);
}
return item;
}
/**
* @param mapEntry
* @param columMap
* @param eventType
* @Description 更新操作数据格式化
* @Throws
* @Return
* @Date 2024-08-12 16:52:46
* @Author WangKun
**/
private static BinLog updateColum(<Serializable[], Serializable[]> mapEntry, Map<String, Field> columMap, EventType eventType) {
if (null == mapEntry || null == columMap) {
return null;
}
BinLog item = new BinLog();
(eventType);
(columMap);
Map<String, Serializable> be = new HashMap<>();
Map<String, Serializable> af = new HashMap<>();
((key, colum) -> {
Serializable serializableKey = ()[];
Serializable serializableValue = ()[];
if (serializableKey instanceof byte[]) {
(key, new String((byte[]) serializableKey));
} else {
(key, serializableKey);
}
if (serializableValue instanceof byte[]) {
(key, new String((byte[]) serializableValue));
} else {
(key, serializableValue);
}
});
(be);
(af);
return item;
}
/**
* @param data
* @param dbTableIdCols
* @param dbTableCols
* @param eventType
* @param queue
* @Description 更新数据
* @Throws
* @Return void
* @Date 2024-08-14 17:35:49
* @Author WangKun
**/
public static void updateData(UpdateRowsEventData data, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, EventType eventType, BlockingQueue<BinLog> queue) {
for (<Serializable[], Serializable[]> row : ()) {
if ((())) {
String dbTable = (());
BinLog item = updateColum(row, (dbTable), eventType);
(dbTable);
try {
(item);
} catch (InterruptedException e) {
("BinLog 更新数据添加阻塞队列异常:{}", (), e);
}
}
}
}
/**
* @param eventType
* @param rows
* @param tableId
* @param dbTableIdCols
* @param dbTableCols
* @param queue
* @Description 新增与删除数据
* @Throws
* @Return void
* @Date 2024-08-13 17:30:30
* @Author WangKun
**/
public static void insertOrDeletedData(EventType eventType, List<Serializable[]> rows, long tableId, Map<Long, String> dbTableIdCols, Map<String, Map<String, Field>> dbTableCols, BlockingQueue<BinLog> queue) {
for (Serializable[] row : rows) {
if ((tableId)) {
String dbTable = (tableId);
BinLog item = insertOrDeletedColum(row, dbTable, (dbTable), eventType);
(dbTable);
try {
(item);
} catch (InterruptedException e) {
("BinLog 新增或者删除数据添加阻塞队列异常:{}", (), e);
}
}
}
}
}