基于协程实现并发的套接字通信:
server
1 from gevent import monkey,spawn;monkey.patch_all() 2 from threading import Thread 3 from socket import * 4 5 def talk(conn): 6 while True: 7 try: 8 data=conn.recv(1024) 9 if not data:break 10 conn.send(data.upper()) 11 except ConnectionResetError: 12 break 13 conn.close() 14 def server(ip,port,backlog=5): 15 s = socket() 16 s.bind((ip,port)) 17 s.listen(backlog) 18 while True: 19 conn, addr = s.accept() 20 print(addr) 21 # 通信 22 g=spawn(talk,conn) 23 s.close() 24 25 if __name__ == '__main__': 26 spawn(server,'127.0.0.1',8080).join() 27 # server(('127.0.0.1',8080))
client
1 from threading import Thread,current_thread 2 from socket import * 3 def client(): 4 client = socket() 5 client.connect(('127.0.0.1', 8080)) 6 while True: 7 data = '%s hello' % current_thread().name 8 client.send(data.encode('utf-8')) 9 res = client.recv(1024) 10 print(res.decode('utf-8')) 11 if __name__ == '__main__': 12 for i in range(1000): 13 t=Thread(target=client) 14 t.start()
网络IO操作:
server
1 from socket import * 2 3 s = socket() 4 s.bind(('127.0.0.1',8080)) 5 s.listen(5) 6 7 while True: 8 conn, addr = s.accept() 9 print(addr) 10 while True: 11 try: 12 data = conn.recv(1024) 13 if not data: break 14 print('from client msg: ',data) 15 except ConnectionResetError: 16 break 17 conn.close()
client
1 from socket import * 2 3 client = socket() 4 client.connect(('127.0.0.1', 8080)) 5 6 while True: 7 8 data = input('>>: ').strip() 9 if not data:continue 10 client.send(data.encode('utf-8')) 11 print('has send')
阻塞IO模型:blockingIOError!
阻塞IO模型:NonblockingIOError!
server
1 from socket import * 2 import time 3 4 s = socket() 5 s.bind(('127.0.0.1',8080)) 6 s.listen(5) 7 s.setblocking(False) 8 r_list = [] 9 w_list = [] 10 while True: 11 try: 12 conn,addr = s.accept() 13 r_list.append(conn) 14 15 except BlockingIOError: 16 # time.sleep(0.5) 17 print('干其他活了!') 18 19 print('rlist:',len(r_list)) 20 21 #收消息 22 del_rlist = [] 23 for conn in r_list: 24 try: 25 data = conn.recv(1024) 26 27 #linux 系统 28 if not data: 29 conn.close() 30 del_rlist.append(conn) 31 continue 32 33 # conn.send(data.upper()) 34 # w_list.append((conn,data.upper())) 35 except BlockingIOError: 36 continue 37 38 except ConnectionResetError: 39 conn.close() 40 del_rlist.append(conn) 41 42 43 #发消息 44 del_wlist = [] 45 for item in w_list: 46 try: 47 conn= item[0] 48 res= item[1] 49 conn.send(res) 50 del_wlist.append(item) 51 except BlockingIOError: 52 continue 53 except ConnectionResetError: 54 conn.close() 55 del_wlist.append(conn) 56 57 for conn in del_rlist: 58 r_list.remove(conn) 59 60 for item in del_wlist: 61 w_list.remove(item)
client
1 from socket import * 2 import os 3 from threading import Thread 4 5 client = socket() 6 client.connect(('127.0.0.1',8080)) 7 8 while True: 9 data = '%s say hello' %os.getpid() 10 client.send(data.encode('utf-8')) 11 res = client.recv(1024) 12 print(res.decode('utf-8'))
异步IO:
1 from concurrent.futures import ThreadPoolExecutor 2 from threading import current_thread 3 import time 4 import os 5 6 def task(n): 7 print('%s is running' %current_thread().name) 8 time.sleep(2) 9 return n**2 10 11 def parse(obj): 12 res=obj.result() 13 print(res) 14 15 if __name__ == '__main__': 16 t=ThreadPoolExecutor(4) 17 18 future1=t.submit(task,1) 19 future1.add_done_callback(parse) #parse函数会在future1对应的任务执行完毕后自动执行,会把future1自动传给parse 20 21 future2=t.submit(task,2) 22 future2.add_done_callback(parse) 23 24 future3=t.submit(task,3) 25 future3.add_done_callback(parse) 26 27 future4=t.submit(task,4) 28 future4.add_done_callback(parse)
IO多路复用模型:multiplexing
server
1 from socket import * 2 import select 3 4 s = socket() 5 s.bind(('127.0.0.1',8080)) 6 s.listen(5) 7 s.setblocking(False) 8 # print(s) 9 10 r_list=[s,] 11 w_list=[] 12 w_data={} 13 while True: 14 print('被检测r_list: ',len(r_list)) 15 print('被检测w_list: ',len(w_list)) 16 rl,wl,xl=select.select(r_list,w_list,[],) #r_list=[server,conn] 17 18 # print('rl: ',len(rl)) #rl=[conn,] 19 # print('wl: ',len(wl)) 20 21 # 收消息 22 for r in rl: #r=conn 23 if r == s: 24 conn,addr=r.accept() 25 r_list.append(conn) 26 else: 27 try: 28 data=r.recv(1024) 29 if not data: 30 r.close() 31 r_list.remove(r) 32 continue 33 # r.send(data.upper()) 34 w_list.append(r) 35 w_data[r]=data.upper() 36 except ConnectionResetError: 37 r.close() 38 r_list.remove(r) 39 continue 40 41 # 发消息 42 for w in wl: 43 w.send(w_data[w]) 44 w_list.remove(w) 45 w_data.pop(w)
client
1 from socket import * 2 import os 3 4 client = socket() 5 client.connect(('127.0.0.1', 8080)) 6 7 while True: 8 data='%s say hello' %os.getpid() 9 client.send(data.encode('utf-8')) 10 res=client.recv(1024) 11 print(res.decode('utf-8'))