本文总结分享介绍接口测试框架开发,环境使用python3+selenium3+unittest+ddt+requests测试框架及ddt数据驱动,采用Excel管理测试用例等集成测试数据功能,以及使用HTMLTestRunner来生成测试报告,目前有开源的poman、Jmeter等接口测试工具,为什么还要开发接口测试框架呢?因接口测试工具也有存在几点不足。
- 测试数据不可控制。比如接口返回数据不可控,就无法自动断言接口返回的数据,不能断定是接口程序引起,还是测试数据变化引起的错误,所以需要做一些初始化测试数据。接口工具没有具备初始化测试数据功能,无法做到真正的接口测试自动化。
- 无法测试加密接口。实际项目中,多数接口不是可以随便调用,一般情况无法摸拟和生成加密算法。如时间戳和MDB加密算法,一般接口工具无法摸拟。
- 扩展能力不足。开源的接口测试工具无法实现扩展功能。比如,我们想生成不同格式的测试报告,想将测试报告发送到指定邮箱,又想让接口测试集成到CI中,做持续集成定时任务。
测试框架处理流程
测试框架处理过程如下:
- 首先初始化清空数据库表的数据,向数据库插入测试数据;
- 调用被测试系统提供的接口,先数据驱动读取excel用例一行数据;
- 发送请求数据,根据传参数据,向数据库查询得到对应的数据;
- 将查询的结果组装成JSON格式的数据,同时根据返回的数据值与Excel的值对比判断,并写入结果至指定Excel测试用例表格;
- 通过单元测试框架断言接口返回的数据,并生成测试报告,最后把生成最新的测试报告HTML文件发送指定的邮箱。
测试框架结构目录介绍
目录结构介绍如下:
- config/: 文件路径配置
- database/: 测试用例模板文件及数据库和发送邮箱配置文件
- db_fixture/: 初始化接口测试数据
- lib/: 程序核心模块。包含有excel解析读写、发送邮箱、发送请求、生成最新测试报告文件
- package/: 存放第三方库包。如HTMLTestRunner,用于生成HTML格式测试报告
- report/: 生成接口自动化测试报告
- testcase/: 用于编写接口自动化测试用例
- run_demo.py: 执行所有接口测试用例的主程序
- GitHub项目地址: https://github.com/yingoja/DemoAPI
数据库封装
[tester]
name = Jason [mysqlconf]
host = 127.0.0.1
port = 3306
user = root
password = 123456
db_name = guest [user]
# 发送邮箱服务器
HOST_SERVER = smtp.163.com
# 邮件发件人
FROM = 111@163.com
# 邮件收件人
TO = 222@126.com
# 发送邮箱用户名/密码
user = aaa
password = aaa
# 邮件主题
SUBJECT = 发布会系统接口自动化测试报告
config.ini
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os,sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from config import setting
from pymysql import connect,cursors
from pymysql.err import OperationalError
import configparser as cparser # --------- 读取config.ini配置文件 ---------------
cf = cparser.ConfigParser()
cf.read(setting.TEST_CONFIG,encoding='UTF-8')
host = cf.get("mysqlconf","host")
port = cf.get("mysqlconf","port")
user = cf.get("mysqlconf","user")
password = cf.get("mysqlconf","password")
db = cf.get("mysqlconf","db_name") class DB:
"""
MySQL基本操作
"""
def __init__(self):
try:
# 连接数据库
self.conn = connect(host = host,
user = user,
password = password,
db = db,
charset = 'utf8mb4',
cursorclass = cursors.DictCursor
)
except OperationalError as e:
print("Mysql Error %d: %s" % (e.args[0],e.args[1])) # 清除表数据
def clear(self,table_name):
real_sql = "delete from " + table_name + ";"
with self.conn.cursor() as cursor:
# 取消表的外键约束
cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
cursor.execute(real_sql)
self.conn.commit() # 插入表数据
def insert(self, table_name, table_data):
for key in table_data:
table_data[key] = "'"+str(table_data[key])+"'"
key = ','.join(table_data.keys())
value = ','.join(table_data.values())
real_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value + ")" with self.conn.cursor() as cursor:
cursor.execute(real_sql)
self.conn.commit() # 关闭数据库
def close(self):
self.conn.close() # 初始化数据
def init_data(self, datas):
for table, data in datas.items():
self.clear(table)
for d in data:
self.insert(table, d)
self.close()
mysql_db.py
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import sys, time, os
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from db_fixture.mysql_db import DB # 定义过去时间
past_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()-100000))
# 定义将来时间
future_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()+10000)) # 创建测试数据
datas = {
# 发布会表数据
'sign_event':[
{'id':1,'name':'红米Pro发布会','`limit`':2000,'status':1,'address':'北京会展中心','start_time':future_time},
{'id':2,'name':'苹果iphon6发布会','`limit`':1000,'status':1,'address':'宝安体育馆','start_time':future_time},
{'id':3,'name':'华为荣耀8发布会','`limit`':2000,'status':0,'address':'深圳福田会展中心','start_time':future_time},
{'id':4,'name':'苹果iphon8发布会','`limit`':2000,'status':1,'address':'深圳湾体育中心','start_time':past_time},
{'id':5,'name':'小米5发布会','`limit`':2000,'status':1,'address':'北京国家会议中心','start_time':future_time},
],
# 嘉宾表数据
'sign_guest':[
{'id':1,'realname':'Tom','phone':13511886601,'email':'alen@mail.com','sign':0,'event_id':1},
{'id':2,'realname':'Jason','phone':13511886602,'email':'sign@mail.com','sign':1,'event_id':1},
{'id':3,'realname':'Jams','phone':13511886603,'email':'tom@mail.com','sign':0,'event_id':5},
],
} # 测试数据插入表
def init_data():
DB().init_data(datas)
test_data.py
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os,sys
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.append(BASE_DIR) # 配置文件
TEST_CONFIG = os.path.join(BASE_DIR,"database","config.ini")
# 测试用例模板文件
SOURCE_FILE = os.path.join(BASE_DIR,"database","DemoAPITestCase.xlsx")
# excel测试用例结果文件
TARGET_FILE = os.path.join(BASE_DIR,"report","excelReport","DemoAPITestCase.xlsx")
# 测试用例报告
TEST_REPORT = os.path.join(BASE_DIR,"report")
# 测试用例程序文件
TEST_CASE = os.path.join(BASE_DIR,"testcase")
setting.py
程序核心模块
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os def new_report(testreport):
"""
生成最新的测试报告文件
:param testreport:
:return:返回文件
"""
lists = os.listdir(testreport)
lists.sort(key=lambda fn: os.path.getmtime(testreport + "\\" + fn))
file_new = os.path.join(testreport,lists[-1])
return file_new
netReport.py
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import xlrd class ReadExcel():
"""读取excel文件数据"""
def __init__(self,fileName, SheetName="Sheet1"):
self.data = xlrd.open_workbook(fileName)
self.table = self.data.sheet_by_name(SheetName) # 获取总行数、总列数
self.nrows = self.table.nrows
self.ncols = self.table.ncols
def read_data(self):
if self.nrows > 1:
# 获取第一行的内容,列表格式
keys = self.table.row_values(0)
listApiData = []
# 获取每一行的内容,列表格式
for col in range(1, self.nrows):
values = self.table.row_values(col)
# keys,values组合转换为字典
api_dict = dict(zip(keys, values))
listApiData.append(api_dict)
return listApiData
else:
print("表格是空数据!")
return None
readexcel.py
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os,sys,json
sys.path.append(os.path.dirname(os.path.dirname(__file__))) class SendRequests():
"""发送请求数据"""
def sendRequests(self,s,apiData):
try:
#从读取的表格中获取响应的参数作为传递
method = apiData["method"]
url = apiData["url"]
if apiData["params"] == "":
par = None
else:
par = eval(apiData["params"])
if apiData["headers"] == "":
h = None
else:
h = eval(apiData["headers"])
if apiData["body"] == "":
body_data = None
else:
body_data = eval(apiData["body"])
type = apiData["type"]
v = False
if type == "data":
body = body_data
elif type == "json":
body = json.dumps(body_data)
else:
body = body_data #发送请求
re = s.request(method=method,url=url,headers=h,params=par,data=body,verify=v)
return re
except Exception as e:
print(e)
sendrequests.py
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os,sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from config import setting
import smtplib
from lib.newReport import new_report
import configparser
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart def send_mail(file_new):
"""
定义发送邮件
:param file_new:
:return: 成功:打印发送邮箱成功;失败:返回失败信息
"""
f = open(file_new,'rb')
mail_body = f.read()
f.close()
#发送附件
con = configparser.ConfigParser()
con.read(setting.TEST_CONFIG,encoding='utf-8')
report = new_report(setting.TEST_REPORT)
sendfile = open(report,'rb').read()
# --------- 读取config.ini配置文件 ---------------
HOST = con.get("user","HOST_SERVER")
SENDER = con.get("user","FROM")
RECEIVER = con.get("user","TO")
USER = con.get("user","user")
PWD = con.get("user","password")
SUBJECT = con.get("user","SUBJECT") att = MIMEText(sendfile,'base64','utf-8')
att["Content-Type"] = 'application/octet-stream'
att.add_header("Content-Disposition", "attachment", filename=("gbk", "", report)) msg = MIMEMultipart('related')
msg.attach(att)
msgtext = MIMEText(mail_body,'html','utf-8')
msg.attach(msgtext)
msg['Subject'] = SUBJECT
msg['from'] = SENDER
msg['to'] = RECEIVER try:
server = smtplib.SMTP()
server.connect(HOST)
server.starttls()
server.login(USER,PWD)
server.sendmail(SENDER,RECEIVER,msg.as_string())
server.quit()
print("邮件发送成功!")
except Exception as e:
print("失败: " + str(e))
sendmail.py
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os,sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import shutil
from config import setting
from openpyxl import load_workbook
from openpyxl.styles import Font,Alignment
from openpyxl.styles.colors import RED,GREEN,DARKYELLOW
import configparser as cparser # --------- 读取config.ini配置文件 ---------------
cf = cparser.ConfigParser()
cf.read(setting.TEST_CONFIG,encoding='UTF-8')
name = cf.get("tester","name") class WriteExcel():
"""文件写入数据"""
def __init__(self,fileName):
self.filename = fileName
if not os.path.exists(self.filename):
# 文件不存在,则拷贝模板文件至指定报告目录下
shutil.copyfile(setting.SOURCE_FILE,setting.TARGET_FILE)
self.wb = load_workbook(self.filename)
self.ws = self.wb.active def write_data(self,row_n,value):
"""
写入测试结果
:param row_n:数据所在行数
:param value: 测试结果值
:return: 无
"""
font_GREEN = Font(name='宋体', color=GREEN, bold=True)
font_RED = Font(name='宋体', color=RED, bold=True)
font1 = Font(name='宋体', color=DARKYELLOW, bold=True)
align = Alignment(horizontal='center', vertical='center')
# 获数所在行数
L_n = "L" + str(row_n)
M_n = "M" + str(row_n)
if value == "PASS":
self.ws.cell(row_n, 12, value)
self.ws[L_n].font = font_GREEN
if value == "FAIL":
self.ws.cell(row_n, 12, value)
self.ws[L_n].font = font_RED
self.ws.cell(row_n, 13, name)
self.ws[L_n].alignment = align
self.ws[M_n].font = font1
self.ws[M_n].alignment = align
self.wb.save(self.filename)
writeexcel.py
接口测试用例编写
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os,sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import unittest,requests,ddt
from config import setting
from lib.readexcel import ReadExcel
from lib.sendrequests import SendRequests
from lib.writeexcel import WriteExcel testData = ReadExcel(setting.SOURCE_FILE, "Sheet1").read_data() @ddt.ddt
class Demo_API(unittest.TestCase):
"""发布会系统"""
def setUp(self):
self.s = requests.session() def tearDown(self):
pass @ddt.data(*testData)
def test_api(self,data):
# 获取ID字段数值,截取结尾数字并去掉开头0
rowNum = int(data['ID'].split("_")[2])
# 发送请求
re = SendRequests().sendRequests(self.s,data)
# 获取服务端返回的值
self.result = re.json()
# 获取excel表格数据的状态码和消息
readData_code = int(data["status_code"])
readData_msg = data["msg"]
if readData_code == self.result['status'] and readData_msg == self.result['message']:
OK_data = "PASS"
WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,OK_data)
if readData_code != self.result['status'] or readData_msg != self.result['message']:
NOT_data = "FAIL"
WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,NOT_data)
self.assertEqual(self.result['status'], readData_code, "返回实际结果是->:%s" % self.result['status'])
self.assertEqual(self.result['message'], readData_msg, "返回实际结果是->:%s" % self.result['message']) if __name__=='__main__':
unittest.main()
testAPI.py
集成测试报告
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia' import os,sys
sys.path.append(os.path.dirname(__file__))
from config import setting
import unittest,time
from HTMLTestRunner import HTMLTestRunner
from lib.sendmail import send_mail
from lib.newReport import new_report
from db_fixture import test_data
from package.HTMLTestRunner import HTMLTestRunner def add_case(test_path=setting.TEST_CASE):
"""加载所有的测试用例"""
discover = unittest.defaultTestLoader.discover(test_path, pattern='*API.py')
return discover def run_case(all_case,result_path=setting.TEST_REPORT):
"""执行所有的测试用例""" # 初始化接口测试数据
test_data.init_data() now = time.strftime("%Y-%m-%d %H_%M_%S")
filename = result_path + '/' + now + 'result.html'
fp = open(filename,'wb')
runner = HTMLTestRunner(stream=fp,title='发布会系统接口自动化测试报告',
description='环境:windows 7 浏览器:chrome',
tester='Jason')
runner.run(all_case)
fp.close()
report = new_report(setting.TEST_REPORT) #调用模块生成最新的报告
send_mail(report) #调用发送邮件模块 if __name__ =="__main__":
cases = add_case()
run_case(cases)
run_demo.py
测试结果展示
- HTML测试结果报告:
- Excel测试用例结果
- 邮件收到的测试报告