并发编程 - IO模型 - 1.io模型/2.阻塞io/3.非阻塞io/4.多路复用io

时间:2022-05-16 00:00:49
1.io模型
提交任务得方式:
同步:提交完任务,等结果,执行下一个任务
异步:提交完,接着执行,异步 + 回调 异步不等结果,提交完任务,任务执行完后,会自动触发回调函数
同步不等于阻塞:
阻塞:遇到io,自己不处理,os会抢走cpu ,解决办法:监测到io,gevent切换到其他任务,类似欺骗os
非阻塞:cpu 运行
IO分类:
1.阻塞IO blocking IO
2.非阻塞IO nonblocking IO
3.IO多路复用 IO multiplexing
4.信号驱动IO signal driven IO 用得比较少
5.异步IO asynchronous IO
遇到IO: 卡
网络IO: 原地阻塞
1.server端什么样得操作属于IO行为
# accept recv send 阻塞操作 accept recv 明显得等 send 不会明显等,但是一种io行为
2.为什么IO行为会让有在原地等待的效果

2.阻塞io
server:
 1 from socket import *
 2 from threading import Thread
 3 
 4 def communicate(conn):
 5     while True:
 6         try:
 7             data = conn.recv(1024)
 8             if not data: break
 9             conn.send(data.upper())
10         except ConnectionResetError:
11             break
12 
13     conn.close()
14 
15 
16 
17 server = socket(AF_INET, SOCK_STREAM)
18 server.bind(('127.0.0.1',8080))
19 server.listen(5)
20 
21 while True:
22     print('starting...')
23     conn, addr = server.accept()  # io 阻塞 os拿走了cpu
24     print(addr)
25 
26     t=Thread(target=communicate,args=(conn,))
27     t.start()
28 
29 server.close()
client:
 1 from socket import *
 2 
 3 client=socket(AF_INET,SOCK_STREAM)
 4 client.connect(('127.0.0.1',8080))
 5 
 6 
 7 while True:
 8     msg=input('>>: ').strip()
 9     if not msg:continue
10     client.send(msg.encode('utf-8'))
11     data=client.recv(1024)
12     print(data.decode('utf-8'))
13 
14 client.close()
3.非阻塞io:
自己监测io 遇到io 就切 并且把 单线程得效率提到最高
导致得问题:
1.当有数据来得时候,cpu 在做其他得事情,不会立即响应
2.服务端没有任何阻塞,说白了,就是死循环,cpu会一直运转,线程处于就绪状态,大量占用cpu ,做无用,这个线程会一直问cpu,有数据没,有数据没
不推荐使用
server:
 1 from socket import *
 2 
 3 server = socket(AF_INET, SOCK_STREAM)
 4 server.bind(('127.0.0.1',8083))
 5 server.listen(5)
 6 server.setblocking(False)   # 默认True 阻塞
 7 print('starting...')
 8 
 9 
10 rlist=[]
11 wlist=[]
12 while True:
13 
14     try:  # 服务端不停得建链接
15         conn, addr = server.accept()
16         rlist.append(conn)
17         print(rlist)
18     except BlockingIOError:  # 没阻塞
19         # print('干其他的活')
20 
21         #收消息
22         del_rlist = []
23         for conn in rlist:
24             try:
25                 data=conn.recv(1024)
26                 if not data:
27                     del_rlist.append(conn)
28                     continue
29                 wlist.append((conn,data.upper()))
30             except BlockingIOError:
31                 continue
32             except Exception:
33                 conn.close()
34                 del_rlist.append(conn)
35 
36         #发消息
37         del_wlist=[]
38         for item in wlist:
39             try:
40                 conn=item[0]
41                 data=item[1]
42                 conn.send(data)  # send 在数据量 过大时 也会阻塞
43                 del_wlist.append(item)
44             except BlockingIOError:
45                 pass
46 
47         for item in del_wlist:
48             wlist.remove(item)
49 
50         for conn in del_rlist:
51             rlist.remove(conn)
52 
53 
54 server.close()
client:
 1 from socket import *
 2 
 3 client=socket(AF_INET,SOCK_STREAM)
 4 client.connect(('127.0.0.1',8083))
 5 
 6 
 7 while True:
 8     msg=input('>>: ').strip()
 9     if not msg:continue
10     client.send(msg.encode('utf-8'))
11     data=client.recv(1024)
12     print(data.decode('utf-8'))
13 
14 client.close()
4.多路复用io:
wait copy 还多了select 中间有个中介存在,帮问os 有没有数据
但是如果中介 只有1个 效率不如 阻塞效率
但是如果中介监测多个套接字 ,性能高就是:同时监测多个套接字问os系统好了没 就比阻塞io效率高
监测套接字得io行为
服务端得套接字有几类:server conn

select 阻塞io 效率高
比非阻塞io 效率也高 ,一直做无用

总结:
同时监测多个套接字
列表 循环 慢 假设列表数据多,循环 效率低 监测套接字好没好 从头到尾 循环1遍
select 列表循环 效率低
poll 可接收得列表数据多 效率也不高
epoll 效率最高得 异步操作 每个套接字身上绑定个回调函数,谁好了谁触发回调,(就不用去遍历了 效率低)
epoll windows 不支持
linux 支持
selectors 模块 自动根据操作系统选择
poll
epoll
server:
 1 from socket import *
 2 import select
 3 
 4 server = socket(AF_INET, SOCK_STREAM)
 5 server.bind(('127.0.0.1',8083))
 6 server.listen(5)
 7 server.setblocking(False)
 8 print('starting...')
 9 
10 rlist=[server,]
11 wlist=[]
12 wdata={}
13 
14 while True:
15     rl,wl,xl=select.select(rlist,wlist,[],0.5) # [] 异常列表 每隔0.5s 问一次
16     print('rl',rl)
17     print('wl',wl)
18 
19     for sock in rl:
20         if sock == server:
21             conn,addr=sock.accept()
22             rlist.append(conn)
23         else:
24             try:
25                 data=sock.recv(1024)
26                 if not data:
27                     sock.close()
28                     rlist.remove(sock)
29                     continue
30                 wlist.append(sock)
31                 wdata[sock]=data.upper()
32             except Exception:
33                 sock.close()
34                 rlist.remove(sock)
35 
36     for sock in wl:
37         data=wdata[sock]
38         sock.send(data)
39         wlist.remove(sock)
40         wdata.pop(sock)
41 
42 server.close()
client:
 1 from socket import *
 2 
 3 client=socket(AF_INET,SOCK_STREAM)
 4 client.connect(('127.0.0.1',8083))
 5 
 6 
 7 while True:
 8     msg=input('>>: ').strip()
 9     if not msg:continue
10     client.send(msg.encode('utf-8'))
11     data=client.recv(1024)
12     print(data.decode('utf-8'))
13 
14 client.close()