首先来看实例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# -*- coding:utf-8 -*-
import requests
import datetime
import time
import threading
'''
allow_redirects = False禁止重定向,添加在request参数后
get请求用params传参
post请求,数据类型form,用data传参
post请求,数据类型form,用data传参
post请求,数据类型json,json传参
timeout:请求超时时间,添加在request参数后
nub = 10#设置并发线程数
ResponseTime=float(result.elapsed.microseconds)/1000 #获取响应时间,单位ms
ThinkTime = 0.5#设置思考时间
AverageTime = "{:.3f}".format(float(sum(myrequest.times))/float(len(myrequest.times)))#计算数组的平均值,保留3位小数
totaltime = float(hour)*60*60 + float(minute)*60 + float(second) #计算总的思考时间+请求时间
'''
class url_request:
times = []
error = []
def weather_DC( self ):
myrequest = url_request()
weatherinfo_search = 'https://restapi.amap.com/v3/weather/weatherInfo?parameters'
params = { 'key' : 'cd1b11e80ffac05253196aa2a1233f25' ,
'city' : 110101 ,
'extensions' : 'base' ,
'output' : 'JSON' }
result = requests.get(url = weatherinfo_search, params = params)
print ( "状态码:" ,result.status_code)
print ( "返回报文:" ,result.text)
ResponseTime = float (result.elapsed.microseconds) / 1000
myrequest.times.append(ResponseTime)
if result.status_code ! = 200 :
myrequest.error.append( "0" )
if __name__ = = '__main__' :
myrequest = url_request()
threads = []
starttime = datetime.datetime.now()
print ( "请求开始时间:request start time %s" % starttime)
nub = 10
ThinkTime = 0.5
for i in range ( 1 , nub + 1 ):
t = threading.Thread(target = myrequest.weather_DC())
threads.append(t)
for t in threads:
time.sleep(ThinkTime)
print ( "线程数:thread %s" % t)
t.setDaemon( True )
t.start()
t.join()
endtime = datetime.datetime.now()
print ( "请求结束时间:request end time %s" % endtime)
time.sleep( 3 )
AverageTime = "{:.3f}" . format ( float ( sum (myrequest.times)) / float ( len (myrequest.times)))
print ( "平均响应时间:Average Response Time %s ms" % AverageTime)
usetime = str (endtime - starttime)
hour = usetime.split( ':' ).pop( 0 )
minute = usetime.split( ':' ).pop( 1 )
second = usetime.split( ':' ).pop( 2 )
totaltime = float (hour) * 60 * 60 + float (minute) * 60 + float (second)
print ( "并发数:Concurrent processing %s" % nub)
print ( "#总共消耗的时间:use total time %s s" % (totaltime - float (nub * ThinkTime)))
print ( "错误请求数:fail request %s s" % myrequest.error.count( "0" ))
|
实例扩展:
利用ruquest发送请求,利用多线程模拟并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
#!/user/bin/env python
#coding=utf-8
import requests
import datetime
import time
import threading
class url_request():
times = []
error = []
def req( self ,AppID,url):
myreq = url_request()
headers = { 'User-Agent' : 'Mozilla/5.0 (Linux; Android 4.2.1; en-us; Nexus 4 Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19' }
payload = { 'AppID' :AppID, 'CurrentURL' :url}
r = requests.post( "http://xx.xxx.com/WeiXinJSAccessToken/json/WeChatJSTicket" ,headers = headers,data = payload)
ResponseTime = float (r.elapsed.microseconds) / 1000 #获取响应时间,单位ms
myreq.times.append(ResponseTime) #将响应时间写入数组
if r.status_code ! = 200 :
myreq.error.append( "0" )
if __name__ = = '__main__' :
myreq = url_request()
threads = []
starttime = datetime.datetime.now()
print "request start time %s" % starttime
nub = 50 #设置并发线程数
ThinkTime = 0.5 #设置思考时间
for i in range ( 1 , nub + 1 ):
t = threading.Thread(target = myreq.req, args = ( '12' , 'http://m.ctrip.com/webapp/cpage/#mypoints' ))
threads.append(t)
for t in threads:
time.sleep(ThinkTime)
#print "thread %s" %t #打印线程
t.setDaemon( True )
t.start()
t.join()
endtime = datetime.datetime.now()
print "request end time %s" % endtime
time.sleep( 3 )
AverageTime = "{:.3f}" . format ( float ( sum (myreq.times)) / float ( len (myreq.times))) #计算数组的平均值,保留3位小数
print "Average Response Time %s ms" % AverageTime #打印平均响应时间
usetime = str (endtime - starttime)
hour = usetime.split( ':' ).pop( 0 )
minute = usetime.split( ':' ).pop( 1 )
second = usetime.split( ':' ).pop( 2 )
totaltime = float (hour) * 60 * 60 + float (minute) * 60 + float (second) #计算总的思考时间+请求时间
print "Concurrent processing %s" % nub #打印并发数
print "use total time %s s" % (totaltime - float (nub * ThinkTime)) #打印总共消耗的时间
print "fail request %s" % myreq.error.count( "0" ) #打印错误请求数
|
1
2
3
4
5
6
|
request start time 2015 - 02 - 10 18 : 24 : 14.316000
request end time 2015 - 02 - 10 18 : 24 : 39.769000
Average Response Time 46.700 ms
Concurrent processing 50
use total time 25.453 s
fail request 1
|
到此这篇关于Python3接口性能测试实例代码的文章就介绍到这了,更多相关Python3实现简单的接口性能测试内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://www.cnblogs.com/jiaown123/p/14902136.html