可以重复利用的线程
直接上代码
from threading import Thread, current_thread from queue import Queue # 重写线程类 class MyThread(Thread): def __init__(self): super().__init__() self.daemon = True # 守护线程 self.queue = Queue(10) self.start() # 实例化的时候开启线程 def run(self): # 子线程只有这一个线程, 从队列里面拿任务 while True: task, args, kwargs = self.queue.get() # 拿任务 也是元组 task(*args, **kwargs) # 可能有,可能没有,所有传入不定长参数 self.queue.task_done() # 结束任务 def apply_async(self, func, args=(), kwargs={}): # 自写任务,不是重写任务, 充当生产者, 给线程提供任务(把任务扔到队列) self.queue.put((func, args, kwargs)) def join_R(self): # 主线程等待子线程结束 self.queue.join() # task_done 为0 的时候就阻塞 def func(): print(1, current_thread()) def func2(*args, **kwargs): print(2, current_thread()) print('func: ', args, kwargs) t = MyThread() t.apply_async(func) t.apply_async(func2, args=(1,2), kwargs={'a':1, 'b':2}) print("任务提交完成") t.join_R() print("任务完成")
结果:
任务提交完成 1 <MyThread(Thread-1, started daemon -1223214272)> 2 <MyThread(Thread-1, started daemon -1223214272)> func: (1, 2) {'a': 1, 'b': 2} 任务完成 任务完成后,主线程就开始退出, 因此守护线程被杀死
线程池的简单实现
池的概念
主线程: 相当于生产者,只管向线程池提交任务。
并不关心线程池是如何执行任务的。
因此,并不关心是哪一个线程执行的这个任务。
线程池: 相当于消费者,负责接收任务,
并将任务分配到一个空闲的线程中去执行。
代码实现如下:
from threading import Thread, current_thread from queue import Queue class T_pool: def __init__(self, n): # 准备多少个池 super().__init__() self.queue = Queue() for i in range(n): # 在池里开多少个线程 Thread(target=self.fun, daemon=Thread).start() # 守护进程 并启动 def fun(self): # 生产者 while True: task = self.queue.get() task() self.queue.task_done() def apply_async(self, task): # 消费者 self.queue.put(task) def join(self): self.queue.join() def func(): print(current_thread()) def func2(): print(current_thread()) p = T_pool(2) p.apply_async(func) p.apply_async(func2) p.join()
结果:
<Thread(Thread-1, started daemon -1223324864)> <Thread(Thread-1, started daemon -1223324864)>
Python自带的池
内置线程池
from multiprocessing.pool import ThreadPool # 线程池 from multiprocessing import pool # 进程池 # 内置线程池 def fun(*args, **kwargs): print(args, kwargs) p = ThreadPool(2) # 直接使用内置的 p.apply_async(fun, args=(1,2), kwds={'a':1}) p.close() # 要求:在join前必须要close,这样就不允许再提交任务了 p.join()
结果:
(1, 2) {'a': 1}
内置进程池
from multiprocessing import Pool # 进程池 # 内置进程池 def fun(*args, **kwargs): print(args, kwargs) if __name__ == '__main__': # 必须要有一个main测试 p = Pool(2) # pool的实例化必须在main测试之下 p.apply_async(fun, args=(1,2), kwds={'a':1}) p.close() # 要求:在join前必须要close,这样就不允许再提交任务了 p.join()
结果:
(1, 2) {'a': 1}
池的其他操作
操作一: close - 关闭提交通道,不允许再提交任务
操作二: terminate - 中止进程池,中止所有任务
操作三: 结果操作
结果操作
from multiprocessing.pool import ThreadPool import time def func(n): if n == 1: return 1 elif n == 2: return 2 return func(n-1) + func(n-2) pool = ThreadPool() a_result = pool.apply_async(func, args=(35,)) print("note1:",time.asctime(time.localtime(time.time()))) result = a_result.get() # 会阻塞,知道结果产生了 print("note2:",time.asctime(time.localtime(time.time())))
结果:
note1: Mon Sep 17 00:07:31 2018
note2: Mon Sep 17 00:07:34 2018
使用池来实现并发服务器
使用线程池来实现并发服务器
import socket from multiprocessing.pool import ThreadPool # 线程池 from multiprocessing import Pool, cpu_count ''' 使用线程池来实现 并发服务器 ''' print(cpu_count()) server = socket.socket() server.bind(('0.0.0.0', 8080)) server.listen(1000) def work_thread(conn): while True: data = conn.recv(1000) if data: print(data) conn.send(data) else: conn.close() break if __name__ == '__main__': t_pool = ThreadPool(5) # 使用线程池, 通常分配2倍的cpu个数 while True: conn,addr = server.accept() t_pool.apply_async(work_thread, args=(conn,)) # 接收的是个任务, conn做为参数
使用进程池来实现并发服务器
import socket from multiprocessing.pool import ThreadPool # 线程池 from multiprocessing import Pool, cpu_count ''' 使用进程池来实现 并发服务器 ''' print(cpu_count()) server = socket.socket() server.bind(('0.0.0.0', 9000)) server.listen(1000) def work_process(server): t_pool = ThreadPool(cpu_count()*2) # 使用线程池, 通常分配2倍的cpu个数 while True: conn,addr = server.accept() t_pool.apply_async(work_thread, args=(conn,)) # 接收的是个任务, conn做为参数 def work_thread(conn): while True: data = conn.recv(1000) if data: print(data) conn.send(data) else: conn.close() break n = cpu_count() # 获取当前计算机的CPU核心数量 p = Pool(n) for i in range(n): # 充分利用CPU, 为每个CPU分配一个进程 p.apply_async(work_process, args=(server,)) p.close() p.join()
客户端:
import socket click = socket.socket() click.connect(('127.0.0.1', 8888)) while True: data = input("请输入你要发送的数据:") click.send(data.encode()) print("接收到的消息: {}".format(click.recv(1024).decode()))
总结完毕。
作者:含笑半步颠√
博客链接:https://www.cnblogs.com/lixy-88428977
声明:本文为博主学习感悟总结,水平有限,如果不当,欢迎指正。如果您认为还不错,欢迎转载。转载与引用请注明作者及出处。