Python之路:线程池

时间:2023-01-09 18:32:45

版本一

#!/usr/bin/env  python
# --*--coding:utf-8 --*--
import Queue
import threading
import time


class ThreadPool(object):
    """Cap the number of concurrently running threads with a bounded queue.

    The queue does not hold running threads.  It is pre-filled with
    ``max_num`` references to the ``threading.Thread`` class itself, one per
    free slot.  Taking a reference claims a slot; putting one back frees it.
    """

    def __init__(self, max_num=20):
        """Create the pool with ``max_num`` free slots (default 20)."""
        self.queue = Queue.Queue(max_num)
        for _ in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Claim a slot and return the ``threading.Thread`` class.

        Blocks while the queue is empty, i.e. until some task gives its
        slot back through ``add_thread``.
        """
        return self.queue.get()

    def add_thread(self):
        """Give one slot back to the pool.

        Blocks if the queue is already full, so call it exactly once per
        successful ``get_thread``.
        """
        self.queue.put(threading.Thread)


# Shared pool used by the demo below: at most 10 tasks run at a time.
pool = ThreadPool(10)


def func(arg, p):
    """Demo task: print ``arg``, sleep two seconds, then release the slot.

    ``p`` is the pool the slot was taken from.  The slot is released in a
    ``finally`` clause so that a failing task cannot leak it; a leaked slot
    would eventually leave the submitting loop blocked in ``get_thread``.
    """
    try:
        print(arg)
        time.sleep(2)
    finally:
        p.add_thread()


if __name__ == "__main__":
    # Submit 30 tasks.  get_thread() blocks whenever 10 are already running,
    # which is what throttles the loop.
    for i in range(30):
        thread = pool.get_thread()
        t = thread(target=func, args=(i, pool))
        t.start()

版本二

import contextlib
import logging
import threading
from Queue import Queue
 
# Sentinel placed on the task queue to tell one worker thread to exit.
WorkerStop = object()

_log = logging.getLogger(__name__)


class ThreadPool(object):
    """Pool of worker threads that pull tasks from a shared queue.

    Tasks are ``(func, args, kw, onResult)`` tuples.  Workers are created
    on demand, up to ``maxthreads``, and each one keeps taking tasks until
    it receives the ``WorkerStop`` sentinel.

    No locking is done around the ``workers`` counter or the
    ``waiters``/``working`` lists.
    """

    # Count of workers that have been started and not yet told to stop.
    workers = 0

    # True between start() and stop(); while set, submitting a task may
    # spawn additional workers.
    started = False

    threadFactory = threading.Thread
    # current_thread is the non-deprecated spelling of currentThread.
    currentThread = staticmethod(threading.current_thread)

    def __init__(self, maxthreads=20, name=None):
        """Create an idle pool.

        maxthreads -- upper bound on the number of worker threads.
        name       -- optional label used in worker thread names; the
                      instance id is used when it is empty.
        """
        self.q = Queue(0)       # unbounded task queue
        self.max = maxthreads
        self.name = name
        self.waiters = []       # workers blocked waiting for a task
        self.working = []       # workers currently running a task

    def start(self):
        """Mark the pool as started and spawn workers for queued tasks."""
        self.started = True
        self._startSomeWorkers()

    def _startSomeWorkers(self):
        """Spawn workers until demand or the ``max`` limit is met."""
        needed = self.q.qsize() + len(self.working)
        while self.workers < min(self.max, needed):
            self.startAWorker()

    def startAWorker(self):
        """Start one more worker thread running ``_worker``."""
        self.workers += 1
        name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
        newThread = self.threadFactory(target=self._worker, name=name)
        newThread.start()

    def callInThread(self, func, *args, **kw):
        """Queue ``func(*args, **kw)`` with no result callback."""
        self.callInThreadWithCallback(None, func, *args, **kw)

    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        """Queue ``func(*args, **kw)`` and report its outcome.

        When ``onResult`` is not None it is called in the worker thread as
        ``onResult(True, return_value)`` on success, or as
        ``onResult(False, exception_instance)`` when ``func`` raised.
        """
        self.q.put((func, args, kw, onResult))
        # Without this, tasks queued after start() would only ever be
        # served by the workers that already existed at start() time.
        if self.started:
            self._startSomeWorkers()

    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        """Keep ``workerThread`` in ``stateList`` for the ``with`` body."""
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)

    def _worker(self):
        """Worker thread main loop: run tasks until ``WorkerStop`` arrives."""
        ct = self.currentThread()
        o = self.q.get()
        while o is not WorkerStop:
            with self._workerState(self.working, ct):
                function, args, kwargs, onResult = o
                del o
                try:
                    result = function(*args, **kwargs)
                    success = True
                except Exception as exc:
                    success = False
                    # Always bind result: the callback and the del below
                    # both need it, and an unbound name here used to kill
                    # the worker thread.
                    result = exc
                    if onResult is None:
                        _log.exception(
                            "Unhandled error in pooled call with no "
                            "result callback")

                # Drop task references before the next blocking get().
                del function, args, kwargs

                if onResult is not None:
                    try:
                        onResult(success, result)
                    except Exception:
                        # A faulty callback must not take the worker down.
                        _log.exception("Result callback raised")

                del onResult, result

            with self._workerState(self.waiters, ct):
                o = self.q.get()

    def stop(self):
        """Queue one ``WorkerStop`` per worker and reset the worker count.

        The sentinels go to the back of the queue, so tasks queued earlier
        are still taken first.  This method does not wait for the worker
        threads to finish.
        """
        self.started = False
        while self.workers:
            self.q.put(WorkerStop)
            self.workers -= 1
 
 
"""
def show(arg):
    import time
    time.sleep(1)
    print arg
 
 
pool = ThreadPool(20)
 
for i in range(500):
    pool.callInThread(show, i)
 
pool.start()
pool.stop()
"""