python第十一天-----补:线程池

时间:2021-11-26 09:13:53

低版本:

 #!/usr/bin/env python
import threading
import time
import queue class TreadPool:
"""
将线程加入到队列中作为资源去完成任务
优点:简单好写容易理解
缺点:太尼玛多了.....
"""
def __init__(self, maxsize):
self.maxsize = maxsize
self._q = queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread) def get_thread(self):
return self._q.get() def add_thread(self):
self._q.put(threading.Thread) def task(arg, p):
print(arg)
time.sleep(1)
p.add_thread() pool = TreadPool(5) for i in range(100):
t = pool.get_thread()
obj = t(target=task, args=(i, pool))
obj.start()

高级版本:

 #!/usr/bin/env python
# -*- coding:utf-8 -*- import queue # 队列模块
import threading # 线程模块
import contextlib # 上下文模块
import time # 时间模块 StopEvent = object() # 创建一个停止时所需要用到的对象 class ThreadPool(object):
"""
线程池(用于放置任务,将任务作为队列中元素让线程去取得,可以复用线程减少开销)
"""
def __init__(self, max_num, max_task_num=None):
"""
构造方法
:param max_num:
:param max_task_num:所创建的队列内最大支持的任务个数
"""
if max_task_num:
self.q = queue.Queue(max_task_num) # 指定队列任务数量则创建有限队列
else:
self.q = queue.Queue() # 未指定队列任务数量则创建无限队列
self.max_num = max_num # 每次使用的最大线程个数
self.cancel = False # 任务取消,默认False,用于线程停止的判断
self.terminal = False # 任务终止,默认False,用于线程池终止的判断
self.generate_list = [] # 定义一个已生成任务列表
self.free_list = [] # 定义一个空闲任务列表 def run(self, func, args, callback=None):
"""
线程池执行一个任务方法
:param func: 传递进来的任务函数
:param args: 任务函数使用的参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
1、任务函数执行状态;
2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if self.cancel: # 如果条件为真则不会继续执行
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread() # 如果现有空闲列表无元素并且已生成任务列表内
# 元素个数小于队列支持的最大数量则创建一个线程
w = (func, args, callback,) # 具体任务
self.q.put(w) # 将任务放入队列当中 def generate_thread(self):
"""
创建一个线程方法
"""
t = threading.Thread(target=self.call) # 调用线程类创建一个线程,参数传递self.call方法
t.start() # 线程开始 def call(self):
"""
循环去获取任务函数并执行任务函数
"""
current_thread = threading.currentThread() # 创建当前任务
self.generate_list.append(current_thread) # 生成任务列表添加当前任务 event = self.q.get() # 事件获取
while event != StopEvent: # 当前事件不是停止时执行 func, arguments, callback = event # 任务具体函数,参数获取
try:
result = func(*arguments) # 结果为任务处理的出的结果
success = True # 任务处理成功
except Exception as e:
success = False # 任务处理失败
result = None # 结果为None if callback is not None: # 回调不为空
try:
callback(success, result) # 将刚才执行结果返回
except Exception as e:
pass with self.worker_state(self.free_list, current_thread):
if self.terminal: # 如果线程池已经被终止
event = StopEvent # 事件变为空任务
else:
event = self.q.get() # 事件为正常任务
else: self.generate_list.remove(current_thread) # 生成任务列表移除当前任务 def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True # 线程停止,判定条件变为真
full_size = len(self.generate_list) # 获取还有几个在执行任务的线程
while full_size: # 向队列中添加相应个数的空任务
self.q.put(StopEvent)
full_size -= 1 def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True # 线程池关闭,判定条件变为真 while self.generate_list: # 当还有线程存在时放置空任务
self.q.put(StopEvent) self.q.queue.clear() # 将队列中所有任务清空 @contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread) # 等待状态列表中添加正在等待的线程数
try:
yield
finally:
state_list.remove(worker_thread) # 移除正在等待的线程数 # How to use pool = ThreadPool(5) # 创建一个每次支持5线程的线程池 def callback(status, result):
# status, execute action status
# result, execute action return value
pass def action(i): # 任务函数
print(i) for i in range(30): # 使用线程池执行30次任务
ret = pool.run(action, (i,), callback) time.sleep(1) # 1秒等待
print(len(pool.generate_list), len(pool.free_list)) # 打印线程池内当前任务个数及空任务个数
pool.close() # 线程停止
pool.terminate() # 线程池终止