Python 多线程
分布式和并行是完全不同的概念,分布式只负责将一个测试脚本可调用不同的远程环境来执行;并行强调“同时”的概念,它可以借助多线程或多进程技术并行来执行脚本技术。
10.1 单进程的时代
在单线程的时代,当处理器要处理多个任务时,必须要对这些任务排一下执行顺序并按照这个顺序来执行任务。假如我们创建了两个任务,听音乐(music)和看电影(move),在单线程中我们只能按先后顺序来执行这两个任务。
Onethread.py
#coding=utf-8 from time import sleep, ctime def music(): print 'I was listening to music! %s' %ctime() sleep(2) def move(): print 'I was at the movies! %s' %ctime() sleep(5) if __name__ == '__main__': music() move() print 'allend:', ctime()
别创建了两个任务music 和move,执行music 需要2 秒,执行move 需要5 秒,通过sleep()
方法设置休眠时间来模拟任务的运行时间。
给music() 和move()两个方法设置参数,用于接收要播放的歌曲和视频,通过for 循环控制播放的次数,分别让歌曲和电影播放了2 次:onethread_with_pri.py:
#coding=utf-8 from time import sleep, ctime def music(func): for i in range(2): print 'I was listening to music %s! %s'%(func,ctime()) sleep(2) def move(func): for i in range(2): print 'I was at the movies %s! %s'%(func,ctime()) sleep(5) if __name__ == '__main__': music(u'爱情买卖') move(u'阿凡达') print 'all end:', ctime()
10.2 多线程技术
Python 通过两个标准库thread和threading 提供对线程的支持。thread 提供了低级别的、原始的线程以及一个简单的锁。threading 基于Java 的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在Python 中则是独立的对象。
10.2.1 threading 模块
避免使用thread 模块,因为是它不支持守护线程。当主线程退出时,所有的子线程不论它
们是否还在工作,都会被强行退出。threading模块则支持守护线程。Threads.py:
#coding=utf-8 from time import sleep, ctime import threading def music(func): for i in range(2): print 'I was listening to music %s! %s'%(func,ctime()) sleep(2) def move(func): for i in range(2): print 'I was at the movies %s! %s'%(func,ctime()) sleep(5) threads=[] t1=threading.Thread(target=music,args=(u'爱情买卖',)) threads.append(t1) t2=threading.Thread(target=move,args=(u'阿凡达',)) threads.append(t2) if __name__ == '__main__': for i in threads: i.start() #keep thread for i in threads: i.join() print 'all end:', ctime()
import threading 引入线程模块。
threads = [] 创建线程数组,用于装载线程。
threading.Thread()通过调用threading模块的Thread()方法来创建线程。
start() 开始线程活动。
join() 等待线程终止。
通过for 循环遍历thread 数组中所装载的线程;然后通过start()函数启动每一个线程。
join()会等到线程结束,或者在给了timeout 参数的时候,等到超时为止。join()的另一个比较重要的方面是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。
10.2.2 优化线程的创建
从上面例子中发现线程的创建是颇为麻烦的,每创建一个线程都需要创建一个t(t1、t2、...),如果创建的线程较多时这样极其不方便。Player.py
#coding=utf-8 from time import sleep, ctime import threading def music(func): for i in range(2): print 'I was listening to music %s! %s'%(func,ctime()) sleep(2) def move(func): for i in range(2): print 'I was at the movies %s! %s'%(func,ctime()) sleep(5) def player(name): r=name.split(".")[1] if r=="mp3" orr=="MP3": music(name) elif r=="mp4" or r=="MP4": move(name) else: print "Error:the format is notrecognized!" list=["the sale oflove.mp3","love.MP3","a fan da.mp4","the sale oflove.MP4"] threads=[] files=range(len(list)) for i in files: t=threading.Thread(target=player,args=(list[i],)) threads.append(t) if __name__ == '__main__': for i in files: threads[i].start() #keep thread for i in files: threads[i].join() print 'all end:', ctime()
创建了一个player()函数,这个函数用于判断播放文件的类型。如果是mp3 格式的,
我们将调用music()函数,如果是mp4 格式的我们调用move()函数。哪果两种格式都不是那么只能告诉用户你所提供有文件我播放不了。
然后创建了一个list 的文件列表,注意为文件加上后缀名。然后我们用len(list) 来计算list列表有多少个文件,确定循环次数。
接着通过一个for 循环,把list 中的文件添加到线程中数组threads[]中。接着启动threads[]线程组,最后打印结束时间。
现在向list 数组中添加一个文件,程序运行时会自动为其创建一个线程。
通过上面的程序,我们发现player()用于判断文件扩展名,然后调用music()和move() ,其实,music()和move()完整工作是相同的:Super_player.py
#coding=utf-8 from time import sleep, ctime import threading def super_player(name,time): for i in range(2): print 'Start playing...%s!%s' %(file,ctime()) sleep(time) dic={"love.mp3":3,"love.rmvb":103,"love.mp4":65} threads=[] files=range(len(dic)) for file,time in dic.items(): t=threading.Thread(target=super_player,args=(file,time)) threads.append(t) if __name__ == '__main__': for i in files: threads[i].start() #keep thread for i in files: threads[i].join() print 'all end:', ctime()
首先创建字典list ,用于定义要播放的文件及时长(秒),通过字典的items()方法来循环的取file和time,取到的这两个值用于创建线程。接着创建super_player()函数,用于接收file 和time,用于确定要播放的文件及时长。最后是线程启动运行。
10.2.3 创建线程类
创建自己的线程类:mythread.py
#coding=utf-8 import threading from time import sleep,ctime class MyThread(threading.Thread): def __init__(self,func,args,name=" "): threading.Thread.__init__(self) self.name=name self.func=func self.args=args def run(self): apply(self.func,self.args) def super_player(file,time): for i in range(2): print 'Starting playing:%s!\t%s\n' %(file,ctime()) sleep(time) dic={"the sale oflove.mp3":3,"a fan da.mp4":6} threads=[] files=range(len(dic)) for file,time in dic.items(): t=MyThread(super_player,(file,time),super_player.__name__) threads.append(t) if __name__ == '__main__': for i in files: threads[i].start() for i in files: threads[i].join() print 'end:%s' %ctime()
MyThread(threading.Thread)
创建MyThread 类,用于继承threading.Thread 类。
__init__() 类的初始化方法对func、args、name 等参数进行初始化。
apply() 当函数参数已经存在于一个元组或字典中时,间接地调用函数。args 是一个包含将要提供给函数的按位置传递的参数的元组。如果省略了args,任何参数都不会被传递,kwargs 是一个包含关键字参数的字典。
10.3 多进程技术
10.3.1 mutiprocessing 模块
multiprocessing 提供了本地和远程的并发性,有效的通过全局解释锁(Global Interceptor Lock, GIL)来使用进程(而不是线程)。。由于GIL 的存在,在CPU 密集型的程序当中,使用多线程并不能有效地利用多核CPU 的优势,因为一个解释器在同一时刻只会有一个线程在执行。
multiprocessing 模块的Process实现了多进程。process.py:
#coding=utf-8 from timeimport sleep, ctime import multiprocessing def super_player(f,time): for i in range(2): print 'Start playing...%s! %s'%(f,ctime()) sleep(time) dic={"love.mp3":3,"love.rmvb":4,"love.mp4":5} process=[] files=range(len(dic)) for f,time indic.items(): t=multiprocessing.Process(target=super_player,args=(f,time)) process.append(t) if __name__ =='__main__': for i in files: process[i].start() #keep thread for i in files: process[i].join() print 'all end:%s'%ctime()
multiprocessing.Process 对象来创建一个进程。Process对象与Thread 对象的用法相同,也有start(),run(), join()的方法。multiprocessing.Process(group=None,target=None, name=None, args=(), kwargs={})target 表示调用对象,args 表示调用对象的位置参数元组。kwargs 表示调用对象的字典。Name 为别名。Group 实质上不使用。多程的结果显示是以进程
组为顺序进行显示的。在*nix 上面创建的新的进程使用的是fork
10.3.2 Pipe 和 Queue
multiprocessing 包中有Pipe 类和Queue 类来分别支持这两种IPC 机制。Pipe 和Queue 可以用来传送常见的对象。
(1)Pipe可以是单向(half-duplex),也可以是双向(duplex),通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。一个进程从PIPE 一端输入对象,然后被PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。pipe.py: 注:本程序只能在linux/Unix上运行
#coding=utf-8 import multiprocessing def proc1(pipe): pipe.send('hello') print('proc1 rec:',pipe.recv()) def proc2(pipe): print('proc2 rec:',pipe.recv()) pipe.send('hello, too') pipe = multiprocessing.Pipe() p1 =multiprocessing.Process(target=proc1, args=(pipe[0],)) p2 =multiprocessing.Process(target=proc2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join()
这里的Pipe 是双向的。Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe 的一端(Connection 对象)。我们对Pipe 的某一端调用send()方法来传送对象,在另一端使用recv()来接收。
(2) Queue 与Pipe 相类似,都是先进先出的结构。但Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。queue.py: 注:本程序只能在linux/Unix上运行:
#coding=utf-8 import multiprocessing import time,os #input worker def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.time()) queue.put(info) #output worker def outputQ(queue,lock): info = queue.get() lock.acquire() print (str(os.getpid()) + '(get):' + info) lock.release() #Main record1 = [] # store input processes record2 = [] # store outputprocesses lock = multiprocessing.Lock()#addlock ,avoid to san luan daying queue = multiprocessing.Queue(3) #input processes for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) #output processes for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,lock)) process.start() record2.append(process) for p in record1: p.join() queue.close() for p in record2: p.join()
10.4 应用于自动化测试
10.4.1 应用于自动化测试项目
为实现多进程运行测试用例,我需要对文件结构进行调整:Thread\TestProject
/test_project/thread1/baidu_sta.py -----测试用例
/thread1/__init__.py
/thread2/youdao_sta.py -----测试用例
/thread2/__init__.py
/report/ ----测试报告目录
/all_tests_pro.py
创建了thread1 和thread2 两个文件夹,分别放入了两个测试用例; 下面我们编写
all_tests_process.py 文件来通过多进程来执行测试用例。test_all_pro.py:
#coding=utf-8 import unittest, time, os,multiprocessing import HTMLTestRunner def EEEcreatsuit(): casedir=[] listaa=os.listdir('\\Users\\ewang\\Desktop\\Python_Selenium2\\Thread\\TestProject') print listaa for xx in listaa: if "thread" in xx: casedir.append(xx) suite=[] for n in casedir: testunit=unittest.TestSuite() discover=unittest.defaultTestLoader.discover(n,pattern ='test*.py',top_level_dir=n) print discover for test_suite in discover: for test_case in test_suite: testunit.addTests(test_case) suite.append(testunit) print "===casedir:%r====" %casedir print "+++++++++++++++++++++++++++++++++++++++++++++++" print "!!!suite:%r!!!" %suite return suite,casedir def EEEEEmultiRunCase(suite,casedir): now =time.strftime("%Y-%m-%d-%H_%M_%S",time.localtime(time.time())) filename = '.\\report/'+now+'result.html' fp = file(filename, 'wb') proclist=[] s=0 for i in suite: runner =HTMLTestRunner.HTMLTestRunner(stream=fp,title=u'测试报告',description=u'用例执行情况:' ) proc =multiprocessing.Process(target=runner.run(i),args=(i,)) proclist.append(proc) s=s+1 for proc in proclist: proc.start() for proc in proclist: proc.join() fp.close() if __name__ == "__main__": runtmp=EEEcreatsuit() EEEEEmultiRunCase(runtmp[0],runtmp[1])
定义casedir 数组,读取test_project 目录下的文件夹,找到文件夹的名包含“thread”的文件夹添加到casedir 数组中.
定位suite 数组,for 循环读取casedir数组中的数据(即thread1 和thread2 两个文件夹)。通过discover 分别读取文件夹下匹配test*.py 规则的用例文件,将所有用例文件添加到testunit 测试条件中,再将测试套件追加到定义的suite 数组中。
在整个EEEcreatsuite1()函数中返回suite 和casedir 两个数组的值。
定义proclist()函数,for 循环把suite数组中的用例执行结果写入HTMLTestRunner 测试报告。multiprocessing.Process 创建用例执行的多进程,把创建的多进程追加到proclist 数组中,for 循环proc.start()开始进程活动,proc.join()等待线程终止。
10.4.2 应用于Grid2分布式测试
Selenium Grid 只是提供多系统、多浏览器的执行环境,Selenium Grid 本身并不提供并行的执行策略。也就是说我们不可能简单地丢给SeleniumGrid 一个test case 它就能并行地在不同的平台及浏览器下运行。如果您希望利用Selenium Grid 分布式并行的执行测试脚本,那么需要结束Python 多线程技术。
启动Selenium Server
在本机打开两个命令提示符窗口分别:
启动一个hub (默认端口4444),ip 地址为:172.16.10.66
>java -jar selenium-server-standalone-2.39.0.jar -role hubtandalone-2.39.0.jar-role hub
启动一个本地node(节点)
>java -jar selenium-server-standalone-2.39.0.jar -role node -port 5555 -jarselenium-server-stan启动一个远程node(节点),ip 为:172.16.10.34:
>java -jar selenium-server-standalone-2.39.0ar -role node -port 5555 -hubhttp://172.16.10.66:4444/grid/register
fnngj@fnngj-VirtualBox:~/selenium$java -jar selenium-server-standalone-2.39.0ar -role node -po
grid_thread.py
</pre><pre name="code" class="python"><pre name="code" class="python">#coding=utf-8 from threading import Thread from selenium import webdriver import time list ={'http://127.0.0.1:4444/wd/hub':'chrome', 'http://127.0.0.1:5555/wd/hub':'internet explorer', 'http://172.16.10.34:5555/wd/hub':'firefox' } def test_baidu(host,browser): print 'start:%s' %time.ctime() print host,browser driver = webdriver.Remote( command_executor=host, desired_capabilities={'platform': 'ANY', 'browserName':browser, 'version': '', 'javascriptEnabled': True }) driver.get('http://www.baidu.com') driver.find_element_by_id("kw").send_keys(browser) driver.find_element_by_id("su").click() sleep(2) driver.close() threads = [] files = range(len(list)) for host,browser in list.items(): t = Thread(target=test_baidu,args=(host,browser)) threads.append(t) if __name__ == '__main__': for i in files: threads[i].start() for i in files: threads[i].join() print 'end:%s' %time.ctime()
</pre><pre>