前言:
如何做到,控制多设备并行执行测试用例呢。
思路篇
我们去想下,我们可以获取参数的信息,和设备的信息,那么我们也可以针对每台设备开启不一样的端口服务。那么每个服务都对应的端口,我们在获取设备列表的时候,要和 每个服务对应起来,这样,我们开启一个进城池,我们在进程池里去控制设备,每个进程池 控制不一样的设备即可。
实现篇
首先实现对应的参数篇和对应的设备端口,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
def startdevicesApp():
l_devices_list = []
port_list = []
alldevices = get_devices()
if len (alldevices)> 0 :
for item in alldevices:
port = random.randint( 1000 , 6000 )
port_list.append(port)
desired_caps = {
'platformName' : 'Android' ,
'deviceName' : item,
'platformVersion' : getPlatForm(item),
'appPackage' : get_apkname(apk_path), # 包名
'appActivity' : get_apk_lautc(apk_path), # apk的launcherActivity
'skipServerInstallation' : True ,
"port" :port
}
l_devices_list.append(desired_caps)
return l_devices_list,port_list
|
接下来,我们去写一个端口开启服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
class RunServer(threading.Thread): #启动服务的线程
def __init__( self , cmd):
threading.Thread.__init__( self )
self .cmd = cmd
def run( self ):
os.system( self .cmd)
def start(port_list: list ):
def __run(url):
time.sleep( 10 )
response = urllib.request.urlopen(url, timeout = 5 )
if str (response.getcode()).startswith( "2" ):
return True
for i in range ( 0 , len (port_list)):
cmd = "appium -p %s " % (
port_list[i])
if platform.system() = = "Windows" : # windows下启动server
t1 = RunServer(cmd)
p = Process(target = t1.start())
p.start()
while True :
time.sleep( 4 )
if __run( "http://127.0.0.1:" + port_list[i] + "/wd/hub/status" ):
break
|
我们开启服务了,接下来,我们怎样根据不同进程执行测试用例。
1
2
3
4
5
6
7
8
9
10
|
def runcase(devics):
#执行测试用例
pass
def run(deviceslist: list ):
pool = Pool( len (deviceslist))
for i in deviceslist:
pool. map (runcase, i)
pool.close()
pool.join()
|
接下来,就是我们去组合形成最后的执行的代码。
最终代码展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
from appium import webdriver
from androguard.core.bytecodes.apk import APK
import os
import random
apk_path = "/Users/lileilei/Downloads/com.tencent.mobileqq_8.5.0_1596.apk"
def get_devices() - > list :
all_devices = []
cmd = "adb devices"
reslut = os.popen(cmd).readlines()[ 1 :]
for item in reslut:
if item ! = "\n" :
all_devices.append( str (item).split( "\t" )[ 0 ])
return all_devices
def getPlatForm(dev: str ) - > str :
cmd = 'adb -s {} shell getprop ro.build.version.release' . format (dev)
reslut = os.popen(cmd).readlines()[ 0 ]
return str (reslut).split( "\n" )[ 0 ]
def get_apkname(apk):
a = APK(apk, False , "r" )
return a.get_package()
def get_apk_lautc(apk):
a = APK(apk, False , "r" )
return a.get_main_activity()
import platform
from multiprocessing import Process,Pool
import time,urllib.request
import threading
class RunServer(threading.Thread): #启动服务的线程
def __init__( self , cmd):
threading.Thread.__init__( self )
self .cmd = cmd
def run( self ):
os.system( self .cmd)
def start(port_list: list ):
def __run(url):
time.sleep( 10 )
response = urllib.request.urlopen(url, timeout = 5 )
if str (response.getcode()).startswith( "2" ):
return True
for i in range ( 0 , len (port_list)):
cmd = "appium -p %s " % (
port_list[i])
if platform.system() = = "Windows" : # windows下启动server
t1 = RunServer(cmd)
p = Process(target = t1.start())
p.start()
while True :
time.sleep( 4 )
if __run( "http://127.0.0.1:" + port_list[i] + "/wd/hub/status" ):
break
def startdevicesApp():
l_devices_list = []
port_list = []
alldevices = get_devices()
if len (alldevices)> 0 :
for item in alldevices:
port = random.randint( 1000 , 6000 )
port_list.append(port)
desired_caps = {
'platformName' : 'Android' ,
'deviceName' : item,
'platformVersion' : getPlatForm(item),
'appPackage' : get_apkname(apk_path), # 包名
'appActivity' : get_apk_lautc(apk_path), # apk的launcherActivity
'skipServerInstallation' : True ,
"port" :port
}
l_devices_list.append(desired_caps)
return l_devices_list,port_list
def runcase(devics):
#执行测试用例
pass
def run(deviceslist: list ):
pool = Pool( len (deviceslist))
for devices in deviceslist:
pool. map (runcase, devices)
pool.close()
pool.join()
if __name__ = = "__main__" :
l_devices_list,port_list = startdevicesApp()
start(port_list)
run(l_devices_list)
|
以上就是python 基于Appium控制多设备并行执行的详细内容,更多关于Appium控制多设备并行执行的资料请关注服务器之家其它相关文章!
原文链接:https://www.cnblogs.com/leiziv5/p/14225882.html