some working learning总结学习(二)

时间:2023-02-01 07:47:09

 2. 读取excel上的测试用例和结果,成json格式,最终存到access数据库

结果如下:

{
    0: {
        'Req_ID': 'Fqqqqq/qqqqq',
        'Case_ID': 'ATC_LvPwrOff_Case001',
        'Description': 'kkkkkkkkkkk',
        'PreCondition': {
            'B': 4,
            'V: 0,
            'V': 0,
            'IN': 0,
            'IN': 0,
            'Ke: 1,
            'INN': 1,
            'INage': 12,
            'Re': 0,
            'Rue': 0
        },

excel_tojson.py

some working learning总结学习(二)some working learning总结学习(二)
from openpyxl import load_workbook
from openpyxl.styles import colors, Font

wb = load_workbook('VCU_TC_LvPwrOff.xlsx')
sheet = wb.active  # 获得当前正在显示的sheet, 也可以用wb.get_active_sheet()# 获得当前正在显示的sheet, 也可以用wb.get_active_sheet()

# b5_too = sheet.cell(row=1, column=2)
# print(b5_too.value)
#
# b6_too = sheet.cell(row=1, column=1)
# b7_too = sheet.cell(row=2, column=1)
# b8_too = sheet.cell(row=3, column=1)
#
# print(b6_too.value)
# print(b7_too.value)
# print(b8_too.value)
#
# print('-'*100)
# for cell in list(sheet.rows)[0]:
#     print(cell.value)
#
# print('-'*100)
# #判断一行的有效数据长度
# first_len = list(sheet.rows)[0]
# # for i in first_len:
# #     if 'None' == i.value:
# #         first_len.remove(i)
# print(len(first_len))
# print('-'*100)
# for row in sheet.iter_rows('A1:AF1'):
#     for cell in row:
#         print(cell.value)
TCrows=[]
First_info=[]
First_info_list = []
Second_info=[]
Second_info_list = []
pre_dic = {}
#获取testcase


#获取所有行对象
for row in sheet.iter_rows():
    TCrows.append(row)

def get_all_row():
    '''
      获取首行row的信息
      {"1":{"Req_ID":0,"Case_ID":1,"Description":2,"PreCondition":{
                    "IN_ChrgOffButton":x,
                    "IN_KeyStart":x}, "Action":{
                                        "IN_KeyStart":x
                                        }
        {'Req_ID': 0, 'Case_ID': 1, 'Description': 2, 'PreCondition': 3, 'Action': 13, 'ExpectedResults': 22, 'Result': 31}
      }
       }
      '''
    # for i in range(2, 14):
    pass
    # for i in range(2, 3):
    #     Cases.append(TCrows[i])
    # for Case in Cases:
    #     for cell in para_head:
    #         # print(cell.value,cell.coordinate)
    #         First_info_list.append(cell.value)
    #
    # # print(First_info_list)
    # len_first_list = len(First_info_list)
    # # print(len_first_list)
    # while True:
    #     if First_info_list[-1] == 'None':
    #         First_info_list.pop()
    #     else:
    #         break
    # return First_info_list, len_first_list

def get_row_first():
    '''
    获取首行row的信息
    '''
    for i in range(0, 1):
        First_info.append(TCrows[i])
    for para_head in First_info:
        for cell in para_head:
            # print(cell.value,cell.coordinate)
            First_info_list.append(cell.value)

    # print(First_info_list)
    len_first_list = len(First_info_list)
    # print(len_first_list)
    while True:
        if First_info_list[-1] == 'None':
            First_info_list.pop()
        else:
            break
    return First_info_list,len_first_list

def get_row_second():
    '''
    获取第二行row的信息
    '''
    for i in range(1, 2):
        Second_info.append(TCrows[i])
    for para_head in Second_info:
        for cell in para_head:
            # print(cell.value,cell.coordinate)
            Second_info_list.append(cell.value)
    len_second_list = len(Second_info_list)

    return Second_info_list,len_second_list

def parse_row_first(First_info_list):
    '''
    分析处理首行数据,划分
    '''
    for i in First_info_list:
        if i != None:
            pre_list.append(i)

    for first_item in pre_list:
        index = First_info_list.index(first_item)
        # print(first_item, ':', index)
        pre_dic[first_item]=index
    #仅仅返回最后四部分的索引值
    return pre_dic  #{'Req_ID': 0, 'Case_ID': 1, 'Description': 2, 'PreCondition': 3, 'Action': 13, 'ExpectedResults': 22, 'Result': 31}

def parse_item_length(pre_dic):
    index_PreCondition = pre_dic['PreCondition']
    index_Action = pre_dic['Action']
    index_ExpectedResults = pre_dic['ExpectedResults']
    index_Result = pre_dic['Result']

    len_PreCondition = pre_dic['Action']-pre_dic['PreCondition']
    len_Action = pre_dic['ExpectedResults']-pre_dic['Action']
    len_precondition = pre_dic['Result']-pre_dic['ExpectedResults']
    return index_PreCondition,index_Action,index_ExpectedResults,index_Result,len_PreCondition,len_Action,len_precondition

def parse_second_item():
    '''
    处理第二行的item数据,使得Precondition[3,x]
    '''


if __name__ == '__main__':
    First_info_list, len_first_list = get_row_first()
    pre_list = []  #添加首行item信息

    pre_dic = parse_row_first(First_info_list)
    index_PreCondition, index_Action, index_ExpectedResults,index_Result,len_PreCondition, len_Action, len_precondition=parse_item_length(pre_dic)
    #处理第二行title的数据
    Second_info_list, len_second_list = get_row_second()

    Cases=[]

    for i in range(2, 3):
        Cases.append(TCrows[i])

    sum_num = 0
    Result = {}
    for case in Cases:
        model_list = {}
        model_list[First_info_list[0]]= case[0].value
        model_list[First_info_list[1]]= case[1].value
        model_list[First_info_list[2]]= case[2].value
        # {'Req_ID': 0, 'Case_ID': 1, 'Description': 2, 'PreCondition': 3, 'Action': 13, 'ExpectedResults': 22, 'Result': 31}
        precond_dict={}
        for i in range(3,3+len_PreCondition):  #(3,13)
            precond_dict[Second_info_list[i]]= case[i].value
        model_list['PreCondition'] = precond_dict
        act_dict = {}
        for j in range(index_Action, index_ExpectedResults):  # (13,22)
            act_dict[Second_info_list[j]] = case[j].value
        model_list['Action'] = act_dict
        Expect_dict = {}
        for k in range(index_ExpectedResults, index_Result):  # (22,31)
            Expect_dict[Second_info_list[k]] = case[k].value
        model_list['Action'] = Expect_dict
        model_list['Result']= case[index_Result].value

        Result[sum_num] = model_list
        sum_num += 1
    print(Result)


'''
结果展示如下:
{
    0: {
        'Req_ID': 'Func_LvPwrOff_Req001/Func_LvPwrOff_Req020/Func_LvPwrOff_Req021/Func_LvPwrOff_Req030',
        'Case_ID': 'ATC_LvPwrOff_Case001',
        'Description': 'KeyOn状态下电/VCU判断可以进行低压下电后,对BMS、DCDC、MCU(四个)、WPT、OBC等ECU低压下电控制-拉低硬线PwrOn/VCU判断可以进行低压下电后,对水泵、风扇、真空泵等低压执行器进行低压下电控制/任何状态(休闲、驾驶、充电)下低压下电,均需对热管理系统进行下电控制,控制热管理系统停止工作,降低整车静态功耗',
        'PreCondition': {
            'BMS_Tx_stPowComplete': 4,
            'VCU_Tx_stChrgFed': 0,
            'VCU_Tx_stReady': 0,
            'IN_ChrgOffButton': 0,
            'IN_KeyStart': 0,
            'KeySwt': 1,
            'IN_PowON': 1,
            'IN_12Voltage': 12,
            'RCValue': 0,
            'RC2Value': 0
        },
        'Action': {
            'VCU_Tx_stDCDCEn': 2,
            'VCU_Tx_powMaxPerm': 0,
            'VCU_Tx_stChrgFed': 0,
            'O_S_BMSPwrOn': 0,
            'O_S_MCUPwrOn': 0,
            'O_S_DCPwrOn': 0,
            'O_S_VCUst': 0,
            'O_S_OBCPwrOn': 0,
            'O_S_ACPDURly_ON': 0
        },
        'Result': None
    }
}

。。。。。。
'''
View Code

 3.  使用pypyodbc完成数据上传access数据库功能

cursor.execute("insert into resource(cid,name) values(%s, '%s')" % (12,name) );

pypy_odbc.py

some working learning总结学习(二)some working learning总结学习(二)
# import pypyodbc
# str = 'Driver={Microsoft Access Driver (*.mdb,*.accdb)};DBQ=D:\\db\\DB_BenShaw.accdb'
# db=pypyodbc.win_connect_mdb(str) # 打开数据库连接
# curser = db.cursor()# 产生cursor游标
# curser.execute("select * from 测试用例")

#
#导入模块
import pypyodbc
import json

# import MySQLdb
#定义conn
def mdb_conn(db_name):
    """
    功能:创建数据库连接
    :param db_name: 数据库名称
    :param db_name: 数据库密码,默认为空
    :return: 返回数据库连接
    """
    # str = 'Driver={Microsoft Access Driver (*.mdb)};PWD' + password + ";DBQ=" + db_name
    str = 'Driver={Microsoft Access Driver (*.mdb,*.accdb)}'+";DBQ=" + db_name
    conn = pypyodbc.win_connect_mdb(str)

    return conn

#增加记录
def mdb_add(conn, cur, sql):
    """
    功能:向数据库插入数据
    :param conn: 数据库连接
    :param cur: 游标
    :param sql: sql语句
    :return: sql语句是否执行成功
    """
    try:
        cur.execute(sql)
        conn.commit()
        return True
    except Exception as e:
        print(e)
        return False

#删除记录
def mdb_del(conn, cur, sql):
    """
    功能:向数据库删除数据
    :param conn: 数据库连接
    :param cur: 游标
    :param sql: sql语句
    :return: sql语句是否执行成功
    """
    try:
        cur.execute(sql)
        conn.commit()
        return True
    except:
        return False

#修改记录
def mdb_modi(conn, cur, sql):
    """
    功能:向数据库修改数据
    :param conn: 数据库连接
    :param cur: 游标
    :param sql: sql语句
    :return: sql语句是否执行成功
    """
    try:
        cur.execute(sql)
        conn.commit()
        return True
    except:
        return False

#查询记录
def mdb_sel(cur, sql):
    """
    功能:向数据库查询数据
    :param cur: 游标
    :param sql: sql语句
    :return: 查询结果集
    """
    try:
        cur.execute(sql)
        return cur.fetchall()
    except:
        return []


def read_test_conf():
    # 将字典类型的文件转换成字典
    f=open('a.txt',"r",encoding='utf-8')
    test_config=eval(f.read())
    f.close()
    return test_config

def dic2sql(dic):
    sf = ''

    for key in dic:
        tup = [key,dic[key]]
        sf += (str(tup) + ',')
    sf = sf.rstrip(',')

    return sf


if __name__ == '__main__':
    pathfile = 'D:\\db\\DB_BenShaw.accdb'
    tablename = '测试用例'
    conn = mdb_conn(pathfile)
    cur = conn.cursor()
    #读取testcase文件数据
    test_config = read_test_conf()

    for key,value in test_config.items():
        print("用例%s"%key+"进行数据的插入")

        Req_id_str = value['Req_ID']
        Req_id_str = Req_id_str.split('/')
        Req_id_list = []
        for i in Req_id_str:
            Req_id_list.append(i)         #['Func_LvPwrOff_Req001', 'Func_LvPwrOff_Req020', 'Func_LvPwrOff_Req021', 'Func_LvPwrOff_Req030'] 4
        # print(Req_id_list,len(Req_id_list))

        count = 0
        for Req_id in Req_id_list:
            print("用例%s里的第%s 条需求" % (key,count) + "进行数据的插入")
            #写入access数据库
            Function = '低压下电管理'
            Req_ID = Req_id
            #用例分级 = 功能测试(默认)
            TC_ID =  value['Case_ID']
            TC_Name = value['Description']
            import pymysql
            TC_Pre = value['PreCondition']
            TC_Pre = json.dumps(TC_Pre)

            # TC_Pre = pymysql.escape_string(TC_Pre)
            TC_Process = value['Action']
            TC_Process = json.dumps(TC_Process)

            TC_Result = value['Result']

#插入之前进行查询,有则覆盖,无则增加
 #           查
            sql = "SELECT ID FROM " + tablename + " where TC_ID= '%s'" % TC_ID
            sel_data = mdb_sel(cur, sql)
            print(sel_data)
            if sel_data:
                # 改
                sql = "Update " + tablename + " Set TC_Pre = '%s',TC_Process = '%s',TC_Result='%s' where TC_ID = '%s'" % (
                    TC_Pre, TC_Process, TC_Result, TC_ID)
                if mdb_modi(conn, cur, sql):
                    print("%s修改成功22222222222!" % count)
                else:
                    print("修改失败!")

            else:
                # 增
                sql = "Insert Into 测试用例(Function,Req_ID,用例分级,TC_ID,TC_Name,TC_Pre,TC_Process,TC_Result) Values ('低压下电管理', '%s','功能测试','%s','%s','%s','%s','%s')" % (
                    Req_ID, TC_ID, TC_Name, TC_Pre, TC_Process, TC_Result)
                # sql = "Insert Into " + tablename + "(Function,Req_ID,用例分级,TC_ID,TC_Name,TC_Pre,TC_Process,TC_Result) Values(低压下电管理, Func_LvPwrOff_Req001, 功能测试, ATC_LvPwrOff_Case001,Description,PreCondition,Action,Result)"
                if mdb_add(conn, cur, sql):
                    print("第%s条数据插入成功!" % count)
                else:
                    print("第%s条数据插入失败!" % count)

            count +=1

    # #删
    # sql = "Delete * FROM " + tablename + " where id = 32"
    # if mdb_del(conn, cur, sql):
    #    print("删除成功!")
    # else:
    #    print("删除失败!")
    #
    # #改
    # sql = "Update " + tablename + " Set IsFullName = 1 where ID = 33"
    # if mdb_modi(conn, cur, sql):
    #    print("修改成功!")
    # else:
    #    print("修改失败!")

    #查
    # sql = "SELECT * FROM " + tablename + " where id > 10"
    # sel_data = mdb_sel(cur, sql)
    # print(sel_data)

    cur.close()    #关闭游标
    conn.close()   #关闭数据库连接
View Code