7.24 IO多路复用和协程代码笔记

时间:2022-04-10 05:13:50

1. 复习

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/23 11:49
# !@Author TrueNewBee
# 1.班级 姓名 作业的内容
# ftp
#
# 笔记 # 今天的内容
# 协程
# 网络IO模型

2.协程

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/23 11:52
# !@Author TrueNewBee
# 进程 启动多个进程 进程之间是由操作系统负责调用
# 线程 启动多个线程 真正被CPU执行的最小单位实际是线程
# 开启一个线程 创建一个线程 寄存器 堆栈
# 关闭一个线程
# 协程
# 本质上是一个线程
# 能够在多个任务之间切换来节省一些IO时间
# 实现并发的手段 # def consumer():
# """创建一个生成器"""
# while True:
# x = yield
# print('处理了数据', x)
#
#
# def producer():
# c = consumer()
# next(c)
# for i in range(10):
# print('生产了数据:', i)
# c.send(i)
#
#
# producer() # 真正的协程模块就是使用greenlet完成的切换
# from greenlet import greenlet
# # 不知道这个模块为何报错
# # 2018-7-23 12:34:39 吃饭去
#
#
# def eat():
# print('eating start')
# g2.switch() # 切换到play
# print('eating end')
# g2.switch()
#
#
# def play():
# print('playing start ')
# g1.switch()
# print('playing end')
#
#
# if __name__ == '__main__':
# # 用于切换线程
# g1 = greenlet(eat)
# g2 = greenlet(play)
# g1.switch() # 放在开头,是为了识别time (IO)
# from gevent import monkey; monkey.patch_all()
# import time
# import gevent
# import threading
#
#
# def eat():
# print(threading.current_thread()) # 查看线程名字
# print('eating start')
# time.sleep(1) # gevent 检测到停1s,则调到另外一个函数中
# print('eating end')
#
#
# def play():
# print(threading.current_thread())
# print('playing start ')
# time.sleep(1)
# print('playing end')
#
#
# if __name__ == '__main__':
# g1 = gevent.spawn(eat) # 开启协程
# g2 = gevent.spawn(play)
# g1.join()
# g2.join() # 进程和线程的任务切换由操作系统完成
# 协程任务之间的切换由程序(代码)完成 只有遇到协程模块能识别的IO操作的时候,程序才会进行任务切换实现并发效果 # 同步 和 异步 (网络操作常用协程)
# from gevent import monkey; monkey.patch_all()
# import time
# import gevent
#
#
# def task():
# time.sleep(1)
# print(12345)
#
#
# def sync():
# for i in range(10):
# task()
#
#
# def async():
# g_list = []
# for i in range(10):
# g = gevent.spawn(task)
# g_list.append(g)
# gevent.joinall(g_list) # for g in g_list :g.join()
#
#
# if __name__ == '__main__':
# sync()
# async() # 协程 : 能够在一个线程中实现并发效果的概念
# 能够规避一些任务中的IO操作
# 在任务的执行过程中,检测到IO就切换到其他任务 # 多线程 被弱化了
# 协程: 在一个线程上,提高cpu的利用率
# 协程相比于多线程的优势 切换的效率更快了 # 爬虫例子(正则基础)
# 请求过程中的IO等待
from gevent import monkey;monkey.patch_all()
import gevent
from urllib.request import urlopen def get_url(url1):
response = urlopen(url1)
content = response.read().decode('utf-8') # 有各式的
return len(content) url = {
'http://www.baidu.com',
'http://www.taobao.com',
'http://www.hao123.com',
}
g_list = []
for i in url:
g = gevent.spawn(get_url, i)
g_list.append(g)
gevent.joinall(g_list)
for g in g_list:
print(g.value)
# socket server

3.用协程写 socket_demo

socket

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/23 16:40
# !@Author TrueNewBee
# 用协程写 socket
# 用协程是最快最方便的 最省时间占用最小,代码间的转换
from gevent import monkey; monkey.patch_all()
import socket
import gevent def talk(conn1):
conn1.send(b'hello')
rec = conn.recv(1024).decode('utf-8')
print(rec)
conn.close() if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn, add = sk.accept()
gevent.spawn(talk, conn)
sk.close()

client

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/23 16:40
# !@Author TrueNewBee
import socket sk = socket.socket()
sk.connect(('127.0.0.1', 8080))
rec = sk.recv(1024).decode('utf-8')
print(rec)
msg = input('>>>>').encode('utf-8')
sk.send(msg)
sk.close()

4.IO模型 笔记

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/24 8:58
# !@Author TrueNewBee
# 同步 : 提交一个任务之后要等待这个任务执行完毕
# 异步 : 只管提交任务,不等待这个任务执行完毕就可以做其他事情
# 阻塞 : input urlopen() 在socket里面:recv() recvfrom() accept
# 非阻塞 : 除了阻塞的其他都是非阻塞 # 阻塞 线程 运行状态 --> 阻塞状态-->就绪
# 非阻塞 # IO多路复用
# select机制 Windows和linux 都是操作系统轮询每一个被监听的项,看是否读操作
# poll机制 linux 它可以监听的对象比select机制可以监听的多
# 随着监听项的增多,导致效率降低
# epoll机制 linux

5.非阻塞模型

socket

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/24 9:18
# !@Author TrueNewBee
# 非阻塞IO模型
# 单线程中非阻塞!(没有用协程!)
import socket sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.setblocking(False) # 默认True阻塞, False非阻塞
sk.listen()
conn_list = []
del_conn = [] # 存入失效连接的列表
while True:
# 接收异常 BlockingIOError 完成非阻塞
try:
conn, add = sk.accept() # 不阻塞,但没人连我会报错
print('建立连接了', add)
# msg = conn.recv(1024) # 不阻塞,但没有消息会报错
# print(msg)
conn_list.append(conn)
except BlockingIOError:
# 循环列表连接 看看是否有人发消息
for con in conn_list:
try:
msg = con.recv(1024) # 不阻塞,但没有消息会报错
if msg == b'':
del_conn.append(con) # 把失效的连接存到del_conn中
continue
print(msg)
con.send(b'bye bye')
except BlockingIOError:
pass
for con in del_conn:
con.close()
conn_list.remove(con) # 在conn_list中删除失效连接
del_conn.clear() # 清空删除列表

5. client

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/24 9:18
# !@Author TrueNewBee
# 非阻塞IO 多线程并发socket IO
import time
import socket
import threading def func():
sk = socket.socket()
sk.connect(('127.0.0.1', 8080))
sk.send(b'hello')
time.sleep(1)
msg = sk.recv(1024)
print(msg)
sk.close() for i in range(20):
threading .Thread(target=func).start()

6.IO多路复用

socket

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/24 11:51
# !@Author TrueNewBee
# IO多路复用 多并发!
import select
import socket sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.setblocking(False)
sk.listen() read_list = [sk] # 储存监听对象
while True: # [sk, conn] sk,发送链接 conn监听发送消息
r_list, w_list, x_list = select.select(read_list, [], [])
for i in r_list:
if i is sk:
conn, add = i.accept() # 没有sk, 有conn则会报错
read_list.append(conn)
else:
ret = i.recv(1024)
if ret == b'':
i.close()
read_list.remove(i)
continue
print(ret)
i.send(b'goodbye')

client

# !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/24 11:51
# !@Author TrueNewBee
import socket
import threading
import time def func():
sk = socket.socket()
sk.connect(('127.0.0.1', 8080))
sk.send(b'hello')
time.sleep(1)
sk.recv(1024)
sk.close() for i in range(20):
threading .Thread(target=func).start()

7.selector_dome

 # !/usr/bin/env python
# !--*--coding:utf-8 --*--
# !@Time :2018/7/24 17:15
# !@Author TrueNewBee
# 服务端
from socket import *
import selectors
sel = selectors.DefaultSelector() def accept(server_fileobj, mask):
conn, addr = server_fileobj.accept()
sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask):
try:
data = conn.recv(1024)
if not data:
print('closing', conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close() sk = socket(AF_INET, SOCK_STREAM)
sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sk.bind(('127.0.0.1', 8088))
sk.listen(5)
sk.setblocking(False) # 设置socket的接口为非阻塞
# 相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
sel.register(sk, selectors.EVENT_READ, accept)
# 说白了就是,如果有人请求连接sk,就调用accept方法 while True:
events = sel.select() # 检测所有的sk,conn,是否有完成wait data的
for sel_obj, mask in events: # [sk]
callback = sel_obj.data # callback = accept
callback(sel_obj.fileobjmask) # accept(server_fileobj,1) # #客户端
# from socket import *
# c=socket(AF_INET, SOCK_STREAM)
# c.connect(('127.0.0.1',8088))
#
# while True:
# msg=input('>>: ')
# if not msg:continue
# c.send(msg.encode('utf-8'))
# data = c.recv(1024)
# print(data.decode('utf-8'))
#
# 基于selectors模块实现聊天