本文实例讲述了Python实现的多进程和多线程功能。分享给大家供大家参考,具体如下:
听了朋友说起,他们目前开发的测试框架,用python实现的分布式系统。虽然python的执行效率没有c和c++那么高,但是依靠集群的力量,产生的压力很是牛逼啊。
了解了下大概的方式就是
1、有台主控机,负责调度,比如执行的参数等
2、有n多台执行机,每个执行机上部署一个python的xmlRPC server,主控机调用rpccall,然后执行机执行。rpccall里面会fork一些进程,每个进程再创建一些线程,去调用测试方法。这样,扩展性就很好了。
对于python的rpc call,之前也没有接触过,不是很了解,google了一下,发现很简单,拿了个网上的例子,如下,先部署一个rpcServer
1
2
3
4
5
|
from SimpleXMLRPCServer import SimpleXMLRPCServer
def add(a , b):
return a + bserver = SimpleXMLRPCServer(( "10.249.192.38" , 8000 )) #这里不要用localhost,否则只有本机才能访问
server.register_function(add)
server.serve_forever()
|
客户端:
1
2
3
4
5
|
from xmlrpclib import Server
Proxyserver = ServerProxy( "http://localhost:8000" )
try : ret = server.add( 30 , 90 ) print 'result:' , ret print 'result type:' , type (ret)
except
Exception, e: print "exception" ,e
|
其实还是很简单的。
然后再看了下python的多进程和多线程的方法,写了个例子,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
#encoding=utf-8
import sys
import os
import time
import pdb
import httplib
import thread
import threading
constant_p = 0 #创建全局变量,进程数
constant_s = 0 #创建全局变量,线程数
mutex_g = threading.RLock() #创建全局锁
def run(count): #该函数创建3个线程,同时调用3个不同的函数
a = 1
b = 0
thread.start_new_thread(test0,(a,b)) #这里传入的参数需要以元组的形式,跟void指针其实也差不多
thread.start_new_thread(test1,(a,b))
thread.start_new_thread(test2,(a,b))
def test0(a,b):
global mutex_g
global constant_s
threadid = thread.get_ident()
mutex_g.acquire() #这里需要把线程数说锁起来,否则结果会被修改
constant_s = constant_s + 1
mutex_g.release()
print "thread 0 called,and the threadid is:%d" % (threadid)
sys.exit( 0 )
def test1(a,b):
global mutex_g
global constant_s
threadid = thread.get_ident()
mutex_g.acquire()
constant_s = constant_s + 1
mutex_g.release()
print "thread 1 called,and the threadid is:%d" % (threadid)
sys.exit( 0 )
def test2(a,b):
global mutex_g
global constant_s
threadid = thread.get_ident()
mutex_g.acquire()
constant_s = constant_s + 1
mutex_g.release()
print "thread 2 called,and the threadid is:%d" % (threadid)
sys.exit( 0 )
def my_fork():
global constant_p
pid = os.fork() #fork一个子进程,子进程的pid=0同时2个进程会执行my_fork()函数
if (pid = = 0 ): #子进程执行到这个if里面
constant_p = constant_s + 1
run( 3 )
time.sleep( 5 )
print "total thread is %d" % constant_s #这个结果是3,因为子进程创建按了3个线程
elif (pid > 0 ): #父进程执行到这个if里面
constant_p = constant_s + 1 run( 4 )
time.sleep( 5 )
print "total thread is %d" % constant_s #这个结果也是3,因为该进程也创建了3个线程
else :
print "fork error"
sys.exit( 0 )
print "total process is %d" % constant_p #该结果是1,因为2个进程只执行了一次
constant_p = constant_s + 1
sys.exit( 0 )
if __name__ = = "__main__" :
my_fork()
#my_fork()
#my_fork()
|
希望本文所述对大家Python程序设计有所帮助。
原文链接:https://blog.csdn.net/frankwangzy1103/article/details/6218621