一、问题分析
首先通过python编写脚本来启动Appium服务,这里需要subprocess模块,该模块可以创建新的进程,并且连接到进程的输入、输出、错误等管道信息并且可以获取进程的返回值。
二、场景构造
在做多终端并发运行测试时候,Appium都要给不同终端都要分配唯一的服务:
- Appium服务器端口:4723,bp端口为4724
- Appium服务器端口:4725,bp端口为4726
备注:
1、bp端口( --bootstrap-port)是appium和设备之间通信的端口,如果不指定到时无法操作多台设备运行脚本
2、对于端口是否可用?如何处理?(后面会介绍,可以在运行的log日志或者通过shell命令查看当前端口是否可用)
三、单服务实现方式
1 import subprocess 2 from time import ctime 3 4 def appiumserver_start(host,port): 5 '''启动appium server''' 6 bootstrap_port = str(port + 1) 7 cmd = 'start /b appium -a ' + host + ' -p ' + str(port) + ' -bp ' + str(bootstrap_port) 8 print('%s at %s' %(cmd,ctime())) 9 subprocess.Popen(cmd, shell=True,stdout=open('./appium_log/'+str(port)+'.log','a'),stderr=subprocess.STDOUT) 10 11 12 if __name__ == '__main__': 13 host = '127.0.0.1' 14 port=4723 15 appium_start(host,port)
四、检测端口是否被占用
- 在appium服务启动是否成功,此处需要校验,方法如下:
- 首先查看有没有生成对应的log文件,查看log里面的内容
- 命令查看(netstate -ano | findstr 端口号)
C:\Users\Administrator>netstat -ano |findstr "4723" TCP 127.0.0.1:4723 0.0.0.0:0 LISTENING 8224
- 服务启动后如何关闭?方法如下:
- 通过netstat命令找到对应的Appium进程pid然后可以在系统任务管理器去关闭进程;
- 使用如下命令来关闭(taskkill -f -pid
appium
进程id)
- 实现代码
1 import socket 2 import os 3 4 def check_port(host, port): 5 """检测指定的端口是否被占用""" 6 7 #创建socket对象 8 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 9 try: 10 s.connect((host, port)) 11 s.shutdown(2) 12 except OSError as msg: 13 print('port %s is available! ' %port) 14 print(msg) 15 return True 16 else: 17 print('port %s already be in use !' % port) 18 return False 19 20 21 def release_port(port): 22 """释放指定的端口""" 23 24 #查找对应端口的pid 25 cmd_find='netstat -aon | findstr %s' %port 26 print(cmd_find) 27 28 #返回命令执行后的结果 29 result = os.popen(cmd_find).read() 30 print(result) 31 32 if str(port) and 'LISTENING' in result: 33 #获取端口对应的pid进程 34 i=result.index('LISTENING') 35 start=i+len('LISTENING')+7 36 end=result.index('\n') 37 pid=result[start:end] 38 39 # 关闭被占用端口的pid 40 cmd_kill='taskkill -f -pid %s' %pid 41 print(cmd_kill) 42 os.popen(cmd_kill) 43 44 else: 45 print('port %s is available !' %port) 46 47 48 if __name__ == '__main__': 49 host='127.0.0.1' 50 port=4723 51 check_port(host,port) 52 release_port(port)
五、通过多进程并发启动Appium多服务
- 此处需要用到multiprocessing多进程模块
1 import multiprocessing 2 import subprocess 3 from time import ctime 4 5 def appiumserver_start(host,port): 6 '''启动appium server''' 7 8 bootstrap_port = str(port + 1) 9 cmd = 'start /b appium -a ' + host + ' -p ' + str(port) + ' --bootstrap-port ' + str(bootstrap_port) 10 11 print('%s at %s' %(cmd,ctime())) 12 subprocess.Popen(cmd, shell=True,stdout=open('./appium_log/'+str(port)+'.log','a'),stderr=subprocess.STDOUT) 13 14 15 #构建appium进程组 16 appium_process=[] 17 18 #加载appium进程 19 for i in range(2): 20 host='127.0.0.1' 21 port = 4723 + 2 * i 22 appium=multiprocessing.Process(target=appiumserver_start,args=(host,port)) 23 appium_process.append(appium) 24 25 26 if __name__ == '__main__': 27 #并发启动appium服务 28 for appium in appium_process: 29 appium.start() 30 for appium in appium_process: 31 appium.join()