python多进程与多线程
# Study notes on Python multiprocessing and multithreading.
#
# Every example below is kept dormant inside a triple-quoted string; remove
# the surrounding quotes of one example to run it.  Strings that follow an
# example hold the output the author recorded when running it.

##############################
# Multiprocessing
##############################
# The fork and multiprocessing.Process examples below call os.getpid() /
# os.getppid() without importing os themselves, so the import must be live.
import os

# Print the current process id and its parent's id.
'''
print('1Process start, pid:',os.getpid())
print('Process start, ppid:',os.getppid())
'''

# os.fork() example (the embedded note says it needs a Unix environment).
'''
#unix环境支持
pid = os.fork()
if pid == 0:
    print('child process,id:', os.getpid(), 'parent id is:', os.getppid())
else:
    print('pid:', os.getpid(), 'create child process:', pid)
'''

#################
# multiprocessing
#################
# Start one child with multiprocessing.Process and wait for it with join().
# The numbered prints ('1Parent', '2Process', '3Process', '4Process') are
# markers used by the observations that follow the example.
'''
from multiprocessing import Process
print('2Process start, pid:',os.getpid())
def run_proc(name):
#    while(1):
#        pass
    print('run child process %s (%s)...' %(name, os.getpid()))

print('4Process start, pid:',os.getpid())
if __name__ == '__main__':
    print('1Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.', os.getpid())
    #启动子进程
    p.start()
    print('3Process start, pid:',os.getpid())
    #等待子进程结束, 类似于wait()
    p.join()
    print('Child process end.', os.getpid())
'''

# Author's observations on the example above:
# - Python executes the module's statements in order, top to bottom.
# - After p.start() is called, the child process begins to run.
# - The child process executes the lines between the '2Process' and
#   '4Process' markers.
#   NOTE(review): presumably observed where the child re-imports this module
#   (spawn start method, e.g. Windows) -- confirm on the target platform.
# - p.join() waits for the child process to return.
# - The print order alone is not enough to determine the exact execution
#   order of parent and child.


# Process pool example: five tasks submitted to a Pool of five workers.
# (The embedded comments originally called the workers "threads"; Pool
# manages worker processes, so they now say "进程".)
'''
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random()*3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(5)#设置最大进程个数,默认值是cpu核心数
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))#向进程池异步提交任务
    print('Waiting for all subprocesses done...')
    p.close()#防止将任何其他任务提交到池中。完成所有任务后,工作进程将退出。
    p.join()#调用前,必须调用close() or terminate()
    print('All subprocess done.')
'''
# Output observed in cmd on Windows 7:
'''
Parent process 25380.
Waiting for all subprocesses done...
Run task 0 (26008)...
Run task 1 (26216)...
Run task 2 (28108)...
Run task 3 (24732)...
Run task 4 (25008)...
Task 3 runs 0.15 seconds.
Task 4 runs 1.79 seconds.
Task 2 runs 2.45 seconds.
Task 0 runs 2.69 seconds.
Task 1 runs 2.98 seconds.
All subprocess done.
请按任意键继续. . .
'''

###########
# Subprocesses
###########
# Run an external command with subprocess.call() and print its exit code.
'''
import subprocess #类似于exec()?

print ('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
'''
''' win7输出
$ nslookup www.python.org
服务器:  clou-ad.szclou.com
Address:  10.98.94.5

非权威应答:
名称:    dualstack.python.map.fastly.net
Addresses:  2a04:4e42:36::223
          151.101.228.223
Aliases:  www.python.org

Exit code: 0
请按任意键继续. . .
'''

# Feed input to an interactive command through Popen.communicate().
# Note that the example decodes with `s` ('utf-8'); `s1` ('gbk') is defined
# for Windows 7 but is not used by the code as written.
'''
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
s='utf-8'#unix
s1='gbk'#win7系统
print(output.decode(s))
print('Exit code:', p.returncode)
'''
# Equivalent to running nslookup on the command line and then typing:
#   set q=mx
#   python.org
#   exit

'''win7输出
$ nslookup
默认服务器:  clou-ad.szclou.com
Address:  10.98.94.5

> > 服务器:  clou-ad.szclou.com
Address:  10.98.94.5

python.org      MX preference = 50, mail exchanger = mail.python.org

mail.python.org internet address = 188.166.95.178
mail.python.org AAAA IPv6 address = 2a03:b0c0:2:d0::71:1
>
Exit code: 0
请按任意键继续. . .
'''

###########
# Inter-process communication
###########
# One writer process puts 'A', 'B', 'C' on a multiprocessing.Queue and one
# reader process prints what it gets.  The reader loops forever, so the
# parent terminates it once the writer has been joined.
'''
from multiprocessing import Process, Queue
import os, time, random

def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('put %s to queue.' % value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    q=Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))

    pw.start()
    pr.start()
    pw.join()
    pr.terminate()
'''
'''
Process to write: 28536
Process to read: 27968
put A to queue.
Get A from queue.
put B to queue.
Get B from queue.
put C to queue.
Get C from queue.
请按任意键继续. . .
'''

##############################
# Multithreading: Python threads are real POSIX threads, not emulated
# ones (Linux)
##############################

# The standard library provides two modules: _thread and threading.
# _thread is the low-level module; threading is the high-level module that
# wraps _thread.  In the vast majority of cases only threading is needed.

# Start a named thread that counts to five, then join it.
'''
import time, threading

print('thread %s is running...' % threading.current_thread().name)

def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)


print('print something')
t=threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
'''
'''
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
请按任意键继续. . .
'''
# Notes on the example above (the first sentence originally said every
# process starts a "process" named MainThread; it is a thread).
'''
任何进程默认启动一个线程,名字是MainThread。
threading.current_thread()返回当前线程的实例
线程创建时,如果不指定名称,默认是Thread-1,.......
'''
# Defining a lock
'''
lock = threading.Lock()
lock.acquire()
lock.release()
'''

import time
import threading
import multiprocessing

# Start one busy-looping thread per CPU core reported by
# multiprocessing.cpu_count().  The threads never finish.
'''
def loop():
    x = 0
    while True:
        x = x ^ 1

print('cpu:%s' % multiprocessing.cpu_count())
for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
'''


###########
# ThreadLocal: a global object that manages per-thread data
###########
'''
import threading
#全局变量local_school,一个ThreadLocal对象
#但每个属性,如local_school.student都是线程的局部变量,不会相互干扰

#ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,
#用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
local_school = threading.local()

def process_student():
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
'''

# General notes (typo fixed: "一步" -> "异步", i.e. asynchronous).
'''
计算密集型:cpu利用率高,C语言的运行效率最高,脚本语言效率低
IO密集型:脚本语言的开发效率高,c语言最差
协程:单线程的异步编程模型。有了协程的支持,就可以基于事件驱动编写高效的多任务程序
'''


###########
# Distributed processes
###########
分布式进程示例:
#task_master.py
"""Task master for the distributed-process example.

Serves two queues over the network through a ``BaseManager`` subclass,
puts ten random integers on the task queue and then prints ten results
read back from the result queue.

All work happens in ``main()`` behind an ``if __name__ == '__main__'``
guard, and the registered callables are module-level functions rather than
lambdas.  Both are needed when child processes are started with the spawn
method (for example on Windows): the child re-imports this module, and the
manager's registry is pickled, which a lambda does not survive.
"""
import queue
import random
import time
from multiprocessing.managers import BaseManager

# Queue the master sends tasks through.
task_queue = queue.Queue()
# Queue the master receives results from.
result_queue = queue.Queue()


class QueueManager(BaseManager):
    """Manager subclass on which the two queue accessors are registered."""


def return_task_queue():
    """Return the module-level task queue (callable registered below)."""
    return task_queue


def return_result_queue():
    """Return the module-level result queue (callable registered below)."""
    return result_queue


# Publish both queues on the network; `callable` ties each name to a queue.
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)


def main():
    """Start the manager, hand out ten tasks and print up to ten results.

    The manager is always shut down, even if an error occurs while tasks
    are being distributed or results collected.
    """
    # Bind port 5000 with authentication key b'abc'.
    # NOTE(review): with an empty host the manager may not be reachable from
    # this same process on Windows; use '127.0.0.1' there -- confirm.
    manager = QueueManager(address=('', 5000), authkey=b'abc')
    manager.start()
    try:
        # Queue proxies reachable over the network.
        task = manager.get_task_queue()
        result = manager.get_result_queue()

        # Pause before distributing work, as the original script did
        # (presumably to give workers time to connect -- confirm).
        time.sleep(10)

        # Add the tasks to the queue.
        for _ in range(10):
            n = random.randint(0, 10000)
            print('Put task %d...' % n)
            task.put(n)

        # Read the results back.
        print('Try get results...')
        for _ in range(10):
            try:
                r = result.get(timeout=10)
            except queue.Empty:
                # No worker delivered a result in time; stop waiting instead
                # of crashing with a traceback.
                print('No result received within 10 seconds, stop waiting.')
                break
            print('Result:%s' % r)
    finally:
        manager.shutdown()
    print('master exit.')


if __name__ == '__main__':
    main()
#task_worker.py
"""Task worker for the distributed-process example.

Connects to the master's manager on 127.0.0.1:5000, takes integers from
the task queue, and puts the string ``'n * n = n*n'`` on the result queue.
"""
import queue
import sys
import time
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """Manager subclass used to reach the queues published by the master."""


# Only the names are registered here; the queues themselves live in the
# master process.
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')


def square_task(n):
    """Return the result string for task *n*, e.g. ``'3 * 3 = 9'``."""
    return '%d * %d = %d' % (n, n, n * n)


def main():
    """Connect to the master and process tasks until an exit condition.

    The loop ends when a task has just been processed and more than 60
    seconds have passed since the loop started, or when any error other
    than an empty queue occurs (for example the master has shut down).
    While the queue is empty the worker keeps polling; the 60-second limit
    is not checked in that case, matching the original script.
    """
    server_addr = '127.0.0.1'
    print('Connect to server %s...' % server_addr)
    m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    m.connect()
    task = m.get_task_queue()
    result = m.get_result_queue()

    # Earlier fixed-iteration variant, kept dormant for reference.
    '''
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('task queue is empty.')
'''
    t1 = time.time()
    while True:
        try:
            # Block for up to one second instead of testing task.empty()
            # first: with several workers another one can take the task
            # between the check and the get, and the resulting queue.Empty
            # must not stop this worker.
            n = task.get(timeout=1)
            print('run task %d * %d...' % (n, n))
            result.put(square_task(n))
            time.sleep(1)
        except queue.Empty:
            print('queue is empty')
            continue
        except KeyboardInterrupt:
            # Leave the loop cleanly on Ctrl-C, as the original did.
            print('Error: KeyboardInterrupt')
            break
        except Exception as err:
            # Anything else ends the worker; report it before leaving.
            print('Error:', err)
            break
        if time.time() - t1 > 60:
            print('over 60s')
            break

    print('worker exit.')


if __name__ == '__main__':
    main()
可以多次执行 task_worker.py,创建多个 worker 进程;各 worker 进程处理的任务合起来就是 master 进程放入队列的全部任务,且每个任务只会被一个 worker 取走,不会重复处理。
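目前只在 Ubuntu 12.04 上验证通过。win7 下无法正常执行,原因很可能不是权限问题,而是 Windows 没有 fork、只能以 spawn 方式启动子进程:task_master.py 注册队列时使用的 lambda 无法被 pickle 序列化,且启动 manager 的代码没有放在 if __name__ == '__main__': 保护之下;另外在 Windows 下绑定地址可能不能留空,需要写成 '127.0.0.1'(待验证)。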