2. 读取excel上的测试用例和结果,成json格式,最终存到access数据库
结果如下:
{ 0: { 'Req_ID': 'Fqqqqq/qqqqq', 'Case_ID': 'ATC_LvPwrOff_Case001', 'Description': 'kkkkkkkkkkk', 'PreCondition': { 'B': 4, 'V: 0, 'V': 0, 'IN': 0, 'IN': 0, 'Ke: 1, 'INN': 1, 'INage': 12, 'Re': 0, 'Rue': 0 },
excel_tojson.py
from openpyxl import load_workbook from openpyxl.styles import colors, Font wb = load_workbook('VCU_TC_LvPwrOff.xlsx') sheet = wb.active # 获得当前正在显示的sheet, 也可以用wb.get_active_sheet()# 获得当前正在显示的sheet, 也可以用wb.get_active_sheet() # b5_too = sheet.cell(row=1, column=2) # print(b5_too.value) # # b6_too = sheet.cell(row=1, column=1) # b7_too = sheet.cell(row=2, column=1) # b8_too = sheet.cell(row=3, column=1) # # print(b6_too.value) # print(b7_too.value) # print(b8_too.value) # # print('-'*100) # for cell in list(sheet.rows)[0]: # print(cell.value) # # print('-'*100) # #判断一行的有效数据长度 # first_len = list(sheet.rows)[0] # # for i in first_len: # # if 'None' == i.value: # # first_len.remove(i) # print(len(first_len)) # print('-'*100) # for row in sheet.iter_rows('A1:AF1'): # for cell in row: # print(cell.value) TCrows=[] First_info=[] First_info_list = [] Second_info=[] Second_info_list = [] pre_dic = {} #获取testcase #获取所有行对象 for row in sheet.iter_rows(): TCrows.append(row) def get_all_row(): ''' 获取首行row的信息 {"1":{"Req_ID":0,"Case_ID":1,"Description":2,"PreCondition":{ "IN_ChrgOffButton":x, "IN_KeyStart":x}, "Action":{ "IN_KeyStart":x } {'Req_ID': 0, 'Case_ID': 1, 'Description': 2, 'PreCondition': 3, 'Action': 13, 'ExpectedResults': 22, 'Result': 31} } } ''' # for i in range(2, 14): pass # for i in range(2, 3): # Cases.append(TCrows[i]) # for Case in Cases: # for cell in para_head: # # print(cell.value,cell.coordinate) # First_info_list.append(cell.value) # # # print(First_info_list) # len_first_list = len(First_info_list) # # print(len_first_list) # while True: # if First_info_list[-1] == 'None': # First_info_list.pop() # else: # break # return First_info_list, len_first_list def get_row_first(): ''' 获取首行row的信息 ''' for i in range(0, 1): First_info.append(TCrows[i]) for para_head in First_info: for cell in para_head: # print(cell.value,cell.coordinate) First_info_list.append(cell.value) # print(First_info_list) len_first_list = len(First_info_list) # print(len_first_list) while True: if First_info_list[-1] == 'None': First_info_list.pop() else: break return First_info_list,len_first_list def get_row_second(): ''' 获取第二行row的信息 ''' for i in range(1, 2): Second_info.append(TCrows[i]) for para_head in Second_info: for cell in para_head: # print(cell.value,cell.coordinate) Second_info_list.append(cell.value) len_second_list = len(Second_info_list) return Second_info_list,len_second_list def parse_row_first(First_info_list): ''' 分析处理首行数据,划分 ''' for i in First_info_list: if i != None: pre_list.append(i) for first_item in pre_list: index = First_info_list.index(first_item) # print(first_item, ':', index) pre_dic[first_item]=index #仅仅返回最后四部分的索引值 return pre_dic #{'Req_ID': 0, 'Case_ID': 1, 'Description': 2, 'PreCondition': 3, 'Action': 13, 'ExpectedResults': 22, 'Result': 31} def parse_item_length(pre_dic): index_PreCondition = pre_dic['PreCondition'] index_Action = pre_dic['Action'] index_ExpectedResults = pre_dic['ExpectedResults'] index_Result = pre_dic['Result'] len_PreCondition = pre_dic['Action']-pre_dic['PreCondition'] len_Action = pre_dic['ExpectedResults']-pre_dic['Action'] len_precondition = pre_dic['Result']-pre_dic['ExpectedResults'] return index_PreCondition,index_Action,index_ExpectedResults,index_Result,len_PreCondition,len_Action,len_precondition def parse_second_item(): ''' 处理第二行的item数据,使得Precondition[3,x] ''' if __name__ == '__main__': First_info_list, len_first_list = get_row_first() pre_list = [] #添加首行item信息 pre_dic = parse_row_first(First_info_list) index_PreCondition, index_Action, index_ExpectedResults,index_Result,len_PreCondition, len_Action, len_precondition=parse_item_length(pre_dic) #处理第二行title的数据 Second_info_list, len_second_list = get_row_second() Cases=[] for i in range(2, 3): Cases.append(TCrows[i]) sum_num = 0 Result = {} for case in Cases: model_list = {} model_list[First_info_list[0]]= case[0].value model_list[First_info_list[1]]= case[1].value model_list[First_info_list[2]]= case[2].value # {'Req_ID': 0, 'Case_ID': 1, 'Description': 2, 'PreCondition': 3, 'Action': 13, 'ExpectedResults': 22, 'Result': 31} precond_dict={} for i in range(3,3+len_PreCondition): #(3,13) precond_dict[Second_info_list[i]]= case[i].value model_list['PreCondition'] = precond_dict act_dict = {} for j in range(index_Action, index_ExpectedResults): # (13,22) act_dict[Second_info_list[j]] = case[j].value model_list['Action'] = act_dict Expect_dict = {} for k in range(index_ExpectedResults, index_Result): # (22,31) Expect_dict[Second_info_list[k]] = case[k].value model_list['Action'] = Expect_dict model_list['Result']= case[index_Result].value Result[sum_num] = model_list sum_num += 1 print(Result) ''' 结果展示如下: { 0: { 'Req_ID': 'Func_LvPwrOff_Req001/Func_LvPwrOff_Req020/Func_LvPwrOff_Req021/Func_LvPwrOff_Req030', 'Case_ID': 'ATC_LvPwrOff_Case001', 'Description': 'KeyOn状态下电/VCU判断可以进行低压下电后,对BMS、DCDC、MCU(四个)、WPT、OBC等ECU低压下电控制-拉低硬线PwrOn/VCU判断可以进行低压下电后,对水泵、风扇、真空泵等低压执行器进行低压下电控制/任何状态(休闲、驾驶、充电)下低压下电,均需对热管理系统进行下电控制,控制热管理系统停止工作,降低整车静态功耗', 'PreCondition': { 'BMS_Tx_stPowComplete': 4, 'VCU_Tx_stChrgFed': 0, 'VCU_Tx_stReady': 0, 'IN_ChrgOffButton': 0, 'IN_KeyStart': 0, 'KeySwt': 1, 'IN_PowON': 1, 'IN_12Voltage': 12, 'RCValue': 0, 'RC2Value': 0 }, 'Action': { 'VCU_Tx_stDCDCEn': 2, 'VCU_Tx_powMaxPerm': 0, 'VCU_Tx_stChrgFed': 0, 'O_S_BMSPwrOn': 0, 'O_S_MCUPwrOn': 0, 'O_S_DCPwrOn': 0, 'O_S_VCUst': 0, 'O_S_OBCPwrOn': 0, 'O_S_ACPDURly_ON': 0 }, 'Result': None } } 。。。。。。 '''
3. 使用pypyodbc完成数据上传access数据库功能
cursor.execute("insert into resource(cid,name) values(%s, '%s')" % (12,name) );
pypy_odbc.py
# import pypyodbc # str = 'Driver={Microsoft Access Driver (*.mdb,*.accdb)};DBQ=D:\\db\\DB_BenShaw.accdb' # db=pypyodbc.win_connect_mdb(str) # 打开数据库连接 # curser = db.cursor()# 产生cursor游标 # curser.execute("select * from 测试用例") # #导入模块 import pypyodbc import json # import MySQLdb #定义conn def mdb_conn(db_name): """ 功能:创建数据库连接 :param db_name: 数据库名称 :param db_name: 数据库密码,默认为空 :return: 返回数据库连接 """ # str = 'Driver={Microsoft Access Driver (*.mdb)};PWD' + password + ";DBQ=" + db_name str = 'Driver={Microsoft Access Driver (*.mdb,*.accdb)}'+";DBQ=" + db_name conn = pypyodbc.win_connect_mdb(str) return conn #增加记录 def mdb_add(conn, cur, sql): """ 功能:向数据库插入数据 :param conn: 数据库连接 :param cur: 游标 :param sql: sql语句 :return: sql语句是否执行成功 """ try: cur.execute(sql) conn.commit() return True except Exception as e: print(e) return False #删除记录 def mdb_del(conn, cur, sql): """ 功能:向数据库删除数据 :param conn: 数据库连接 :param cur: 游标 :param sql: sql语句 :return: sql语句是否执行成功 """ try: cur.execute(sql) conn.commit() return True except: return False #修改记录 def mdb_modi(conn, cur, sql): """ 功能:向数据库修改数据 :param conn: 数据库连接 :param cur: 游标 :param sql: sql语句 :return: sql语句是否执行成功 """ try: cur.execute(sql) conn.commit() return True except: return False #查询记录 def mdb_sel(cur, sql): """ 功能:向数据库查询数据 :param cur: 游标 :param sql: sql语句 :return: 查询结果集 """ try: cur.execute(sql) return cur.fetchall() except: return [] def read_test_conf(): # 将字典类型的文件转换成字典 f=open('a.txt',"r",encoding='utf-8') test_config=eval(f.read()) f.close() return test_config def dic2sql(dic): sf = '' for key in dic: tup = [key,dic[key]] sf += (str(tup) + ',') sf = sf.rstrip(',') return sf if __name__ == '__main__': pathfile = 'D:\\db\\DB_BenShaw.accdb' tablename = '测试用例' conn = mdb_conn(pathfile) cur = conn.cursor() #读取testcase文件数据 test_config = read_test_conf() for key,value in test_config.items(): print("用例%s"%key+"进行数据的插入") Req_id_str = value['Req_ID'] Req_id_str = Req_id_str.split('/') Req_id_list = [] for i in Req_id_str: Req_id_list.append(i) #['Func_LvPwrOff_Req001', 'Func_LvPwrOff_Req020', 'Func_LvPwrOff_Req021', 'Func_LvPwrOff_Req030'] 4 # print(Req_id_list,len(Req_id_list)) count = 0 for Req_id in Req_id_list: print("用例%s里的第%s 条需求" % (key,count) + "进行数据的插入") #写入access数据库 Function = '低压下电管理' Req_ID = Req_id #用例分级 = 功能测试(默认) TC_ID = value['Case_ID'] TC_Name = value['Description'] import pymysql TC_Pre = value['PreCondition'] TC_Pre = json.dumps(TC_Pre) # TC_Pre = pymysql.escape_string(TC_Pre) TC_Process = value['Action'] TC_Process = json.dumps(TC_Process) TC_Result = value['Result'] #插入之前进行查询,有则覆盖,无则增加 # 查 sql = "SELECT ID FROM " + tablename + " where TC_ID= '%s'" % TC_ID sel_data = mdb_sel(cur, sql) print(sel_data) if sel_data: # 改 sql = "Update " + tablename + " Set TC_Pre = '%s',TC_Process = '%s',TC_Result='%s' where TC_ID = '%s'" % ( TC_Pre, TC_Process, TC_Result, TC_ID) if mdb_modi(conn, cur, sql): print("%s修改成功22222222222!" % count) else: print("修改失败!") else: # 增 sql = "Insert Into 测试用例(Function,Req_ID,用例分级,TC_ID,TC_Name,TC_Pre,TC_Process,TC_Result) Values ('低压下电管理', '%s','功能测试','%s','%s','%s','%s','%s')" % ( Req_ID, TC_ID, TC_Name, TC_Pre, TC_Process, TC_Result) # sql = "Insert Into " + tablename + "(Function,Req_ID,用例分级,TC_ID,TC_Name,TC_Pre,TC_Process,TC_Result) Values(低压下电管理, Func_LvPwrOff_Req001, 功能测试, ATC_LvPwrOff_Case001,Description,PreCondition,Action,Result)" if mdb_add(conn, cur, sql): print("第%s条数据插入成功!" % count) else: print("第%s条数据插入失败!" % count) count +=1 # #删 # sql = "Delete * FROM " + tablename + " where id = 32" # if mdb_del(conn, cur, sql): # print("删除成功!") # else: # print("删除失败!") # # #改 # sql = "Update " + tablename + " Set IsFullName = 1 where ID = 33" # if mdb_modi(conn, cur, sql): # print("修改成功!") # else: # print("修改失败!") #查 # sql = "SELECT * FROM " + tablename + " where id > 10" # sel_data = mdb_sel(cur, sql) # print(sel_data) cur.close() #关闭游标 conn.close() #关闭数据库连接