python(进程池/线程池)

时间:2023-11-09 18:54:56

进程池

import multiprocessing
import time


def do_calculation(data, delay=3):
    """Simulate a slow computation and return ``data * 2``.

    Args:
        data: Value to double; anything that supports ``* 2`` works.
        delay: Seconds to sleep to imitate real work. Defaults to 3, the
            pause this script has always used.

    Returns:
        ``data * 2``.
    """
    # Show which pool worker picked up this item.
    print(multiprocessing.current_process().name + " " + str(data))
    time.sleep(delay)
    return data * 2


def start_process():
    """Pool initializer: announce each worker process as it starts."""
    print('Starting', multiprocessing.current_process().name)


def main():
    """Double the numbers 0..9 in a process pool and print the results."""
    inputs = list(range(10))
    print('Input :' + str(inputs))

    # One worker process per CPU core.
    pool_size = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process)
    try:
        # map() blocks until every input is processed and keeps input order.
        # It may be called again with further inputs before close().
        pool_outputs = pool.map(do_calculation, inputs)
    finally:
        # Always release the worker processes, even if map() raised.
        pool.close()  # no more tasks
        pool.join()   # wrap up current tasks

    print('Pool :' + str(pool_outputs))


if __name__ == '__main__':
    main()

运行如下:

root # python pool.py
Input :[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
('Starting', 'PoolWorker-1')
PoolWorker-1 0
('Starting', 'PoolWorker-2')
PoolWorker-2 2
PoolWorker-1 1
PoolWorker-2 3
PoolWorker-1 4
PoolWorker-2 6
PoolWorker-1 5
PoolWorker-2 7
PoolWorker-1 8
PoolWorker-1 9
Pool :[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

如果任务是动态添加的,可以改用 apply_async 逐个提交,代码如下:

import multiprocessing
import time


def worker(data, delay=None):
    """Simulate a task of uneven duration and return ``data * 2``.

    Args:
        data: Value to double.
        delay: Seconds to sleep before reporting. When ``None`` (the
            default) the original schedule is used: 10 seconds for
            ``data == 3`` and 1 second for everything else, so that one
            task is visibly slower than the rest.

    Returns:
        ``data * 2``.
    """
    if delay is None:
        delay = 10 if data == 3 else 1
    time.sleep(delay)
    print(multiprocessing.current_process().name + " " + str(data))
    return data * 2


def start_process():
    """Pool initializer: announce each worker process as it starts."""
    print('Starting', multiprocessing.current_process().name)


def say(res):
    """Callback for ``apply_async``: print a finished task's result."""
    print(res)


def main():
    """Submit the numbers 0..9 to a process pool one task at a time."""
    inputs = list(range(10))
    print('Input :' + str(inputs))

    pool_size = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process)
    try:
        # apply_async() returns immediately; keep the handles so that
        # failures can be surfaced once the pool has drained.
        # Pass callback=say to print each result as soon as it is ready.
        pending = [pool.apply_async(worker, (i,)) for i in inputs]
    finally:
        pool.close()  # no more tasks
        pool.join()   # wrap up current tasks

    # get() re-raises any exception raised inside a worker; without it a
    # failed task would go unnoticed.
    for result in pending:
        result.get()


if __name__ == '__main__':
    main()

输出如下:

root # python pool2.py
Input :[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
('Starting', 'PoolWorker-1')
('Starting', 'PoolWorker-2')
PoolWorker-1 0
PoolWorker-2 1
PoolWorker-1 2
PoolWorker-1 4
PoolWorker-1 5
PoolWorker-1 6
PoolWorker-1 7
PoolWorker-1 8
PoolWorker-1 9
PoolWorker-2 3

线程池

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2
import sys
import threading
import time

thread_count = 2
mutex = threading.Lock()

# Upper bound on queued-but-unstarted jobs; add_job() blocks beyond this.
MAX_PENDING_JOBS = 10000


class MyThread(threading.Thread):
    """Daemon worker thread that drains a job queue into a result queue.

    The thread starts itself on construction and exits once the work
    queue has stayed empty for ``timeout`` seconds.
    """

    def __init__(self, workQueue, resultQueue, timeout=1, **kwargs):
        """Create and immediately start the worker.

        Args:
            workQueue: Queue of ``(func, args, kwargs)`` job tuples.
            resultQueue: Queue that receives each job's return value.
            timeout: Seconds to wait on an empty work queue before the
                thread exits. Defaults to 1, the value that was
                previously hard-coded regardless of this argument.
            **kwargs: Forwarded to ``threading.Thread`` as its ``kwargs``.
        """
        threading.Thread.__init__(self, kwargs=kwargs)
        self.timeout = timeout
        self.daemon = True
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start()

    def run(self):
        """Run jobs until the work queue stays empty for ``self.timeout``.

        Each job is invoked as ``func(args, thread_name, **kwargs)`` and
        its return value is put on the result queue. An exception raised
        by a job is printed and re-raised, which ends this thread.
        """
        while True:
            try:
                func, args, job_kwargs = self.workQueue.get(
                    timeout=self.timeout)
            except queue.Empty:
                break
            try:
                res = func(args, self.name, **job_kwargs)
            except Exception:
                print(sys.exc_info())
                raise
            self.resultQueue.put(res)


class ThreadPool:
    """Fixed-size pool of ``MyThread`` workers sharing one job queue.

    Workers start as soon as the pool is constructed and exit after the
    queue has been idle for ``timeout`` seconds, so jobs must be added
    promptly after construction.
    """

    def __init__(self, num_of_threads=10, timeout=1):
        """Create the queues and start ``num_of_threads`` workers.

        Args:
            num_of_threads: Number of worker threads to start.
            timeout: Idle seconds after which each worker exits.
        """
        self.workQueue = queue.Queue(maxsize=MAX_PENDING_JOBS)
        self.resultQueue = queue.Queue()
        self.threads = []
        self.__createThreadPool(num_of_threads, timeout)

    def __createThreadPool(self, num_of_threads, timeout=1):
        """Start the workers and remember them for wait_for_complete()."""
        for _ in range(num_of_threads):
            thread = MyThread(self.workQueue, self.resultQueue,
                              timeout=timeout)
            self.threads.append(thread)

    def wait_for_complete(self):
        """Block until every worker thread has exited."""
        while self.threads:
            thread = self.threads.pop()
            if thread.is_alive():
                thread.join()

    def add_job(self, callable, args, **kwargs):
        """Queue ``callable`` to be run as ``callable(args, name, **kwargs)``.

        Blocks while ``MAX_PENDING_JOBS`` jobs are already waiting.

        Args:
            callable: Function taking the job argument and the worker
                thread's name.
            args: The single job argument passed as the first parameter.
            **kwargs: Extra keyword arguments forwarded to ``callable``.
        """
        self.workQueue.put((callable, args, kwargs))


def worker(data, threadid, delay=3):
    """Print which thread handles ``data``, sleep, and return ``data * 2``.

    Args:
        data: Value to double.
        threadid: Name of the thread running the job.
        delay: Seconds to sleep to imitate real work (default 3).
    """
    # Serialize printing so lines from different threads do not interleave.
    with mutex:
        print('%s %s' % (threadid, data))
    time.sleep(delay)
    return data * 2


def main():
    """Run ten jobs on a two-thread pool and print the collected results."""
    threadPool = ThreadPool(thread_count)
    for i in range(10):
        threadPool.add_job(worker, i)
    threadPool.wait_for_complete()

    print('result Queue\'s length == %d ' % threadPool.resultQueue.qsize())
    while threadPool.resultQueue.qsize():
        print(threadPool.resultQueue.get())
    print('end testing')


if __name__ == '__main__':
    main()

运行如下:

root # python g.py
Thread-1 0
Thread-2 1
Thread-1 2
Thread-2 3
Thread-1 4
Thread-2 5
Thread-1 6
Thread-2 7
Thread-1 8
Thread-2 9
result Queue's length == 10
0
2
4
6
8
10
12
14
16
18
end testing

简单测试了一下:如果 worker 函数需要做大量耗 CPU 的运算,用进程池的速度比线程池快数倍(CPython 的线程受全局解释器锁 GIL 限制,无法并行执行计算密集型代码)。

线程池 2.492s VS 进程池 0.598s