Python 操作 MySQL 数据库:读取一个数据库的表并写入另一个数据库

时间:2022-12-11 17:49:46

这个脚本源于工作需要,下面直接说明。
我现在有两台主机:一台是公司主机,一台是客户主机。需求是把公司主机上的三张表同步到客户主机的数据库中。
既然是同步,首先想到的方案是主从复制或 Linux 定时任务。由于我没有权限在主机上配置主从复制,只能编写脚本,再通过定时任务定期执行。
涉及的三张表的建表语句如下:

# DDL for table `schedule_building`.
# The string holds two statements: it drops any existing `schedule_building`
# table and then recreates it with the 13 columns listed below.
# NOTE(review): this constant is not referenced by the visible sync code —
# presumably it is run manually to prepare the target database; confirm.
create_sql_schedule_building = """
    DROP table IF EXISTS schedule_building ;
    CREATE TABLE `schedule_building` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `uuid` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
      `proj_id` int(11) DEFAULT NULL,
      `team_id` int(11) DEFAULT NULL,
      `Building` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
      `status` tinyint(4) DEFAULT '1',
      `cid` int(11) DEFAULT NULL,
      `cusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '建立人',
      `ctime` datetime DEFAULT NULL,
      `uid` int(11) DEFAULT NULL,
      `uusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '修改人',
      `utime` datetime DEFAULT NULL,
      `random_no` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=91 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""

# DDL for table `schedule_floor`.
# The string holds two statements: it drops any existing `schedule_floor`
# table and then recreates it with the 9 columns listed below.
# NOTE(review): this constant is not referenced by the visible sync code —
# presumably it is run manually to prepare the target database; confirm.
create_sql_schedule_floor = """
    DROP table IF EXISTS schedule_floor ;
    CREATE TABLE `schedule_floor` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `m_id` int(11) DEFAULT NULL,
      `sort` int(11) DEFAULT NULL,
      `cname` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
      `ctime` datetime DEFAULT NULL,
      `cid` int(11) DEFAULT NULL,
      `utime` datetime DEFAULT NULL,
      `uid` int(11) DEFAULT NULL,
      `status` tinyint(4) DEFAULT '1',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3249 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ;
"""

# DDL for table `schedule_room`.
# The string holds two statements: it drops any existing `schedule_room`
# table and then recreates it with the 8 columns listed below.
# Unlike the other two tables, this one declares DEFAULT CHARSET=utf8.
# NOTE(review): this constant is not referenced by the visible sync code —
# presumably it is run manually to prepare the target database; confirm.
create_sql_schedule_room = """
    DROP table IF EXISTS schedule_room ;
    CREATE TABLE `schedule_room` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `m_id` int(11) DEFAULT NULL,
      `ilevel` int(11) DEFAULT NULL,
      `parent_id` int(11) DEFAULT NULL,
      `cname` varchar(50) DEFAULT NULL,
      `mark` varchar(50) DEFAULT NULL,
      `status` tinyint(4) DEFAULT '1',
      `sort` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=926 DEFAULT CHARSET=utf8;
"""

核心代码

from mysql_base import DataBaseParent_local, DataBaseParent_remote, DataBaseParent_test
import MySQLdb

# One helper instance per database role. In the functions of this script,
# db1 is the read source and db3 is the write target.
# NOTE(review): db2 is instantiated but never used in the visible code, and
# DataBaseParent_remote is not defined in the helper listing shown in this
# document — confirm it exists in mysql_base before running.
db1 = DataBaseParent_local()
db2 = DataBaseParent_remote()
db3 = DataBaseParent_test()


def read(tb_name):
    """Fetch every row of table ``tb_name`` through ``db1`` and return them as a list.

    ``tb_name`` is interpolated directly into the SQL text, so it must be a
    trusted table name.
    """
    query = "SELECT * FROM {0};".format(tb_name)
    fetched = db1.select(query)
    # db1.select returns (rows, row_count); only the rows are needed here.
    return list(fetched[0])


def write_building():
    """Copy all rows of ``schedule_building`` from the source into ``db3``.

    The rows are read first; the delete statement and the 13-placeholder
    insert statement are then handed to ``db3.insert_many`` together with
    the rows.
    """
    source_rows = read("schedule_building")
    insert_stmt = "insert into schedule_building values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);"
    clear_stmt = "delete from schedule_building ;"
    # insert_many takes (insert SQL, delete SQL, rows) in that order.
    db3.insert_many(insert_stmt, clear_stmt, source_rows)


def write_floor():
    """Copy all rows of ``schedule_floor`` from the source into ``db3``.

    The rows are read first; the delete statement and the 9-placeholder
    insert statement are then handed to ``db3.insert_many`` together with
    the rows.
    """
    source_rows = read("schedule_floor")
    insert_stmt = "insert into schedule_floor values(%s,%s,%s,%s,%s,%s,%s,%s,%s);"
    clear_stmt = "delete from schedule_floor ;"
    # insert_many takes (insert SQL, delete SQL, rows) in that order.
    db3.insert_many(insert_stmt, clear_stmt, source_rows)


def write_room():
    """Copy all rows of ``schedule_room`` from the source into ``db3``.

    The rows are read first; the delete statement and the 8-placeholder
    insert statement are then handed to ``db3.insert_many`` together with
    the rows.
    """
    source_rows = read("schedule_room")
    insert_stmt = "insert into schedule_room values(%s,%s,%s,%s,%s,%s,%s,%s);"
    clear_stmt = "delete from schedule_room ;"
    # insert_many takes (insert SQL, delete SQL, rows) in that order.
    db3.insert_many(insert_stmt, clear_stmt, source_rows)


if __name__ == '__main__':
    # Run the three table syncs in sequence: floor, then building, then room.
    for sync_table in (write_floor, write_building, write_room):
        sync_table()

数据库共享基类

#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""DB共享类库"""
# 使用此类,先实例化一个DataBaseParent_local对象,然后对象调用相应方法
# from django.db import connection

import MySQLdb
# Module-level connections used by the helper classes in this module; both
# are opened as a side effect of importing the module.
# NOTE(review): host, user and password are hard-coded in source — consider
# loading them from configuration or environment variables.
# NOTE(review): both connections use charset='utf8', while the table
# definitions shown alongside this code declare utf8mb4 — confirm that
# 4-byte characters are not expected in the synced data.
db1 = MySQLdb.connect("www.shdfshajd.cn", "db_user", "qazeDC!@12", "xcx", charset='utf8')
db3 = MySQLdb.connect("localhost", "root", "root", "apollo", charset='utf8')

class DataBaseParent_local:
    """Query helper bound to the module-level ``db1`` connection.

    Every method opens its own cursor on ``db1`` and closes it when done,
    including when the statement raises. Write methods commit on success
    and roll back the open transaction on failure.
    """

    def __init__(self):
        # ``self.cursor`` is kept only for backward compatibility with callers
        # that read the attribute; the methods below no longer share it,
        # because closing a shared cursor broke every later call.
        self.cursor = db1.cursor()
        # Column metadata of the most recent ``select`` call.
        self.description = None

    def select(self, sqlstr):
        """Run ``sqlstr`` and return ``(rows, row_count)``.

        ``rows`` is whatever ``cursor.fetchall()`` returns (a tuple of row
        tuples with the default cursor). The cursor's ``description`` is
        stored on ``self.description``.
        """
        cur = db1.cursor()
        try:
            cur.execute(sqlstr)
            rows = cur.fetchall()
            self.description = cur.description
        finally:
            cur.close()
        return rows, len(rows)

    def select_include_name(self, sqlstr):
        """Run ``sqlstr`` and return ``(list_of_dicts, row_count)``.

        Each dict maps column name to value, e.g.
        ``[{'name': 'apollo', 'age': 28}, {'name': 'jack', 'age': 27}]``.
        Every selected column is included (the previous implementation
        silently dropped the last one).
        """
        cur = db1.cursor()
        try:
            cur.execute(sqlstr)
            column_names = [column[0] for column in cur.description]
            rows = cur.fetchall()
        finally:
            cur.close()
        result = [dict(zip(column_names, row)) for row in rows]
        return result, len(rows)

    def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
        """Run ``sqlstr`` and return one page of the result.

        Args:
            sqlstr: SQL text passed to ``select``.
            pageNo: 1-based page number.
            select_size: number of rows per page.

        Returns:
            ``(page_rows, total_rows, total_pages, pageNo, select_size)``.
            ``page_rows`` is ``[]`` when there are no rows or the page lies
            outside the result. ``total_pages`` is always an ``int``.

        Example: with 10 rows and ``select_size=5``, page 1 returns the
        first five rows together with ``10, 2, 1, 5``.
        """
        rows, total = self.select(sqlstr)
        # Ceiling division with ``//`` so the page count stays an integer on
        # Python 3 (plain ``/`` yields a float there).
        total_pages = (total + select_size - 1) // select_size

        start = (pageNo - 1) * select_size
        end = min(pageNo * select_size, total)
        if total == 0 or start > total or start < 0:
            return [], total, total_pages, pageNo, select_size
        return rows[start:end], total, total_pages, pageNo, select_size

    def executesql(self, sqlstr):
        """Execute one statement on ``db1``, commit, and return ``execute()``'s result."""
        cur = db1.cursor()
        try:
            affected = cur.execute(sqlstr)
            db1.commit()
        except Exception:
            db1.rollback()
            raise
        finally:
            cur.close()
        return affected

    def insert(self, sql, param):
        """Execute a parameterized statement on ``db1``, commit, and return ``execute()``'s result.

        A fresh cursor is used per call so the method can be called repeatedly.
        """
        cur = db1.cursor()
        try:
            affected = cur.execute(sql, param)
            db1.commit()
        except Exception:
            db1.rollback()
            raise
        finally:
            cur.close()
        return affected

    def release(self):
        """Placeholder that performs no cleanup and always returns 0."""
        return 0


class DataBaseParent_test:
    """Query helper bound to the module-level ``db3`` connection.

    Every method opens its own cursor on ``db3`` and closes it when done,
    including when the statement raises. Write methods commit on ``db3`` on
    success and roll back the open transaction on failure.
    """

    def __init__(self):
        # ``self.cursor`` is kept only for backward compatibility with callers
        # that read the attribute; the methods below no longer share it,
        # because ``insert`` used to close it and break later calls.
        self.cursor = db3.cursor()
        # Column metadata of the most recent ``select`` call.
        self.description = None

    def select(self, sqlstr):
        """Run ``sqlstr`` and return ``(rows, row_count)``.

        ``rows`` is whatever ``cursor.fetchall()`` returns (a tuple of row
        tuples with the default cursor). The cursor's ``description`` is
        stored on ``self.description``.
        """
        cur = db3.cursor()
        try:
            cur.execute(sqlstr)
            rows = cur.fetchall()
            self.description = cur.description
        finally:
            cur.close()
        return rows, len(rows)

    def select_include_name(self, sqlstr):
        """Run ``sqlstr`` and return ``(list_of_dicts, row_count)``.

        Each dict maps column name to value, e.g.
        ``[{'name': 'apollo', 'age': 28}, {'name': 'jack', 'age': 27}]``.
        Every selected column is included (the previous implementation
        silently dropped the last one).
        """
        cur = db3.cursor()
        try:
            cur.execute(sqlstr)
            column_names = [column[0] for column in cur.description]
            rows = cur.fetchall()
        finally:
            cur.close()
        result = [dict(zip(column_names, row)) for row in rows]
        return result, len(rows)

    def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
        """Run ``sqlstr`` and return one page of the result.

        Args:
            sqlstr: SQL text passed to ``select``.
            pageNo: 1-based page number.
            select_size: number of rows per page.

        Returns:
            ``(page_rows, total_rows, total_pages, pageNo, select_size)``.
            ``page_rows`` is ``[]`` when there are no rows or the page lies
            outside the result. ``total_pages`` is always an ``int``.

        Example: with 10 rows and ``select_size=5``, page 1 returns the
        first five rows together with ``10, 2, 1, 5``.
        """
        rows, total = self.select(sqlstr)
        # Ceiling division with ``//`` so the page count stays an integer on
        # Python 3 (plain ``/`` yields a float there).
        total_pages = (total + select_size - 1) // select_size

        start = (pageNo - 1) * select_size
        end = min(pageNo * select_size, total)
        if total == 0 or start > total or start < 0:
            return [], total, total_pages, pageNo, select_size
        return rows[start:end], total, total_pages, pageNo, select_size

    def executesql(self, sqlstr):
        """Execute one statement on ``db3``, commit, and return ``execute()``'s result.

        The commit is issued on ``db3``; it previously went to ``db1``, so
        the statement executed here was never committed by this method.
        """
        cur = db3.cursor()
        try:
            affected = cur.execute(sqlstr)
            db3.commit()
        except Exception:
            db3.rollback()
            raise
        finally:
            cur.close()
        return affected

    def insert(self, sql, param):
        """Execute a parameterized statement on ``db3``, commit, and return ``execute()``'s result.

        A fresh cursor is used per call so the method can be called repeatedly.
        """
        cur = db3.cursor()
        try:
            affected = cur.execute(sql, param)
            db3.commit()
        except Exception:
            db3.rollback()
            raise
        finally:
            cur.close()
        return affected

    def release(self):
        """Placeholder that performs no cleanup and always returns 0."""
        return 0

    def insert_many(self, sql, sql1, args):
        """Run ``sql1``, then bulk-execute ``sql`` with ``args``, in one transaction.

        Args:
            sql: parameterized statement passed to ``executemany``.
            sql1: statement executed first (the sync script passes a
                ``delete from <table>`` here).
            args: sequence of parameter rows for ``sql``.

        Returns:
            Whatever ``cursor.executemany`` returns.

        Both statements are committed together. If either one fails, the
        transaction is rolled back before the exception propagates, so a
        failed bulk insert does not leave the preceding ``sql1`` pending on
        the connection.
        """
        cur = db3.cursor()
        try:
            cur.execute(sql1)
            res = cur.executemany(sql, args)
            db3.commit()
        except Exception:
            db3.rollback()
            raise
        finally:
            cur.close()
        return res