day 35 关于线程

时间:2022-01-16 19:34:28

并发编程之协程

 

  对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。

协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行;可以检测io操作,在遇到io操作的情况下才发生切换。

一、协程介绍

1、定义

  协程是单线程下的并发,又称微线程。协程是一种用户态的轻量级的线程,即协程是由用户程序自己控制调度的。强调如下:

(1)python的线程属于内核级别的,即由操作系统控制调度,如果遇到io或者执行时间过长,操作系统就会强迫要求交出cpu的执行权限,执行其他线程;

(2)单线程内开启协程,一旦遇到io就会从应用程序的级别进行切换,从而提升效率。注意:一定是遇到IO才切

greenlet模块

2、优点

  优点如下:

  (1)协程的切换的开销较小,属于程序级别的切换,操作系统无感知,因而更加的轻量级;

  (2)实现单线程内的并发,最大限度的利用cpu。

3、缺点

  缺点如下:

  (1)协程的本质是在单线程下,无法利用多核;

  (2)协程指的是单个线程,一旦协程遇到阻塞,则会阻塞整个线程。

4、总结协程特点:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

二、greenlet模块

  greenlet模块提供一种在多种任务之间来回切换的功能,这种切换手段非常便捷。如下例:

 
from greenlet import greenlet
def eat(name):
print('%s is eating1' %name)
g2.switch('alex') #2.切换执行play('alex')
print('%s is eating2' %name)
g2.switch() #4.切换继续执行play('alex')
def play(name):
print('%s is playing1' %name)
g1.switch() #3.切换继续执行eat('alex')
print('%s is playing2' %name)
g1=greenlet(eat)
g2=greenlet(play)
'''switch只有第一次切换时需要传参'''
g1.switch('alex') #1.切换执行eat('alex') '''
输出结果:
alex is eating1
alex is playing1
alex is eating2
alex is playing2
'''
day 35 关于线程

  以上实例中gevent.sleep()模拟gevent可以识别的IO,对于实际的应用没有意义。对于time.sleep()等gevent不可识别的IO的阻塞该如何解决呢?请看下例。

2、gevent不可识别的IO阻塞情况

  实例:

day 35 关于线程
from gevent import monkey;monkey.patch_all()
'''from gevent import monkey;monkey.patch_all()必须放在文件的最开头'''
import time,gevent
from threading import current_thread
def eat(name):
print('%s is running1:%s' %(name,current_thread().getName()))
time.sleep(2)
print('%s is running2:%s' %(name,current_thread().getName())) def play(name):
print('%s is playing1:%s' %(name,current_thread().getName()))
time.sleep(1)
print('%s is playing2:%s' % (name, current_thread().getName())) start=time.time()
g1=gevent.spawn(eat,'alex')
g2=gevent.spawn(play,name='alex')
gevent.joinall([g1,g2])
stop=time.time()
print(stop-start)
'''
alex is running1:DummyThread-1
alex is playing1:DummyThread-2
alex is playing2:DummyThread-2
alex is running2:DummyThread-1
2.0029842853546143
'''
day 35 关于线程

  上例才是我们最终实现的协程,为解决识别正常阻塞的问题,我们采用打补丁的方式:from gevent import monkey;monkey.patch_all(),补丁必须放在文件的最开头。结果中

DummyThread-n翻译为假线程的意思,并不是开启了多线程。

四、协程实例

1、协程实现爬虫实例

day 35 关于线程
from gevent import monkey
monkey.patch_all()
import gevent
import requests
from threading import current_thread
def get(url):
print('%s get %s' %(current_thread().getName(),url))
response=requests.get(url) #存在IO
if response.status_code==2:
return {'url': len(response.text)} g1=gevent.spawn(get,'www.baidu.com')
g2=gevent.spawn(get,'www.qq.com')
g3=gevent.spawn(get,'www.jd.com') g1.join()
g2.join()
g3.join() print(g1.value) #value为取值
print(g2.value)
print(g3.value)
day 35 关于线程

2、协程实现socket通信

  服务端:

day 35 关于线程
from gevent import monkey
monkey.patch_all()
import gevent
from socket import *
from threading import current_thread sever = socket(AF_INET,SOCK_STREAM) # 默认参数可以不传
sever.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sever.bind(('127.0.0.1',8090))
sever.listen()
def talk(conn,addr):
print('%s got a connect from %s:%s'%(current_thread().getName(),addr[0],addr[1]))
while True:
data = conn.recv(1024).decode('utf-8')
if not data:break
conn.send(data.upper().encode()) if __name__ == '__main__':
while True:
conn,addr = sever.accept()
g = gevent.spawn(talk,conn,addr)
day 35 关于线程

客户端

day 35 关于线程
from socket import *
client=socket()
client.connect(('127.0.0.1',8090))
while True:
msg=input('>>>').strip()
if not msg:continue
client.send(msg.encode())
res=client.recv(1024).decode()
print(res) #多线程开客户端
# from threading import Thread
# from socket import *
# client=socket()
# client.connect(('127.0.0.1',8090))
# def talk(client):
# client.send('hello'.encode())
# res=client.recv(1024).decode()
# print(res)
#
# if __name__ == '__main__':
# for i in range(100):
# t=Thread(target=talk,args=(client,))
# t.start()