python全栈开发 * 线程队列 线程池 协程 * 180731

时间:2024-01-16 11:30:56

一.线程队列

队列:
1.Queue
先进先出
自带锁 数据安全 
from queue import Queue   
from multiprocessing import Queue (IPC队列)
2.LifoQueue后进先出
后进先出
自带锁 数据安全
from queue import LifoQueue
lq=LifoQueue(5)
lq.put(123)
lq.put(666)
lq.put(888)
lq.put(999)
lq.put("love")
print(lq.put_nowait("miss")) #报错 queue.Full
print(lq) # <queue.LifoQueue object at 0x0000017901BC8C88>
print(lq.get()) #love
print(lq.get()) #
print(lq.get()) #
print(lq.get()) #
print(lq.get()) #
#print(lq.get_nowait()) #报错 queue.Empty
3.PriorityQueue优先级队列
(放元组,数字从小到大,英文字母按ASCII码先后顺序)
from queue import PriorityQueue
pq=PriorityQueue(4)
pq.put((10,"aaa"))
pq.put((5,"S"))
pq.put((5,"ccc"))
pq.put((10,"zzz")) #pq.put_nowait((10,"bbb")) #报错queue.Full
print(pq) # <queue.PriorityQueue object at 0x000001D6FEF38C50> print(pq.get())
print(pq.get()) #(5, 'ccc')
print(pq.get()) #(10, 'aaa')
print(pq.get()) #(10, 'zzz')
print(pq.get()) #(20, 'bbb')
# print(pq.get_nowait()) # 报错queue.Empty
二 线程池
Multiprocessing模块 自带进程池Pool
Threading 模块 没有Pool(没有线程池)
concurrent.futures帮助你管理线程池和进程池
高度封装
进程池/线程池的统一的统一的使用方法
import time
from threading import currentThread
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
def func(i):
time.sleep(1)
print("in %s %s"%(i,currentThread()))
return i**2
def back(fn):
print(fn.result(),currentThread()) t=ThreadPoolExecutor(5)
ret_l=[]
for i in range(20):
ret=t.submit(func,i).add_done_callback(back)
# ret_l.append(ret)
t.shutdown(wait=True) #括号里可以省略
# for ret in ret_l:
# print(ret.result())
print(666)
 ThreadPoolExecutor的相关方法:
1.t.map方法 启动多线程任务 # t.map(func,range(20)) 替代for submit
2.t.submit(func,*args,**kwargs) 异步提交任务
3.t.shutdown (wait=True) 相当于进程池的pool.close()+pool.join()操作 同步控制
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
submit和map必须在shutdown之前
4.result获取结果 ret.result()
5.回调函数 add_done_callback(back)
在回调函数内接收的参数是一个对象,需要通过result来获取返回值
在主进程中执行
三.协程
进程:资源分配的最小单位
线程 :CPU调度的最小单位
协程: 能在一条线程的基础上,在多个任务之间互相切换
节省线程开启的消耗
从python代码的级别调度
正常的线程是CPU调度的最小单位
协程的调度并不是由操作系统来完成的.
(一).yield的机制就是协程
 def func():
print(1)
x=yield "aaa"
print(x)
yield "bbb"
g=func()
print(next(g))
print(g.send("***"))
(二).在多个函数之间互相切换的功能--协程
 def consumer():
while True:
x=yield
print(x) def producer():
g=consumer()
next(g) for i in range(10):
g.send(i)
producer()
yeild 只有程序之间的切换,没有重利用任何IO操作的时间
greenlet(第三方模块) 程序上下文切换
cmd : pip3 install 模块名 安装第三方模块
(三).greenlet
协程模块 单纯的程序切换耗费时间
 import time
from greenlet import greenlet
def eat():
print('吃')
time.sleep(1)
g2.switch()
print("吃完了")
time.sleep(1)
g2.switch() def play():
print("玩")
time.sleep(1)
g1.switch()
print("玩美了") g1=greenlet(eat)
g2=greenlet(play)
g1.switch()
(四).gevent
遇到IO就切换 使用协程减少IO操作带来的时间消耗
greenlet 是gevent的底层
gevent是基于greenlet实现的
python代码在控制程序的切换
第一版:
import time
import gevent
from gevent import monkey
def eat():
print("吃")
gevent.sleep(2)
print("吃完了")
def play():
print("玩")
gevent.sleep(2)
print("玩美了") g1=gevent.spawn(eat)
g2=gevent.spawn(play)
g1.join() #等待g1结束
g2.join() #等待g2结束
第二版
要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
from gevent import monkey;monkey.patch_all()
import time
import gevent def eat(name):
print("吃")
time.sleep(2)
print("%s吃完了"%name) def play():
print("玩")
time.sleep(2)
print("玩美了") g1=gevent.spawn(eat,"alex") #括号里传参第一个是函数名,后面可以跟多个参数可以是位置参数,也可以是关键字参数,都是传给eat的
g2=gevent.spawn(play)
gevent.joinall([g1,g2])# g1.join()和g2.join()合并成一个.
print(g1.value) #None
四.协程起socket(tcp)
服务器代码
from gevent import monkey;monkey.patch_all()
import socket
import gevent
def talk(conn):
while True:
conn.send(b'hallo')
print(conn.recv(1024))
sk=socket.socket()
sk.bind(("127.0.0.1",9902))
sk.listen()
while True:
conn,addr=sk.accept()
gevent.spawn(talk,conn)
客户端代码
import socket
from threading import Thread
def client():
sk=socket.socket()
sk.connect(("127.0.0.1",9902))
while True:
print(sk.recv(1024))
sk.send(b'hi')
for i in range(5):
Thread(target=client).start()