12 并发编程-(线程)-线程queue&进程池与线程池

时间:2021-10-24 07:56:37
queue 英 /kjuː/  美 /kju/ 队列

1、class queue.Queue(maxsize=0) #队列:先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third') print(q.get())
print(q.get())
print(q.get()) '''
结果(先进先出):
first
second
third
'''

2、class queue.LifoQueue(maxsize=0) #堆栈:last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third') print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

3、class queue.PriorityQueue(maxsize=0) #优先级队列:存储数据时可设置优先级的队列

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c')) print(q.get())
print(q.get())
print(q.get()) '''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

二、进程池与线程池

1、基本的概念:

在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:

服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,

于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,

例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制

官网:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.
1、submit(fn, *args, **kwargs)
异步提交任务 2、map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作 取代for + submit 3、shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前 4、result(timeout=None)
取得结果 5、add_done_callback(fn)
回调函数

2.1、没有shutdown(wait=True)

import os,time,random
def task(name):
print(f"{name} {os.getpid()} run")
time.sleep(random.randint(1,3))
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 进程池的容量设定,
for i in range(10):
pool.submit(task,'alex %s'%i) #pool.shutdown(waite= True)
print('主')

alex 0 9412 run
alex 1 9624 run
alex 2 9904 run
alex 3 1452 run
alex 4 9904 run
alex 5 9412 run
alex 6 1452 run
alex 7 9624 run
alex 8 9624 run
alex 9 9904 run

2.2、有shutdown(wait=True)

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# 为什么建池 :我们必须对服务器开启的进程数或者线程数加以控制,让机器在一个自己可以承受的范围内运行
# 这就是进程池或线程池的作用
import os,time,random
def task(name):
print(f"{name} pid:{os.getpid()} run")
time.sleep(random.randint(1,3))
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 进程池的容量设定,
for i in range(10):
pool.submit(task,'alex %s'%i) pool.shutdown(waite= True)#等待池内所有任务执行完毕回收完资源后才继续
print('主') alex 0 pid:10228 run
alex 1 pid:9584 run
alex 2 pid:7768 run
alex 3 pid:9464 run
alex 4 pid:9584 run
alex 5 pid:10228 run
alex 6 pid:7768 run
alex 7 pid:9464 run
alex 8 pid:9584 run
alex 9 pid:7768 run

2.3、线程池的用法

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# 为什么建池 :我们必须对服务器开启的进程数或者线程数加以控制,让机器在一个自己可以承受的范围内运行
# 这就是进程池或线程池的作用
import os,time,random
from threading import currentThread
def task(name):
print(f"{name} 线程:{currentThread().getName()} pid:{os.getpid()} run")
time.sleep(random.randint(1,3))
if __name__ == '__main__':
pool = ThreadPoolExecutor(4) # 4个线程池的容量设定,
for i in range(10):
pool.submit(task,'alex %s'%i) pool.shutdown(wait=True)#等待池内所有任务执行完毕回收完资源后才继续 print('主') alex 0 线程:ThreadPoolExecutor-0_0 pid:11164 run
alex 1 线程:ThreadPoolExecutor-0_1 pid:11164 run
alex 2 线程:ThreadPoolExecutor-0_2 pid:11164 run
alex 3 线程:ThreadPoolExecutor-0_3 pid:11164 run
alex 4 线程:ThreadPoolExecutor-0_2 pid:11164 run
alex 5 线程:ThreadPoolExecutor-0_3 pid:11164 run
alex 6 线程:ThreadPoolExecutor-0_0 pid:11164 run
alex 7 线程:ThreadPoolExecutor-0_0 pid:11164 run
alex 8 线程:ThreadPoolExecutor-0_1 pid:11164 run
alex 9 线程:ThreadPoolExecutor-0_3 pid:11164 run

12 并发编程-(线程)-线程queue&进程池与线程池

2.4 回调函数 add_done_callback(fn)

可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数