Python 多线程多进程10.4.1 应用于自动化测试项目

时间:2021-07-10 06:54:28

分布式和并行是完全不同的概念,分布式只负责将一个测试脚本可调用不同的远程环境来执行;并行强调“同时”的概念,它可以借助多线程或多进程技术并行来执行脚本技术。


10.1 单进程的时代


         在单线程的时代,当处理器要处理多个任务时,必须要对这些任务排一下执行顺序并按照这个顺序来执行任务。假如我们创建了两个任务,听音乐(music)和看电影(move),在单线程中我们只能按先后顺序来执行这两个任务。

Onethread.py

[html] view plain copy
  1. #coding=utf-8  
  2. from time import sleep, ctime  
  3.    
  4. def music():  
  5.    print 'I was listening to music! %s' %ctime()  
  6.    sleep(2)  
  7.    
  8. def move():  
  9.    print 'I was at the movies! %s' %ctime()  
  10.    sleep(5)  
  11.      
  12. if __name__ == '__main__':  
  13.    music()  
  14.    move()  
  15. print 'allend:', ctime()  

         别创建了两个任务music 和move,执行music 需要2 秒,执行move 需要5 秒,通过sleep()

方法设置休眠时间来模拟任务的运行时间。

    给music() 和move()两个方法设置参数,用于接收要播放的歌曲和视频,通过for 循环控制播放的次数,分别让歌曲和电影播放了2 次:onethread_with_pri.py:

[html] view plain copy
  1. #coding=utf-8  
  2. from time import sleep, ctime  
  3.    
  4. def music(func):  
  5.     for i in range(2):  
  6.         print 'I was listening to music %s! %s'%(func,ctime())  
  7.         sleep(2)  
  8.    
  9. def move(func):  
  10.      for i in range(2):  
  11.         print 'I was at the movies %s! %s'%(func,ctime())  
  12.         sleep(5)  
  13.      
  14. if __name__ == '__main__':  
  15.     music(u'爱情买卖')  
  16.     move(u'阿凡达')  
  17. print 'all end:', ctime()  


10.2 多线程技术


         Python 通过两个标准库thread和threading 提供对线程的支持。thread 提供了低级别的、原始的线程以及一个简单的锁。threading 基于Java 的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在Python 中则是独立的对象。


   10.2.1 threading 模块


         避免使用thread 模块,因为是它不支持守护线程。当主线程退出时,所有的子线程不论它

们是否还在工作,都会被强行退出。threading模块则支持守护线程。Threads.py:

[html] view plain copy
  1. #coding=utf-8  
  2. from time import sleep, ctime  
  3. import threading  
  4.    
  5. def music(func):  
  6.    for i in range(2):  
  7.         print 'I was listening to music %s! %s'%(func,ctime())  
  8.         sleep(2)  
  9.    
  10. def move(func):  
  11.     for i in range(2):  
  12.         print 'I was at the movies %s! %s'%(func,ctime())  
  13.         sleep(5)  
  14.    
  15. threads=[]  
  16. t1=threading.Thread(target=music,args=(u'爱情买卖',))  
  17. threads.append(t1)  
  18. t2=threading.Thread(target=move,args=(u'阿凡达',))  
  19. threads.append(t2)  
  20.      
  21. if __name__ == '__main__':  
  22.    for i in threads:  
  23.         i.start()  
  24.    #keep thread  
  25.    for i in threads:  
  26.         i.join()  
  27.          
  28. print 'all end:', ctime()  

import threading 引入线程模块。

threads = [] 创建线程数组,用于装载线程。

threading.Thread()通过调用threading模块的Thread()方法来创建线程。

start() 开始线程活动。

join() 等待线程终止。

通过for 循环遍历thread 数组中所装载的线程;然后通过start()函数启动每一个线程。

join()会等到线程结束,或者在给了timeout 参数的时候,等到超时为止。join()的另一个比较重要的方面是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。


  10.2.2 优化线程的创建

         从上面例子中发现线程的创建是颇为麻烦的,每创建一个线程都需要创建一个t(t1、t2、...),如果创建的线程较多时这样极其不方便。Player.py

[html] view plain copy
  1. #coding=utf-8  
  2. from time import sleep, ctime  
  3. import threading  
  4.    
  5. def music(func):  
  6.     for i in range(2):  
  7.         print 'I was listening to music %s! %s'%(func,ctime())  
  8.         sleep(2)  
  9.    
  10. def move(func):  
  11.      for i in range(2):  
  12.         print 'I was at the movies %s! %s'%(func,ctime())  
  13.         sleep(5)  
  14. def player(name):  
  15.     r=name.split(".")[1]  
  16.     if r=="mp3" orr=="MP3":  
  17.         music(name)  
  18.     elif r=="mp4" or  r=="MP4":  
  19.         move(name)  
  20.     else:  
  21.         print "Error:the format is notrecognized!"  
  22.    
  23. list=["the sale oflove.mp3","love.MP3","a fan da.mp4","the sale oflove.MP4"]  
  24. threads=[]  
  25. files=range(len(list))  
  26. for i in files:  
  27.    t=threading.Thread(target=player,args=(list[i],))  
  28.     threads.append(t)  
  29.      
  30. if __name__ == '__main__':  
  31.     for i in files:  
  32.         threads[i].start()  
  33.    
  34.     #keep thread  
  35.     for i in files:  
  36.         threads[i].join()  
  37.          
  38. print 'all end:', ctime()  

    创建了一个player()函数,这个函数用于判断播放文件的类型。如果是mp3 格式的,

我们将调用music()函数,如果是mp4 格式的我们调用move()函数。哪果两种格式都不是那么只能告诉用户你所提供有文件我播放不了。

         然后创建了一个list 的文件列表,注意为文件加上后缀名。然后我们用len(list) 来计算list列表有多少个文件,确定循环次数。

    接着通过一个for 循环,把list 中的文件添加到线程中数组threads[]中。接着启动threads[]线程组,最后打印结束时间。

现在向list 数组中添加一个文件,程序运行时会自动为其创建一个线程。

 

通过上面的程序,我们发现player()用于判断文件扩展名,然后调用music()和move() ,其实,music()和move()完整工作是相同的:Super_player.py

[html] view plain copy
  1. #coding=utf-8  
  2. from time import sleep, ctime  
  3. import threading  
  4.    
  5. def super_player(name,time):  
  6.     for i in range(2):  
  7.         print 'Start playing...%s!%s' %(file,ctime())  
  8.         sleep(time)  
  9.      
  10.    
  11. dic={"love.mp3":3,"love.rmvb":103,"love.mp4":65}  
  12. threads=[]  
  13. files=range(len(dic))  
  14. for file,time in dic.items():  
  15.    t=threading.Thread(target=super_player,args=(file,time))  
  16.     threads.append(t)  
  17.      
  18. if __name__ == '__main__':  
  19.     for i in files:  
  20.         threads[i].start()  
  21.    
  22.     #keep thread  
  23.     for i in files:  
  24.         threads[i].join()  
  25.          
  26. print 'all end:', ctime()  

首先创建字典list ,用于定义要播放的文件及时长(秒),通过字典的items()方法来循环的取file和time,取到的这两个值用于创建线程。接着创建super_player()函数,用于接收file 和time,用于确定要播放的文件及时长。最后是线程启动运行。

  10.2.3 创建线程类

                   创建自己的线程类:mythread.py

[html] view plain copy
  1. #coding=utf-8  
  2. import threading  
  3. from time import sleep,ctime  
  4.    
  5. class MyThread(threading.Thread):  
  6.    def __init__(self,func,args,name=" "):  
  7.        threading.Thread.__init__(self)  
  8.        self.name=name  
  9.        self.func=func  
  10.        self.args=args  
  11.    
  12.    def run(self):  
  13.        apply(self.func,self.args)  
  14.    
  15.    
  16. def super_player(file,time):  
  17.    for i in range(2):  
  18.        print 'Starting playing:%s!\t%s\n' %(file,ctime())  
  19.        sleep(time)  
  20. dic={"the sale oflove.mp3":3,"a fan da.mp4":6}  
  21.    
  22. threads=[]  
  23. files=range(len(dic))  
  24.    
  25. for file,time in dic.items():  
  26.    t=MyThread(super_player,(file,time),super_player.__name__)  
  27.    threads.append(t)  
  28.    
  29. if __name__ == '__main__':  
  30.    for i in files:  
  31.        threads[i].start()  
  32.    for i in files:  
  33.        threads[i].join()  
  34. print 'end:%s' %ctime()  

 

 

MyThread(threading.Thread)

创建MyThread 类,用于继承threading.Thread 类。

__init__() 类的初始化方法对func、args、name 等参数进行初始化。

apply() 当函数参数已经存在于一个元组或字典中时,间接地调用函数。args 是一个包含将要提供给函数的按位置传递的参数的元组。如果省略了args,任何参数都不会被传递,kwargs 是一个包含关键字参数的字典。

10.3 多进程技术

  10.3.1 mutiprocessing 模块

         multiprocessing 提供了本地和远程的并发性,有效的通过全局解释锁(Global Interceptor Lock, GIL)来使用进程(而不是线程)。。由于GIL 的存在,在CPU 密集型的程序当中,使用多线程并不能有效地利用多核CPU 的优势,因为一个解释器在同一时刻只会有一个线程在执行。

multiprocessing 模块的Process实现了多进程。process.py:

[html] view plain copy
  1. #coding=utf-8  
  2. from timeimport sleep, ctime  
  3. import multiprocessing  
  4.    
  5. def super_player(f,time):  
  6.     for i in range(2):  
  7.         print 'Start playing...%s! %s'%(f,ctime())  
  8.         sleep(time)  
  9.      
  10. dic={"love.mp3":3,"love.rmvb":4,"love.mp4":5}  
  11. process=[]  
  12. files=range(len(dic))  
  13. for f,time indic.items():  
  14.     t=multiprocessing.Process(target=super_player,args=(f,time))  
  15.     process.append(t)  
  16.      
  17. if __name__ =='__main__':  
  18.     for i in files:  
  19.         process[i].start()  
  20.     #keep thread  
  21.     for i in files:  
  22.         process[i].join()  
  23.          
  24.     print 'all end:%s'%ctime()  


multiprocessing.Process 对象来创建一个进程。Process对象与Thread 对象的用法相同,也有start(),run(), join()的方法。multiprocessing.Process(group=None,target=None, name=None, args=(), kwargs={})target 表示调用对象,args 表示调用对象的位置参数元组。kwargs 表示调用对象的字典。Name 为别名。Group 实质上不使用。多程的结果显示是以进程

组为顺序进行显示的。在*nix 上面创建的新的进程使用的是fork

  10.3.2 Pipe 和 Queue

         multiprocessing 包中有Pipe 类和Queue 类来分别支持这两种IPC 机制。Pipe 和Queue 可以用来传送常见的对象。

    (1)Pipe可以是单向(half-duplex),也可以是双向(duplex),通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。一个进程从PIPE 一端输入对象,然后被PIPE 另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。pipe.py: 注:本程序只能在Linux/Unix上运行

[html] view plain copy
  1. #coding=utf-8  
  2. import multiprocessing  
  3. def proc1(pipe):  
  4.    pipe.send('hello')  
  5.    print('proc1 rec:',pipe.recv())  
  6. def proc2(pipe):  
  7.    print('proc2 rec:',pipe.recv())  
  8.    pipe.send('hello, too')  
  9.      
  10. pipe = multiprocessing.Pipe()  
  11. p1 =multiprocessing.Process(target=proc1args=(pipe[0],))  
  12. p2 =multiprocessing.Process(target=proc2args=(pipe[1],))  
  13. p1.start()  
  14. p2.start()  
  15. p1.join()  
  16. p2.join()  

         这里的Pipe 是双向的。Pipe 对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe 的一端(Connection 对象)。我们对Pipe 的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

 

         (2) Queue 与Pipe 相类似,都是先进先出的结构。但Queue 允许多个进程放入,多个进程从队列取出对象。Queue 使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。queue.py: 注:本程序只能在linux/Unix上运行:

[html] view plain copy
  1. #coding=utf-8  
  2. import multiprocessing  
  3. import time,os  
  4. #input worker  
  5. def inputQ(queue):  
  6.    info = str(os.getpid()) + '(put):' + str(time.time())  
  7.    queue.put(info)  
  8.    
  9. #output worker  
  10. def outputQ(queue,lock):  
  11.    info = queue.get()  
  12.    lock.acquire()  
  13.    print (str(os.getpid()) + '(get):' + info)  
  14.    lock.release()  
  15.    
  16. #Main  
  17. record1 = [] # store input processes  
  18. record2 = [] # store outputprocesses  
  19. lock = multiprocessing.Lock()#addlock ,avoid to san luan daying  
  20. queue = multiprocessing.Queue(3)  
  21. #input processes  
  22. for i in range(10):  
  23.    process = multiprocessing.Process(target=inputQ,args=(queue,))  
  24.    process.start()  
  25.    record1.append(process)  
  26.    
  27. #output processes  
  28. for i in range(10):  
  29.    process = multiprocessing.Process(target=outputQ,args=(queue,lock))  
  30.    process.start()  
  31.    record2.append(process)  
  32.      
  33. for p in record1:  
  34.    p.join()  
  35.      
  36. queue.close()  
  37. for p in record2:  
  38. p.join()  
  39.    

10.4 应用于自动化测试

     10.4.1 应用于自动化测试项目

为实现多进程运行测试用例,我需要对文件结构进行调整:Thread\TestProject

/test_project/thread1/baidu_sta.py -----测试用例

    /thread1/__init__.py

          /thread2/youdao_sta.py -----测试用例

 /thread2/__init__.py

               /report/ ----测试报告目录

            /all_tests_pro.py

                   创建了thread1 和thread2 两个文件夹,分别放入了两个测试用例; 下面我们编写

all_tests_process.py 文件来通过多进程来执行测试用例。test_all_pro.py:

[html] view plain copy
  1. #coding=utf-8  
  2. import unittest, time, os,multiprocessing  
  3. import HTMLTestRunner  
  4. def EEEcreatsuit():  
  5.    casedir=[]  
  6.    listaa=os.listdir('\\Users\\ewang\\Desktop\\Python_Selenium2\\Thread\\TestProject')  
  7.    print listaa  
  8.    for xx in listaa:  
  9.         if "thread" in xx:  
  10.            casedir.append(xx)  
  11.      
  12.    suite=[]  
  13.    for n in casedir:  
  14.         testunit=unittest.TestSuite()  
  15.         discover=unittest.defaultTestLoader.discover(n,pattern ='test*.py',top_level_dir=n)  
  16.         print discover  
  17.         for test_suite in discover:  
  18.             for test_case in test_suite:  
  19.                 testunit.addTests(test_case)  
  20.         suite.append(testunit)  
  21.    
  22.    print "===casedir:%r====" %casedir  
  23.    print "+++++++++++++++++++++++++++++++++++++++++++++++"  
  24.    print "!!!suite:%r!!!" %suite  
  25.    return suite,casedir  
  26.    
  27.    
  28. def EEEEEmultiRunCase(suite,casedir):  
  29.    now =time.strftime("%Y-%m-%d-%H_%M_%S",time.localtime(time.time()))  
  30.    filename = '.\\report/'+now+'result.html'  
  31.    fp = file(filename, 'wb')  
  32.    proclist=[]  
  33.    s=0  
  34.    for i in suite:  
  35.         runner =HTMLTestRunner.HTMLTestRunner(stream=fp,title=u'测试报告',description=u'用例执行情况:' )  
  36.         proc =multiprocessing.Process(target=runner.run(i),args=(i,))  
  37.         proclist.append(proc)  
  38.         s=s+1  
  39.    for proc in proclist: proc.start()  
  40.    for proc in proclist: proc.join()  
  41.    fp.close()  
  42.    
  43. if __name__ == "__main__":  
  44.    runtmp=EEEcreatsuit()  
  45.   EEEEEmultiRunCase(runtmp[0],runtmp[1])  

定义casedir 数组,读取test_project 目录下的文件夹,找到文件夹的名包含“thread”的文件夹添加到casedir 数组中.

定位suite 数组,for 循环读取casedir数组中的数据(即thread1 和thread2 两个文件夹)。通过discover 分别读取文件夹下匹配test*.py 规则的用例文件,将所有用例文件添加到testunit 测试条件中,再将测试套件追加到定义的suite 数组中。

在整个EEEcreatsuite1()函数中返回suite 和casedir 两个数组的值。

定义proclist()函数,for 循环把suite数组中的用例执行结果写入HTMLTestRunner 测试报告。multiprocessing.Process 创建用例执行的多进程,把创建的多进程追加到proclist 数组中,for 循环proc.start()开始进程活动,proc.join()等待线程终止。

 10.4.2 应用于Grid2分布式测试

         Selenium Grid 只是提供多系统、多浏览器的执行环境,Selenium Grid 本身并不提供并行的执行策略。也就是说我们不可能简单地丢给SeleniumGrid 一个test case 它就能并行地在不同的平台及浏览器下运行。如果您希望利用Selenium Grid 分布式并行的执行测试脚本,那么需要结束Python 多线程技术。

启动Selenium Server

在本机打开两个命令提示符窗口分别:

启动一个hub (默认端口4444),ip 地址为:172.16.10.66

>java -jar selenium-server-standalone-2.39.0.jar -role hubtandalone-2.39.0.jar-role hub

启动一个本地node(节点)

>java -jar selenium-server-standalone-2.39.0.jar -role node -port 5555 -jarselenium-server-stan启动一个远程node(节点),ip 为:172.16.10.34

>java -jar selenium-server-standalone-2.39.0ar -role node -port 5555  -hubhttp://172.16.10.66:4444/grid/register

 fnngj@fnngj-VirtualBox:~/selenium$java -jar selenium-server-standalone-2.39.0ar -role node -po

grid_thread.py

[html] view plain copy
  1. </pre><pre name="code" class="python"><pre name="code" class="python">#coding=utf-8  
  2. from threading import Thread  
  3. from selenium import webdriver  
  4. import time  
  5.    
  6. list ={'http://127.0.0.1:4444/wd/hub':'chrome',  
  7.            'http://127.0.0.1:5555/wd/hub':'internet explorer',  
  8.             'http://172.16.10.34:5555/wd/hub':'firefox'  
  9.             }  
  10.    
  11. def test_baidu(host,browser):  
  12.         print 'start:%s' %time.ctime()  
  13.         print host,browser  
  14.         driver = webdriver.Remote( command_executor=host,  
  15.                                    desired_capabilities={'platform': 'ANY',  
  16.                                                          'browserName':browser,  
  17.                                                          'version': '',  
  18.                                                          'javascriptEnabled': True  
  19.                                   })  
  20.       driver.get('http://www.baidu.com')  
  21.       driver.find_element_by_id("kw").send_keys(browser)  
  22.       driver.find_element_by_id("su").click()  
  23.       sleep(2)  
  24.       driver.close()  
  25.    
  26. threads = []  
  27. files = range(len(list))  
  28. for host,browser in list.items():  
  29.    t = Thread(target=test_baidu,args=(host,browser))  
  30.    threads.append(t)  
  31. if __name__ == '__main__':  
  32.    for i in files:  
  33.         threads[i].start()  
  34.    for i in files:  
  35.         threads[i].join()  
  36. print 'end:%s' %time.ctime()