一、框架菜单
1.1 common模块
1.2 其他
二、Excel接口测试案例编写
三、读取Excel测试封装(核心封装)
excel_utils.py 读取Excel中的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import os
import xlrd #内置模块、第三方模块pip install 自定义模块
class ExcelUtils():
    """Read one worksheet of an Excel workbook through xlrd.

    Cells that belong to a merged range report the value stored in the
    top-left cell of that range, so every data row comes back complete.

    NOTE(review): xlrd 2.0+ only reads ``.xls`` workbooks — confirm the
    installed xlrd version if ``.xlsx`` files are used.
    """

    def __init__(self, file_path, sheet_name):
        """Open ``file_path`` and keep the sheet named ``sheet_name``."""
        self.file_path = file_path
        self.sheet_name = sheet_name
        self.sheet = self.get_sheet()  # worksheet object read by every other method

    def get_sheet(self):
        """Return the worksheet ``self.sheet_name`` of the workbook at ``self.file_path``."""
        wb = xlrd.open_workbook(self.file_path)
        return wb.sheet_by_name(self.sheet_name)

    def get_row_count(self):
        """Return the number of rows in the sheet."""
        return self.sheet.nrows

    def get_col_count(self):
        """Return the number of columns in the sheet."""
        return self.sheet.ncols

    def __get_cell_value(self, row_index, col_index):
        """Return the raw value stored at (row_index, col_index)."""
        return self.sheet.cell_value(row_index, col_index)

    def get_merged_info(self):
        """Return the sheet's merged ranges as (rlow, rhigh, clow, chigh) tuples."""
        return self.sheet.merged_cells

    def get_merged_cell_value(self, row_index, col_index):
        """Return the value of a cell, whether or not it is part of a merged range.

        The upper bounds of a merged range are exclusive.  A cell inside a
        range yields the range's top-left value; any other cell (including
        every cell of a sheet without merged ranges) yields its own value.
        """
        for (rlow, rhigh, clow, chigh) in self.get_merged_info():
            if rlow <= row_index < rhigh and clow <= col_index < chigh:
                # First matching range wins; later ranges must not overwrite it.
                return self.__get_cell_value(rlow, clow)
        return self.__get_cell_value(row_index, col_index)

    def get_sheet_data_by_dict(self):
        """Return every data row as a dict keyed by the titles in row 0.

        Returns an empty list when the sheet has no rows at all.
        """
        row_count = self.get_row_count()
        if row_count == 0:
            return []
        first_row = self.sheet.row(0)  # header row
        titles = [first_row[col].value for col in range(self.get_col_count())]
        all_data_list = []
        for row in range(1, row_count):
            all_data_list.append({
                title: self.get_merged_cell_value(row, col)
                for col, title in enumerate(titles)
            })
        return all_data_list
if __name__ == '__main__':
    # Manual smoke test: print every data row of the sample workbook.
    current_path = os.path.dirname(__file__)
    excel_path = os.path.join(current_path, '..', 'samples/data/test_case.xlsx')
    excelUtils = ExcelUtils(excel_path, "Sheet1")
    for row in excelUtils.get_sheet_data_by_dict():
        print(row)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import os
from common.excel_utils import ExcelUtils
from common.config_utils import config
# Directory that contains this module; anchor for the relative path below.
current_path = os.path.dirname(__file__)
# Test-case workbook: config.CASE_DATA_PATH joined onto this module's parent directory.
test_data_path = os.path.join( current_path, '..' , config.CASE_DATA_PATH )
class TestdataUtils():
    """Load test-step rows and group them into test cases.

    Each row is a dict whose '测试用例编号' entry is the case id; rows that
    share a case id are the ordered steps of one test case.
    """

    def __init__(self, test_data_path=test_data_path):
        """Read the step rows from the workbook's "Sheet1" and from MySQL."""
        self.test_data_path = test_data_path
        self.test_data = ExcelUtils(test_data_path, "Sheet1").get_sheet_data_by_dict()
        # NOTE(review): SqlUtils is not imported in the visible part of this
        # module — confirm where it comes from before relying on this line.
        self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()

    @staticmethod
    def _group_rows_by_case(rows):
        """Return {case id: [step rows]} keeping the rows' original order."""
        testcase_dict = {}
        for row_data in rows:
            testcase_dict.setdefault(row_data['测试用例编号'], []).append(row_data)
        return testcase_dict

    @staticmethod
    def _as_case_list(testcase_dict):
        """Turn {case id: steps} into [{"case_id": ..., "case_info": steps}, ...]."""
        return [
            {"case_id": case_id, "case_info": steps}
            for case_id, steps in testcase_dict.items()
        ]

    def __get_testcase_data_dict(self):
        """Return the Excel rows grouped by case id."""
        return self._group_rows_by_case(self.test_data)

    def def_testcase_data_list(self):
        """Return the Excel test cases as a list of case_id/case_info dicts."""
        return self._as_case_list(self.__get_testcase_data_dict())

    def def_testcase_data_list_by_mysql(self):
        """Return the MySQL test cases in the same shape as def_testcase_data_list().

        Assumes the MySQL rows carry the same '测试用例编号' key as the Excel
        rows — TODO confirm against SqlUtils.get_mysql_test_case_info().
        """
        return self._as_case_list(self._group_rows_by_case(self.test_data_by_mysql))
if __name__ == "__main__":
    # Manual smoke test: print every grouped test case.
    testdataUtils = TestdataUtils()
    for i in testdataUtils.def_testcase_data_list():
        print(i)
|
以上代码为 testdata_utils.py:读取 Excel 中的数据后,按测试用例编号整理成框架需要的数据结构
四、request封装(核心封装)
requests_utils.py 包含post请求,get请求,异常,调用断言
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
import ast
import re
import requests
import jsonpath
from requests.exceptions import RequestException
from requests.exceptions import ProxyError
from requests.exceptions import ConnectionError
from common.config_utils import config
from common.check_utils import CheckUtils
class RequestsUtils():
    """Send the HTTP requests described by test-step rows and check the responses.

    A step is a dict keyed by the sheet's column titles ("请求方式",
    "请求地址", "请求参数(get)", "提交数据(post)", "取值方式", "取值代码",
    "传值变量", "期望结果类型", "期望结果", ...).  Values captured from one
    response are kept in ``temp_variables`` and substituted for ``${name}``
    placeholders in later steps of the same case.

    Every failure result carries 'code' and 'result' plus
    'check_result' (False) and 'message', so callers can treat it like a
    CheckUtils result.
    """

    # ``${name}`` placeholder; group 1 is the variable name.
    _VARIABLE_PATTERN = re.compile(r'\$\{(\w+)\}')

    def __init__(self):
        """Create the shared session and read the host prefix from config."""
        self.hosts = config.hosts
        self.headers = {"Content-Type": "application/json;charset=utf-8"}
        self.session = requests.session()
        self.temp_variables = {}

    @staticmethod
    def _failure(code, message):
        """Build a failure result that also answers 'check_result' / 'message'."""
        return {'code': code, 'result': message, 'check_result': False, 'message': message}

    @staticmethod
    def _parse_literal(text):
        """Evaluate a cell's literal text; a blank cell means "nothing to send"."""
        if isinstance(text, str) and not text.strip():
            return None
        return ast.literal_eval(text)

    def _fill_variables(self, text):
        """Replace every ``${name}`` in ``text`` with the quoted stored value."""
        return self._VARIABLE_PATTERN.sub(
            lambda match: '"%s"' % self.temp_variables.get(match.group(1)), text)

    def _store_variable(self, step_info, response):
        """Capture a value from the response when the step asks for one."""
        mode = step_info["取值方式"]
        if mode == "json取值":
            value = jsonpath.jsonpath(response.json(), step_info["取值代码"])[0]
        elif mode == "正则取值":
            value = re.findall(step_info["取值代码"], response.text)[0]
        else:
            return
        self.temp_variables[step_info["传值变量"]] = value

    def _send_get(self, info):
        """Issue the GET request of one step."""
        return self.session.get(url=self.hosts + info["请求地址"],
                                params=self._parse_literal(info["请求参数(get)"]))

    def _send_post(self, info):
        """Issue the POST request of one step with a JSON body."""
        return self.session.post(url=self.hosts + info["请求地址"],
                                 headers=self.headers,
                                 params=self._parse_literal(info["请求参数(get)"]),
                                 json=self._parse_literal(info["提交数据(post)"]))

    def _run_step(self, step_info, send):
        """Send one request, capture variables, run the check; map errors to code 4."""
        name = step_info.get("接口名称")
        try:
            response = send(step_info)
            response.encoding = response.apparent_encoding
            self._store_variable(step_info, response)
            result = CheckUtils(response).run_check(step_info['期望结果类型'], step_info['期望结果'])
        except ProxyError:
            result = self._failure(4, '[%s]请求:代理错误异常' % (name,))
        except ConnectionError:
            result = self._failure(4, '[%s]请求:连接超时异常' % (name,))
        except RequestException as e:
            result = self._failure(4, '[%s]请求:Request异常,原因:%s' % (name, e.__str__()))
        except Exception as e:
            # Last-resort boundary: a broken step must fail the case, not the run.
            result = self._failure(4, '[%s]请求:系统异常,原因:%s' % (name, e.__str__()))
        return result

    def __get(self, get_info):
        """Run a GET step and return its check result."""
        return self._run_step(get_info, self._send_get)

    def __post(self, post_info):
        """Run a POST step and return its check result."""
        return self._run_step(post_info, self._send_post)

    def request(self, step_info):
        """Run one step after substituting ``${name}`` placeholders.

        The caller's dict is left untouched; substitution happens on a copy.
        Returns a result dict; 'code' is 0 only when the check passed.
        """
        try:
            step_info = dict(step_info)  # never rewrite the shared test data
            request_type = step_info["请求方式"]
            step_info["请求参数(get)"] = self._fill_variables(step_info["请求参数(get)"])
            if request_type == "get":
                result = self.__get(step_info)
            elif request_type == "post":
                step_info["提交数据(post)"] = self._fill_variables(step_info["提交数据(post)"])
                result = self.__post(step_info)
            else:
                result = self._failure(1, '请求方式不支持')
        except Exception as e:
            result = self._failure(
                4,
                '用例编号[%s]的[%s]步骤出现系统异常,原因:%s'
                % (step_info.get('测试用例编号'), step_info.get("测试用例步骤"), e.__str__()))
        return result

    def request_by_step(self, step_infos):
        """Run the steps of one case in order, stopping at the first failure.

        Returns the result of the last executed step, or a failure result
        when ``step_infos`` is empty.
        """
        self.temp_variables = {}
        temp_result = self._failure(4, '测试用例没有可执行的步骤')
        for step_info in step_infos:
            temp_result = self.request(step_info)
            if temp_result['code'] != 0:
                break
        return temp_result
if __name__ == "__main__":
    # Manual smoke test: fetch a token, then create a tag with it.
    case_info = [
        {'测试用例编号': 'case01', '测试用例步骤': 'step_01', '接口名称': '获取access_token接口',
         '请求方式': 'get', '请求地址': '/cgi-bin/token',
         '请求参数(get)': '{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}',
         '提交数据(post)': '', '取值方式': 'json取值', '传值变量': 'token',
         '取值代码': '$.access_token', '期望结果类型': '正则匹配',
         '期望结果': '{"access_token":"(.+?)","expires_in":(.+?)}'},
        {'测试用例编号': 'case01', '测试用例步骤': 'step_02', '接口名称': '创建标签接口',
         '请求方式': 'post', '请求地址': '/cgi-bin/tags/create',
         '请求参数(get)': '{"access_token":${token}}',
         '提交数据(post)': '{"tag" : {"name" : "衡东"}}', '取值方式': '无', '传值变量': '',
         '取值代码': '', '期望结果类型': '正则匹配',
         '期望结果': '{"tag":{"id":(.+?),"name":"衡东"}}'}
    ]
    RequestsUtils().request_by_step(case_info)
|
五、断言封装(核心封装)
check_utils.py 断言封装,与实际结果核对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
import re
import ast
class CheckUtils():
    """Compare an HTTP response with the expectation written in a test step.

    ``run_check`` dispatches on the expectation type ('无', 'json键是否存在',
    'json键值对', '正则匹配') and returns either ``pass_result`` (code 0) or
    ``fail_result`` (code 2).  Both carry the response's reason, status
    code, headers and body; ``fail_result['message']`` explains the failure.
    """

    def __init__(self, check_response=None):
        """Remember the response and prepare the pass/fail result templates."""
        self.ck_response = check_response
        self.ck_rules = {
            '无': self.no_check,
            'json键是否存在': self.check_key,
            'json键值对': self.check_keyvalue,
            '正则匹配': self.check_regexp
        }
        self.pass_result = self._build_result(0, True)
        self.fail_result = self._build_result(2, False)

    def _build_result(self, code, passed):
        """Build a result dict; response fields are None when no response was given."""
        response = self.ck_response
        return {
            'code': code,
            'response_reason': getattr(response, 'reason', None),
            'response_code': getattr(response, 'status_code', None),
            'response_headers': getattr(response, 'headers', None),
            'response_body': getattr(response, 'text', None),
            'check_result': passed,
            'message': ''  # room for log output and failure details
        }

    def no_check(self, check_data=None):
        """Pass unconditionally; ``check_data`` is accepted only for uniform dispatch."""
        return self.pass_result

    def check_key(self, check_data=None):
        """Pass when every comma-separated key exists at the top level of the JSON body."""
        actual_keys = self.ck_response.json().keys()
        # Surrounding blanks are dropped so "a, b" checks the keys "a" and "b".
        wrong_key = [key.strip() for key in check_data.split(',')
                     if key.strip() not in actual_keys]
        if wrong_key:
            self.fail_result['message'] = '响应结果中不存在键:%s' % ','.join(wrong_key)
            return self.fail_result
        return self.pass_result

    def check_keyvalue(self, check_data=None):
        """Pass when every key/value pair of the dict literal is in the JSON body."""
        actual_items = self.ck_response.json().items()
        # literal_eval turns the text into a dict without executing code.
        wrong_items = [item for item in ast.literal_eval(check_data).items()
                       if item not in actual_items]
        if wrong_items:
            self.fail_result['message'] = '响应结果中不匹配的键值对:%s' % (wrong_items,)
            return self.fail_result
        return self.pass_result

    def check_regexp(self, check_data=None):
        """Pass when the regular expression matches somewhere in the response text."""
        if re.search(check_data, self.ck_response.text):
            return self.pass_result
        return self.fail_result

    def run_check(self, check_type=None, check_data=None):
        """Run the check named ``check_type``; only HTTP 200 responses are checked."""
        code = getattr(self.ck_response, 'status_code', None)
        if code != 200:
            self.fail_result['message'] = '请求的响应状态码非200,实际为%s' % str(code)
            return self.fail_result
        checker = self.ck_rules.get(check_type)
        if checker is None:
            self.fail_result['message'] = '不支持%s判断方法' % check_type
            return self.fail_result
        return checker(check_data)
if __name__ == "__main__":
    import json
    from types import SimpleNamespace

    def _fake_response(body):
        """Build a stand-in exposing the response attributes CheckUtils reads."""
        return SimpleNamespace(reason='OK', status_code=200, headers={}, text=body,
                               json=lambda: json.loads(body))

    # Key existence: the body lacks "expires_in", so this check fails.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires_":7200}'))
          .check_key("access_token,expires_in"))
    # Key/value pairs: the body lacks "expires_in": 7200, so this check fails.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires_i":7200}'))
          .check_keyvalue('{"expires_in": 7200}'))
    # Regular expression: matches, check passes.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires_in":7200}'))
          .check_regexp('"expires_in":(.+?)'))
    # Regular expression: no match, check fails.
    print(CheckUtils(_fake_response('{"access_token":"hello","expires":7200}'))
          .check_regexp('"expires_in":(.+?)'))
|
六、api_testcase下的api_test.py 封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import warnings
import unittest
import paramunittest
from common.testdata_utils import TestdataUtils
from common.requests_utils import RequestsUtils
# Data source switch: use def_testcase_data_list_by_mysql() for the MySQL source,
# or def_testcase_data_list() for the Excel source.
# NOTE(review): confirm that TestdataUtils provides def_testcase_data_list_by_mysql().
case_infos = TestdataUtils().def_testcase_data_list_by_mysql()
@paramunittest.parametrized(
    *case_infos
)
class APITest(paramunittest.ParametrizedTestCase):
    """One generated test per entry of ``case_infos``; each runs all steps of a case."""

    def setUp(self) -> None:
        # Silence ResourceWarning noise in the test output.
        warnings.simplefilter('ignore', ResourceWarning)

    def setParameters(self, case_id, case_info):
        """Receive the case id and its list of step rows from paramunittest."""
        self.case_id = case_id
        self.case_info = case_info

    def test_api_common_function(self):
        """Run every step of the case and assert that the final check passed."""
        # Overwrite the test's name and description with the case's id and
        # title taken from its first step row.
        self._testMethodName = self.case_info[0].get("测试用例编号")
        self._testMethodDoc = self.case_info[0].get("测试用例名称")
        actual_result = RequestsUtils().request_by_step(self.case_info)
        # Error results may only carry 'result'; fall back to it for the message.
        failure_message = actual_result.get('message') or actual_result.get('result')
        self.assertTrue(actual_result.get('check_result'), failure_message)
if __name__ == '__main__':
    # Run the generated API tests with the default unittest runner.
    unittest.main()
|
七、common下的log_utils.py 封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import os
import logging
import time
from common.config_utils import config
# Directory that contains this module; anchor for the relative path below.
current_path = os.path.dirname(__file__)
# Log directory: config.LOG_PATH joined onto this module's parent directory.
log_output_path = os.path.join( current_path, '..' , config.LOG_PATH )
class LogUtils():
    """Configure the shared "ApiTestLog" logger with console and file output."""

    def __init__(self, log_path=log_output_path):
        """Attach a console handler and a daily log file under ``log_path``.

        The logger is a process-wide object, so handlers are attached only
        the first time; constructing LogUtils again reuses them instead of
        duplicating every log line.
        """
        os.makedirs(log_path, exist_ok=True)  # FileHandler does not create directories
        self.log_name = os.path.join(log_path, 'ApiTest_%s.log' % time.strftime('%Y_%m_%d'))
        self.logger = logging.getLogger("ApiTestLog")
        self.logger.setLevel(config.LOG_LEVEL)
        if not self.logger.handlers:
            formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
            console_handler = logging.StreamHandler()  # console output
            file_handler = logging.FileHandler(self.log_name, 'a', encoding='utf-8')  # file output
            for handler in (console_handler, file_handler):
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)

    def get_logger(self):
        """Return the configured logger."""
        return self.logger
# Module-level logger built once at import time.
logger = LogUtils().get_logger()

if __name__ == '__main__':
    # Manual smoke test: emit one line to the console and the log file.
    logger.info('hello')
|
八、common下的config_utils.py的封装
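配置文件的编写:conf/local_config.ini 至少需要包含 [default](hosts)、[path](LOG_PATH、CASE_DATA_PATH、REPORT_PATH)、[log](LOG_LEVEL,整数)和 [email](smtp_server、smtp_sender、smtp_password、smtp_receiver、smtp_cc、smtp_subject)这几个节。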
对配置文件的读取封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
import os
import configparser
# Directory that contains this module; anchor for the relative path below.
current_path = os.path.dirname(__file__)
# Default location of the ini file read by ConfigUtils.
cfgpath = os.path.join(current_path, "../conf/local_config.ini" )
# NOTE(review): this prints the path every time the module is imported — looks like a debug leftover; confirm before removing.
print (cfgpath)
class ConfigUtils:
    """Typed read access to the framework's ini configuration file."""

    def __init__(self, config_path=None):
        """Parse the ini file at ``config_path``.

        When ``config_path`` is omitted the module-level ``cfgpath`` is used,
        looked up at call time.
        """
        if config_path is None:
            config_path = cfgpath
        self.__conf = configparser.ConfigParser()
        self.__conf.read(config_path, encoding="utf-8")

    def read_ini(self, sec, option):
        """Return option ``option`` of section ``sec`` as a string."""
        return self.__conf.get(sec, option)

    @property
    def hosts(self):
        """Option ``hosts`` of section ``default``."""
        return self.read_ini('default', 'hosts')

    @property
    def LOG_PATH(self):
        """Option ``LOG_PATH`` of section ``path``."""
        return self.read_ini('path', 'LOG_PATH')

    @property
    def CASE_DATA_PATH(self):
        """Option ``CASE_DATA_PATH`` of section ``path``."""
        return self.read_ini('path', 'CASE_DATA_PATH')

    @property
    def REPORT_PATH(self):
        """Option ``REPORT_PATH`` of section ``path``."""
        return self.read_ini('path', 'REPORT_PATH')

    @property
    def LOG_LEVEL(self):
        """Option ``LOG_LEVEL`` of section ``log``, converted to int."""
        return int(self.read_ini('log', 'LOG_LEVEL'))

    @property
    def smtp_server(self):
        """Option ``smtp_server`` of section ``email``."""
        return self.read_ini('email', 'smtp_server')

    @property
    def smtp_sender(self):
        """Option ``smtp_sender`` of section ``email``."""
        return self.read_ini('email', 'smtp_sender')

    @property
    def smtp_password(self):
        """Option ``smtp_password`` of section ``email``."""
        return self.read_ini('email', 'smtp_password')

    @property
    def smtp_receiver(self):
        """Option ``smtp_receiver`` of section ``email``."""
        return self.read_ini('email', 'smtp_receiver')

    @property
    def smtp_cc(self):
        """Option ``smtp_cc`` of section ``email``."""
        return self.read_ini('email', 'smtp_cc')

    @property
    def smtp_subject(self):
        """Option ``smtp_subject`` of section ``email``."""
        return self.read_ini('email', 'smtp_subject')
# Shared instance built once at import time.
config = ConfigUtils()

if __name__ == '__main__':
    # Manual smoke test: show two values read from the default ini file.
    print(config.hosts)
    print(config.LOG_LEVEL)
|
九、test_runner下的run_case.py 封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
class RunCase():
    """Discover the API tests, run them and write an HTML report.

    NOTE(review): unittest, HTMLTestReportCN, test_case_path and
    test_report_path are not defined in the visible part of this module —
    confirm the imports and path definitions that precede this class.
    """

    def __init__(self):
        """Set the test directory, the report directory and the report metadata."""
        self.test_case_path = test_case_path
        self.report_path = test_report_path
        self.title = 'P1P2接口自动化测试报告'
        self.description = '自动化接口测试框架'
        self.tester = '测试开发组'

    def load_test_suite(self):
        """Return a suite holding every test found in ``api_test.py`` files."""
        discover = unittest.defaultTestLoader.discover(start_dir=self.test_case_path,
                                                       pattern='api_test.py',
                                                       top_level_dir=self.test_case_path)
        all_suite = unittest.TestSuite()
        all_suite.addTest(discover)
        return all_suite

    def run(self):
        """Run the suite and return the path of the HTML report that was written."""
        report_dir = HTMLTestReportCN.ReportDirectory(self.report_path)
        report_dir.create_dir(self.title)
        report_file_path = HTMLTestReportCN.GlobalMsg.get_value('report_path')
        # The with-block closes the report file even when the run raises.
        with open(report_file_path, 'wb') as fp:
            runner = HTMLTestReportCN.HTMLTestRunner(stream=fp,
                                                     title=self.title,
                                                     description=self.description,
                                                     tester=self.tester)
            runner.run(self.load_test_suite())
        return report_file_path
if __name__ == '__main__':
    # Run all cases, then mail the report as both the body and an attachment.
    report_path = RunCase().run()
    with open(report_path, 'rb') as report_file:
        report_body = report_file.read()
    EmailUtils(report_body, report_path).send_mail()
|
十、common下的email_utils.py 封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from common.config_utils import config
class EmailUtils():
    """Build and send the test-report e-mail using the SMTP settings from config."""

    def __init__(self, smtp_body, smtp_attch_path=None):
        """Store the HTML body, the optional attachment path and the SMTP settings."""
        self.smtp_server = config.smtp_server
        self.smtp_sender = config.smtp_sender
        self.smtp_password = config.smtp_password
        self.smtp_receiver = config.smtp_receiver
        self.smtp_cc = config.smtp_cc
        self.smtp_subject = config.smtp_subject
        self.smtp_body = smtp_body
        self.smtp_attch = smtp_attch_path

    @staticmethod
    def _split_addresses(addresses):
        """Split a comma-separated address string, dropping blank entries."""
        return [addr.strip() for addr in (addresses or '').split(",") if addr.strip()]

    def mail_message_body(self):
        """Return the multipart message: HTML body plus the optional attachment."""
        from email.mime.application import MIMEApplication

        message = MIMEMultipart()
        message['from'] = self.smtp_sender
        message['to'] = self.smtp_receiver
        if self.smtp_cc:
            message['Cc'] = self.smtp_cc
        message['subject'] = self.smtp_subject
        message.attach(MIMEText(self.smtp_body, 'html', 'utf-8'))
        if self.smtp_attch:
            with open(self.smtp_attch, 'rb') as attach_source:
                # MIMEApplication already sets application/octet-stream and
                # base64 encoding, so no second Content-Type header is added.
                attach_file = MIMEApplication(attach_source.read())
            attach_file.add_header('Content-Disposition', 'attachment',
                                   filename=('gbk', '', os.path.basename(self.smtp_attch)))
            message.attach(attach_file)
        return message

    def send_mail(self):
        """Log in to the SMTP server and send the message to all To and Cc addresses."""
        recipients = (self._split_addresses(self.smtp_receiver)
                      + self._split_addresses(self.smtp_cc))
        # The with-block ends the SMTP session even when sending fails.
        with smtplib.SMTP() as smtp:
            smtp.connect(self.smtp_server)
            smtp.login(user=self.smtp_sender, password=self.smtp_password)
            smtp.sendmail(self.smtp_sender, recipients, self.mail_message_body().as_string())
if __name__ == '__main__':
    # Manual smoke test: mail an existing report as an attachment.
    html_path = os.path.join(os.path.dirname(__file__), '..', 'test_reports',
                             '接口自动化测试报告V1.1', '接口自动化测试报告V1.1.html')
    EmailUtils('<h3 align="center">自动化测试报告</h3>', html_path).send_mail()
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/123anqier-blog/p/13376455.html