分布式和并行是完全不同的概念,分布式只负责将一个测试脚本可调用不同的远程环境来执行;并行强调“同时”的概念,它可以借助多线程或多进程技术并行来执行脚本技术。
10.1 单进程的时代
在单线程的时代,当处理器要处理多个任务时,必须要对这些任务排一下执行顺序并按照这个顺序来执行任务。假如我们创建了两个任务,听音乐(music)和看电影(move),在单线程中我们只能按先后顺序来执行这两个任务。
Onethread.py
[html] view plain copy
- #coding=utf-8
- from time import sleep, ctime
-
- def music():
- print 'I was listening to music! %s' %ctime()
- sleep(2)
-
- def move():
- print 'I was at the movies! %s' %ctime()
- sleep(5)
-
- if __name__ == '__main__':
- music()
- move()
- print 'allend:', ctime()
别创建了两个任务music 和move,执行music 需要2 秒,执行move 需要5 秒,通过sleep()
方法设置休眠时间来模拟任务的运行时间。
给music() 和move()两个方法设置参数,用于接收要播放的歌曲和视频,通过for 循环控制播放的次数,分别让歌曲和电影播放了2 次:onethread_with_pri.py:
[html] view plain copy
- #coding=utf-8
- from time import sleep, ctime
-
- def music(func):
- for i in range(2):
- print 'I was listening to music %s! %s'%(func,ctime())
- sleep(2)
-
- def move(func):
- for i in range(2):
- print 'I was at the movies %s! %s'%(func,ctime())
- sleep(5)
-
- if __name__ == '__main__':
- music(u'爱情买卖')
- move(u'阿凡达')
- print 'all end:', ctime()
10.2 多线程技术
Python 通过两个标准库thread和threading 提供对线程的支持。thread 提供了低级别的、原始的线程以及一个简单的锁。threading 基于Java 的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在Python 中则是独立的对象。
10.2.1 threading 模块
避免使用thread 模块,因为是它不支持守护线程。当主线程退出时,所有的子线程不论它
们是否还在工作,都会被强行退出。threading模块则支持守护线程。Threads.py:
[html] view plain copy
- #coding=utf-8
- from time import sleep, ctime
- import threading
-
- def music(func):
- for i in range(2):
- print 'I was listening to music %s! %s'%(func,ctime())
- sleep(2)
-
- def move(func):
- for i in range(2):
- print 'I was at the movies %s! %s'%(func,ctime())
- sleep(5)
-
- threads=[]
- t1=threading.Thread(target=music,args=(u'爱情买卖',))
- threads.append(t1)
- t2=threading.Thread(target=move,args=(u'阿凡达',))
- threads.append(t2)
-
- if __name__ == '__main__':
- for i in threads:
- i.start()
- #keep thread
- for i in threads:
- i.join()
-
- print 'all end:', ctime()
import threading 引入线程模块。
threads = [] 创建线程数组,用于装载线程。
threading.Thread()通过调用threading模块的Thread()方法来创建线程。
start() 开始线程活动。
join() 等待线程终止。
通过for 循环遍历thread 数组中所装载的线程;然后通过start()函数启动每一个线程。
join()会等到线程结束,或者在给了timeout 参数的时候,等到超时为止。join()的另一个比较重要的方面是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。
10.2.2 优化线程的创建
从上面例子中发现线程的创建是颇为麻烦的,每创建一个线程都需要创建一个t(t1、t2、...),如果创建的线程较多时这样极其不方便。Player.py
[html] view plain copy
- #coding=utf-8
- from time import sleep, ctime
- import threading
-
- def music(func):
- for i in range(2):
- print 'I was listening to music %s! %s'%(func,ctime())
- sleep(2)
-
- def move(func):
- for i in range(2):
- print 'I was at the movies %s! %s'%(func,ctime())
- sleep(5)
- def player(name):
- r=name.split(".")[1]
- if r=="mp3" orr=="MP3":
- music(name)
- elif r=="mp4" or r=="MP4":
- move(name)
- else:
- print "Error:the format is notrecognized!"
-
- list=["the sale oflove.mp3","love.MP3","a fan da.mp4","the sale oflove.MP4"]
- threads=[]
- files=range(len(list))
- for i in files:
- t=threading.Thread(target=player,args=(list[i],))
- threads.append(t)
-
- if __name__ == '__main__':
- for i in files:
- threads[i].start()
-
- #keep thread
- for i in files:
- threads[i].join()
-
- print 'all end:', ctime()
创建了一个player()函数,这个函数用于判断播放文件的类型。如果是mp3 格式的,
我们将调用music()函数,如果是mp4 格式的我们调用move()函数。哪果两种格式都不是那么只能告诉用户你所提供有文件我播放不了。
然后创建了一个list 的文件列表,注意为文件加上后缀名。然后我们用len(list) 来计算list列表有多少个文件,确定循环次数。
接着通过一个for 循环,把list 中的文件添加到线程中数组threads[]中。接着启动threads[]线程组,最后打印结束时间。
现在向list 数组中添加一个文件,程序运行时会自动为其创建一个线程。
通过上面的程序,我们发现player()用于判断文件扩展名,然后调用music()和move() ,其实,music()和move()完整工作是相同的:Super_player.py
[html] view plain copy
- #coding=utf-8
- from time import sleep, ctime
- import threading
-
- def super_player(name,time):
- for i in range(2):
- print 'Start playing...%s!%s' %(file,ctime())
- sleep(time)
-
-
- dic={"love.mp3":3,"love.rmvb":103,"love.mp4":65}
- threads=[]
- files=range(len(dic))
- for file,time in dic.items():
- t=threading.Thread(target=super_player,args=(file,time))
- threads.append(t)
-
- if __name__ == '__main__':
- for i in files:
- threads[i].start()
-
- #keep thread
- for i in files:
- threads[i].join()
-
- print 'all end:', ctime()
首先创建字典list ,用于定义要播放的文件及时长(秒),通过字典的items()方法来循环的取file和time,取到的这两个值用于创建线程。接着创建super_player()函数,用于接收file 和time,用于确定要播放的文件及时长。最后是线程启动运行。
10.2.3 创建线程类
创建自己的线程类:mythread.py
[html] view plain copy
- #coding=utf-8
- import threading
- from time import sleep,ctime
-
- class MyThread(threading.Thread):
- def __init__(self,func,args,name=" "):
- threading.Thread.__init__(self)
- self.name=name
- self.func=func
- self.args=args
-
- def run(self):
- apply(self.func,self.args)
-
-
- def super_player(file,time):
- for i in range(2):
- print 'Starting playing:%s!\t%s\n' %(file,ctime())
- sleep(time)
- dic={"the sale oflove.mp3":3,"a fan da.mp4":6}
-
- threads=[]
- files=range(len(dic))
-
- for file,time in dic.items():
- t=MyThread(super_player,(file,time),super_player.__name__)
- threads.append(t)
-
- if __name__ == '__main__':
- for i in files:
- threads[i].start()
- for i in files:
- threads[i].join()
- print 'end:%s' %ctime()
MyThread(threading.Thread)
创建MyThread 类,用于继承threading.Thread 类。
__init__() 类的初始化方法对func、args、name 等参数进行初始化。
apply() 当函数参数已经存在于一个元组或字典中时,间接地调用函数。args 是一个包含将要提供给函数的按位置传递的参数的元组。如果省略了args,任何参数都不会被传递,kwargs 是一个包含关键字参数的字典。
10.3 多进程技术
10.3.1 mutiprocessing 模块
multiprocessing 提供了本地和远程的并发性,有效的通过全局解释锁(Global Interceptor Lock, GIL)来使用进程(而不是线程)。。由于GIL 的存在,在CPU 密集型的程序当中,使用多线程并不能有效地利用多核CPU 的优势,因为一个解释器在同一时刻只会有一个线程在执行。
multiprocessing 模块的Process实现了多进程。process.py:
[html] view plain copy
- #coding=utf-8
- from timeimport sleep, ctime
- import multiprocessing
-
- def super_player(f,time):
- for i in range(2):
- print 'Start playing...%s! %s'%(f,ctime())
- sleep(time)
-
- dic={"love.mp3":3,"love.rmvb":4,"love.mp4":5}
- process=[]
- files=range(len(dic))
- for f,time indic.items():
- t=multiprocessing.Process(target=super_player,args=(f,time))
- process.append(t)
-
- if __name__ =='__main__':
- for i in files:
- process[i].start()
- #keep thread
- for i in files:
- process[i].join()
-
- print 'all end:%s'%ctime()
multiprocessing.Process 对象来创建一个进程。Process对象与Thread 对象的用法相同,也有start(),run(), join()的方法。multiprocessing.Process(group=None,target=None, name=None, args=(), kwargs={})target 表示调用对象,args 表示调用对象的位置参数元组。kwargs 表示调用对象的字典。Name 为别名。Group 实质上不使用。多程的结果显示是以进程
组为顺序进行显示的。在*nix 上面创建的新的进程使用的是fork
10.3.2 Pipe 和 Queue
multiprocessing 包中有Pipe 类和Queue 类来分别支持这两种IPC 机制。Pipe 和Queue 可以用来传送常见的对象。
(1)Pipe可以是单向(half-duplex),也可以是双向(duplex),通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。一个进程从PIPE 一端输入对象,然后被PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。pipe.py: 注:本程序只能在Linux/Unix上运行
[html] view plain copy
- #coding=utf-8
- import multiprocessing
- def proc1(pipe):
- pipe.send('hello')
- print('proc1 rec:',pipe.recv())
- def proc2(pipe):
- print('proc2 rec:',pipe.recv())
- pipe.send('hello, too')
-
- pipe = multiprocessing.Pipe()
- p1 =multiprocessing.Process(target=proc1, args=(pipe[0],))
- p2 =multiprocessing.Process(target=proc2, args=(pipe[1],))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
这里的Pipe 是双向的。Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe 的一端(Connection 对象)。我们对Pipe 的某一端调用send()方法来传送对象,在另一端使用recv()来接收。
(2) Queue 与Pipe 相类似,都是先进先出的结构。但Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。queue.py: 注:本程序只能在linux/Unix上运行:
[html] view plain copy
- #coding=utf-8
- import multiprocessing
- import time,os
- #input worker
- def inputQ(queue):
- info = str(os.getpid()) + '(put):' + str(time.time())
- queue.put(info)
-
- #output worker
- def outputQ(queue,lock):
- info = queue.get()
- lock.acquire()
- print (str(os.getpid()) + '(get):' + info)
- lock.release()
-
- #Main
- record1 = [] # store input processes
- record2 = [] # store outputprocesses
- lock = multiprocessing.Lock()#addlock ,avoid to san luan daying
- queue = multiprocessing.Queue(3)
- #input processes
- for i in range(10):
- process = multiprocessing.Process(target=inputQ,args=(queue,))
- process.start()
- record1.append(process)
-
- #output processes
- for i in range(10):
- process = multiprocessing.Process(target=outputQ,args=(queue,lock))
- process.start()
- record2.append(process)
-
- for p in record1:
- p.join()
-
- queue.close()
- for p in record2:
- p.join()
-
10.4 应用于自动化测试
10.4.1 应用于自动化测试项目
为实现多进程运行测试用例,我需要对文件结构进行调整:Thread\TestProject
/test_project/thread1/baidu_sta.py -----测试用例
/thread1/__init__.py
/thread2/youdao_sta.py -----测试用例
/thread2/__init__.py
/report/ ----测试报告目录
/all_tests_pro.py
创建了thread1 和thread2 两个文件夹,分别放入了两个测试用例; 下面我们编写
all_tests_process.py 文件来通过多进程来执行测试用例。test_all_pro.py:
[html] view plain copy
- #coding=utf-8
- import unittest, time, os,multiprocessing
- import HTMLTestRunner
- def EEEcreatsuit():
- casedir=[]
- listaa=os.listdir('\\Users\\ewang\\Desktop\\Python_Selenium2\\Thread\\TestProject')
- print listaa
- for xx in listaa:
- if "thread" in xx:
- casedir.append(xx)
-
- suite=[]
- for n in casedir:
- testunit=unittest.TestSuite()
- discover=unittest.defaultTestLoader.discover(n,pattern ='test*.py',top_level_dir=n)
- print discover
- for test_suite in discover:
- for test_case in test_suite:
- testunit.addTests(test_case)
- suite.append(testunit)
-
- print "===casedir:%r====" %casedir
- print "+++++++++++++++++++++++++++++++++++++++++++++++"
- print "!!!suite:%r!!!" %suite
- return suite,casedir
-
-
- def EEEEEmultiRunCase(suite,casedir):
- now =time.strftime("%Y-%m-%d-%H_%M_%S",time.localtime(time.time()))
- filename = '.\\report/'+now+'result.html'
- fp = file(filename, 'wb')
- proclist=[]
- s=0
- for i in suite:
- runner =HTMLTestRunner.HTMLTestRunner(stream=fp,title=u'测试报告',description=u'用例执行情况:' )
- proc =multiprocessing.Process(target=runner.run(i),args=(i,))
- proclist.append(proc)
- s=s+1
- for proc in proclist: proc.start()
- for proc in proclist: proc.join()
- fp.close()
-
- if __name__ == "__main__":
- runtmp=EEEcreatsuit()
- EEEEEmultiRunCase(runtmp[0],runtmp[1])
定义casedir 数组,读取test_project 目录下的文件夹,找到文件夹的名包含“thread”的文件夹添加到casedir 数组中.
定位suite 数组,for 循环读取casedir数组中的数据(即thread1 和thread2 两个文件夹)。通过discover 分别读取文件夹下匹配test*.py 规则的用例文件,将所有用例文件添加到testunit 测试条件中,再将测试套件追加到定义的suite 数组中。
在整个EEEcreatsuite1()函数中返回suite 和casedir 两个数组的值。
定义proclist()函数,for 循环把suite数组中的用例执行结果写入HTMLTestRunner 测试报告。multiprocessing.Process 创建用例执行的多进程,把创建的多进程追加到proclist 数组中,for 循环proc.start()开始进程活动,proc.join()等待线程终止。
10.4.2 应用于Grid2分布式测试
Selenium Grid 只是提供多系统、多浏览器的执行环境,Selenium Grid 本身并不提供并行的执行策略。也就是说我们不可能简单地丢给SeleniumGrid 一个test case 它就能并行地在不同的平台及浏览器下运行。如果您希望利用Selenium Grid 分布式并行的执行测试脚本,那么需要结束Python 多线程技术。
启动Selenium Server
在本机打开两个命令提示符窗口分别:
启动一个hub (默认端口4444),ip 地址为:172.16.10.66
>java -jar selenium-server-standalone-2.39.0.jar -role hubtandalone-2.39.0.jar-role hub
启动一个本地node(节点)
>java -jar selenium-server-standalone-2.39.0.jar -role node -port 5555 -jarselenium-server-stan启动一个远程node(节点),ip 为:172.16.10.34:
>java -jar selenium-server-standalone-2.39.0ar -role node -port 5555 -hubhttp://172.16.10.66:4444/grid/register
fnngj@fnngj-VirtualBox:~/selenium$java -jar selenium-server-standalone-2.39.0ar -role node -po
grid_thread.py
[html] view plain copy
- </pre><pre name="code" class="python"><pre name="code" class="python">#coding=utf-8
- from threading import Thread
- from selenium import webdriver
- import time
-
- list ={'http://127.0.0.1:4444/wd/hub':'chrome',
- 'http://127.0.0.1:5555/wd/hub':'internet explorer',
- 'http://172.16.10.34:5555/wd/hub':'firefox'
- }
-
- def test_baidu(host,browser):
- print 'start:%s' %time.ctime()
- print host,browser
- driver = webdriver.Remote( command_executor=host,
- desired_capabilities={'platform': 'ANY',
- 'browserName':browser,
- 'version': '',
- 'javascriptEnabled': True
- })
- driver.get('http://www.baidu.com')
- driver.find_element_by_id("kw").send_keys(browser)
- driver.find_element_by_id("su").click()
- sleep(2)
- driver.close()
-
- threads = []
- files = range(len(list))
- for host,browser in list.items():
- t = Thread(target=test_baidu,args=(host,browser))
- threads.append(t)
- if __name__ == '__main__':
- for i in files:
- threads[i].start()
- for i in files:
- threads[i].join()
- print 'end:%s' %time.ctime()