python线程池实现

时间:2022-04-07 18:31:03

python 的线程池主要有threadpool,不过它并不是内置的库,每次使用都需要安装,而且使用起来也不是那么好用,所以自己写了一个线程池实现,每次需要使用直接import即可。其中还可以根据传入的特征量handlerkey来获取每个任务的结果。

#!/bin/env python
#
-*- coding:utf-8 -*-

"""
@lx
created on 2016-04-14
"""

import Queue
import sys
import threading
import time
import StringIO
import traceback

reload(sys)
sys.setdefaultencoding(
"utf8")


class MyThread(threading.Thread):
"""Background thread connected to the requests/results queues."""
def __init__(self, workQueue, resultQueue, timeout=0.1, **kwds):
threading.Thread.
__init__(self, **kwds)
self.setDaemon(True)
self._workQueue
= workQueue
self._resultQueue
= resultQueue
self._timeout
= timeout
self._dismissed
= threading.Event()
self.start()

def run(self):
"""Repeatedly process the job queue until told to exit."""
while True:
if self._dismissed.isSet():
break

handlerKey
= None # unique key
code = 0 # callback return code
handlerRet = None
errMsg
= ""

try:
callable, args, kwds
= self._workQueue.get(True, self._timeout)
except Queue.Empty:
continue
except:
exceptMsg
= StringIO.StringIO()
traceback.print_exc(file
=exceptMsg)
errMsg
= exceptMsg.getvalue()
code
= 3301 # system error
self._resultQueue.put(
(handlerKey, code, (callable, args, kwds), errMsg))
break

if self._dismissed.isSet():
self._workQueue.put((callable, args, kwds))
break

try:
if "handlerKey" in kwds:
handlerKey
= kwds["handlerKey"]
handlerRet
= callable(*args, **kwds) # block
self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
except:
exceptMsg
= StringIO.StringIO()
traceback.print_exc(file
=exceptMsg)
errMsg
= exceptMsg.getvalue()
code
= 3303
self._resultQueue.put((handlerKey, code, handlerRet, errMsg))

def dismiss(self):
"""Sets a flag to tell the thread to exit when done with current job."""
self._dismissed.set()


class ThreadPool(object):
def __init__(self, workerNums=3, timeout=0.1):
self._workerNums
= workerNums
self._timeout
= timeout
self._workQueue
= Queue.Queue() # no maximum
self._resultQueue = Queue.Queue()
self.workers
= []
self.dismissedWorkers
= []
self._createWorkers(self._workerNums)

def _createWorkers(self, workerNums):
"""Add num_workers worker threads to the pool."""
for i in range(workerNums):
worker
= MyThread(self._workQueue, self._resultQueue,
timeout
=self._timeout)
self.workers.append(worker)

def _dismissWorkers(self, workerNums, _join=False):
"""Tell num_workers worker threads to quit after their current task."""
dismissList
= []
for i in range(min(workerNums, len(self.workers))):
worker
= self.workers.pop()
worker.dismiss()
dismissList.append(worker)

if _join:
for worker in dismissList:
worker.join()
else:
self.dismissedWorkers.extend(dismissList)

def _joinAllDissmissedWorkers(self):
"""
Perform Thread.join() on all
worker threads that have been dismissed.
"""
for worker in self.dismissedWorkers:
worker.join()
self.dismissedWorkers
= []

def addJob(self, callable, *args, **kwds):
self._workQueue.put((callable, args, kwds))

def getResult(self, block=False, timeout=0.1):
try:
item
= self._resultQueue.get(block, timeout)
return item
except Queue.Empty, e:
return None
except:
raise

def waitForComplete(self, timeout=0.1):
"""
Last function. To dismiss all worker threads. Delete ThreadPool.
:param timeout
"""
while True:
workerNums
= self._workQueue.qsize() # 释放掉所有线程
runWorkers = len(self.workers)

if 0 == workerNums:
time.sleep(timeout)
# waiting for thread to do job
self._dismissWorkers(runWorkers)
break
# if workerNums < runWorkers: # 不能这样子乱取消
# self._dismissWorkers(runWorkers - workerNums)
time.sleep(timeout)
self._joinAllDissmissedWorkers()


if "__main__" == __name__:
test1
= """
def doSomething(*args, **kwds):
if "sleep" in kwds:
sleep = kwds["sleep"]
msgTxt = "sleep %fs.." % sleep
time.sleep(sleep)
return msgTxt

for i in range(10):
print doSomething(sleep=0.1, handlerKey="key-%d"%i)

wm = ThreadPool(10)
for i in range(10):
wm.addJob(doSomething, sleep=1, handlerKey="key-%d"%i)
wm.waitForComplete()
for i in range(10):
print wm.getResult()
del wm
"""
# test2 = """

def doSomething_(*args, **kwds):
sleep
= int(args[0])
msgTxt
= "sleep %ds.." % sleep
time.sleep(sleep)
return msgTxt


wm
= ThreadPool(10)
result
= []
for i in range(10):
data
= 5
wm.addJob(doSomething_, data)

while 1:
res
= wm.getResult()
if res:
result.append(res)
if 10 == len(result):
break
print "sleep 0.1"
time.sleep(
0.1)
print time.time()
wm.waitForComplete()
print time.time()
# """

 

原创文章,转载请备注原文地址 http://www.cnblogs.com/lxmhhy/p/6032924.html

知识交流讨论请加qq群:180214441。谢谢合作