Python 数据库之间差异对比

时间:2022-10-18 14:52:51
参考资料:
 
此脚本用于对比两个 Oracle 数据库之间的表、列(列名、数据类型、长度)以及索引的差异。
cat oracle_diff.py
#!/home/dba/.pyenv/versions/3.5.2/bin/python
#coding=utf-8
import cx_Oracle
import time
import difflib
import os
# NOTE(review): os.popen() returns a file-like pipe object, not the hostname
# string. The pipe is never read or closed here, and v_host is not referenced
# anywhere else in the visible part of this file -- confirm whether it can be
# removed (or replaced by reading the HOSTNAME value directly).
v_host=os.popen('echo $HOSTNAME')
class Oracle_Status_Output():
    """Small wrapper around one cx_Oracle connection and one cursor.

    Every method is best-effort: a failure is printed to stdout instead of
    being raised, and the query methods then return ``None``.  Callers must
    therefore be prepared for ``None`` results.
    """

    def __init__(self, username, password, tns):
        """Open the connection and a cursor on it.

        ``username``, ``password`` and ``tns`` are passed unchanged to
        ``cx_Oracle.connect``.  If connecting fails the error is printed and
        ``self.db`` / ``self.cursor`` stay ``None``.
        """
        # Define both attributes before connecting so a failed connect leaves
        # the object in a known state; close() relies on this.
        self.db = None
        self.cursor = None
        try:
            self.db = cx_Oracle.connect(username, password, tns)
            self.cursor = self.db.cursor()
        except Exception as e:
            print('Wrong')
            print(e)

    def _fetch_all(self, sql, error_label, **binds):
        """Execute ``sql`` with keyword bind values and return all rows.

        On any error ``error_label`` and the exception are printed and
        ``None`` is returned.
        """
        try:
            self.cursor.execute(sql, **binds)
            return self.cursor.fetchall()
        except Exception as e:
            print(error_label)
            print(e)
            return None

    def schemas_tables_count(self, sql, db):
        """Print the sum of the first column of every row returned by ``sql``.

        ``db`` is only a label placed in front of the printed total.
        Returns ``None``; errors are printed, not raised.
        """
        try:
            self.cursor.execute(sql)
            rows = self.cursor.fetchall()
            count = sum(int(row[0]) for row in rows)
            print(db,'Count Tables','--',count)
        except Exception as e:
            print('Wrong--schemas_tables_count()')
            print(e)

    def schemas_tables_list(self, sql):
        """Return all rows of ``sql`` (no bind values), or ``None`` on error."""
        return self._fetch_all(sql, 'Wrong--schemas_tables_list()')

    def schemas_tables_columns_list(self, sql, data):
        """Return all rows of ``sql`` with bind ``:A`` set to ``data``.

        Returns ``None`` on error.
        """
        return self._fetch_all(sql, 'schemas_tables_columns_list', A=data)

    def schemas_tables_indexes_list(self, sql, data):
        """Return all rows of ``sql`` with bind ``:A`` set to ``data``.

        Returns ``None`` on error.
        """
        return self._fetch_all(sql, 'schemas_tables_indexes_list', A=data)

    def close(self):
        """Close the cursor and the connection, if they were opened.

        Safe to call after a failed connect and safe to call more than once.
        """
        cursor, db = self.cursor, self.db
        # Forget the handles first so a second close() is a no-op.
        self.cursor = None
        self.db = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor failed.
            if db is not None:
                db.close()
# Metadata queries.  The first two cover a fixed list of schema owners; the
# last two take one bind variable :A holding 'OWNER.TABLE_NAME'.
schemas_tables_count_sql = "SELECT COUNT(1),S.OWNER FROM DBA_TABLES S WHERE S.OWNER in ('PAY','BOSS','SETTLE','ISMP','TEMP_DSF','ACCOUNT') GROUP BY S.OWNER"
schemas_tables_list_sql  = "SELECT S.OWNER||'.'||S.TABLE_NAME FROM DBA_TABLES S WHERE S.OWNER in ('PAY','BOSS','SETTLE','ISMP','TEMP_DSF','ACCOUNT')"
schemas_tables_columns_sql = "select a.OWNER||'.'||a.TABLE_NAME||'.'||a.COLUMN_NAME||'.'||a.DATA_TYPE||'.'||a.DATA_LENGTH from dba_tab_columns a where a.OWNER||'.'||a.TABLE_NAME = :A"
schemas_tables_indexes_sql = "SELECT T.table_owner||'.'||T.table_name||'.'||T.index_name FROM DBA_INDEXES T WHERE T.table_owner||'.'||T.table_name = :A"

# Connection settings.  Each value can be overridden through an environment
# variable so that credentials do not have to be edited into the script; the
# defaults are the values that were previously hard-coded here.
# NOTE(review): the default password is still stored in source -- rotate it
# and drop the defaults once the environment variables are in place.
jx_db = Oracle_Status_Output(
    os.environ.get('ORACLE_DIFF_JX_USER', 'dbadmin'),
    os.environ.get('ORACLE_DIFF_JX_PASSWORD', 'QazWsx12'),
    os.environ.get('ORACLE_DIFF_JX_DSN', '106.15.109.134:1522/paydb'),
)
pro_db = Oracle_Status_Output(
    os.environ.get('ORACLE_DIFF_PRO_USER', 'dbadmin'),
    os.environ.get('ORACLE_DIFF_PRO_PASSWORD', 'QazWsx12'),
    os.environ.get('ORACLE_DIFF_PRO_DSN', 'localhost:1521/paydb'),
)

# Print the total table count of each side, then load the table lists that
# the comparison below works on.  These queries run as soon as the module is
# loaded, not only when it is executed as a script.
jx_db.schemas_tables_count(schemas_tables_count_sql,'JX ')
pro_db.schemas_tables_count(schemas_tables_count_sql,'PRO')
jx_schemas_tables  = jx_db.schemas_tables_list(schemas_tables_list_sql)
pro_schemas_tables = pro_db.schemas_tables_list(schemas_tables_list_sql)
def diff_jx_pro(listA,listB,listClass):
    """Compare two collections of rows and report what is on one side only.

    ``listA`` holds the JX rows and ``listB`` the PRO rows; the items must be
    hashable and sortable (e.g. the tuples returned by ``fetchall()``).
    ``None`` is treated like an empty list.  ``listClass`` is a label such as
    ``'Tables'`` that is placed in the printed line.

    One line is printed when the two sides differ: it starts with
    ``'Intersection>>'`` when the sides have nothing in common and with
    ``'Difference  >>'`` otherwise.  Nothing is printed when both sides hold
    the same items.

    Returns the sorted list of items present on both sides; the list is
    empty when there are none.
    """
    # An empty (or missing) side must still be compared: otherwise objects
    # that exist in only one database would never be reported.
    side_a = set(listA or ())
    side_b = set(listB or ())
    listC = sorted(side_a & side_b)
    listAC = sorted(side_a - side_b)
    listBC = sorted(side_b - side_a)
    if listAC or listBC:
        if not listC:
            print('Intersection>>','JX ',listClass,':',listAC,'--->','PRO',listClass,':',listBC)
        else:
            print('Difference  >>','JX ',listClass,':',listAC,'--->','PRO',listClass,':',listBC)
    return listC
       
if __name__ == '__main__':
    # Both connections were opened when the module was loaded; make sure they
    # are closed however the comparison ends.
    try:
        # Only tables present in both databases are compared further.  Fall
        # back to an empty list so a missing result does not break the loops.
        tables_lists = diff_jx_pro(jx_schemas_tables,pro_schemas_tables,'Tables') or []
        # Each row carries a single 'OWNER.TABLE_NAME' string; this is the
        # value bound to :A in the column and index queries.
        table_names = ["".join(row) for row in tables_lists]

        # First pass: column differences for every common table.
        for table_name in table_names:
            jx_schemas_tables_columns = jx_db.schemas_tables_columns_list(schemas_tables_columns_sql,table_name)
            pro_schemas_tables_columns = pro_db.schemas_tables_columns_list(schemas_tables_columns_sql,table_name)
            diff_jx_pro(jx_schemas_tables_columns,pro_schemas_tables_columns,'Columns')

        # Second pass: index differences, reported after all column output.
        for table_name in table_names:
            jx_schemas_tables_indexes = jx_db.schemas_tables_indexes_list(schemas_tables_indexes_sql,table_name)
            pro_schemas_tables_indexes = pro_db.schemas_tables_indexes_list(schemas_tables_indexes_sql,table_name)
            diff_jx_pro(jx_schemas_tables_indexes,pro_schemas_tables_indexes,'Indexes')
    finally:
        # Close the second connection even if closing the first one fails.
        try:
            jx_db.close()
        finally:
            pro_db.close()
    print('------------Table, column and index check completed--------------')