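1. Background
It is well known that gevent is built on a single thread plus coroutines: when a coroutine reaches an operation that might block, gevent switches to another coroutine that is ready to run, which improves the efficiency of the system. But how is this actually implemented? Let's look directly at the code.
2. The switching mechanism
Let's start from the socket's send and recv methods, taking recv as the example: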

    def recv(self, *args):
        while 1:
            try:
                return self._sock.recv(*args)
            except error as ex:
                if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
                    raise
                # QQQ without clearing exc_info test__refcount.test_clean_exit fails
                sys.exc_clear()
            self._wait(self._read_event)

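The retry loop in this listing can be tried out with nothing but the standard library. The following is a rough sketch, not gevent code: `recv_cooperatively` is a hypothetical helper, Python 3's `BlockingIOError` takes the place of the `EWOULDBLOCK` check, and `select.select` stands in for `self._wait(self._read_event)`. Where gevent would hand control to the hub, this version simply blocks the thread inside `select`, and it ignores the `timeout == 0.0` case.

```python
import select
import socket
import threading


def recv_cooperatively(sock, bufsize):
    """Retry recv() on a non-blocking socket until data is available.

    Hypothetical helper: select() plays the role of gevent's
    self._wait(self._read_event), but it blocks the thread instead of
    switching to the hub.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return sock.recv(bufsize), attempts
        except BlockingIOError:
            # Same situation as EWOULDBLOCK in the gevent code:
            # nothing to read yet, so wait until the socket is readable.
            select.select([sock], [], [])


# A connected pair of local sockets; no network access is needed.
reader, writer = socket.socketpair()
reader.setblocking(False)

# Deliver the data slightly later so the first recv() finds nothing to read.
timer = threading.Timer(0.1, writer.sendall, args=(b"hello",))
timer.start()

data, attempts = recv_cooperatively(reader, 1024)
timer.join()

reader.close()
writer.close()
```

The first `recv()` normally fails because nothing has been written yet, the wait returns once the timer thread sends the bytes, and the next pass through the loop returns them.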
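This starts an infinite loop that calls self._sock.recv() and catches exceptions. When the error is EWOULDBLOCK (and the socket's timeout is not 0.0), it calls self._wait(self._read_event). That method is really just an alias, _wait = _wait_on_socket, and the underlying function is defined in _hub_primitives.py, under the name wait_on_socket, as follows: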

    # Suitable to be bound as an instance method
    def wait_on_socket(socket, watcher, timeout_exc=None):
        if socket is None or watcher is None:
            # test__hub TestCloseSocketWhilePolling, on Python 2; Python 3
            # catches the EBADF differently.
            raise ConcurrentObjectUseError("The socket has already been closed by another greenlet")
        _primitive_wait(watcher, socket.timeout,
                        timeout_exc if timeout_exc is not None else _NONE,
                        socket.hub)

This function in turn calls _primitive_wait(), which is also defined in _hub_primitives.py:

    def _primitive_wait(watcher, timeout, timeout_exc, hub):
        if watcher.callback is not None:
            raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r'
                                           % (watcher.callback, ))

        if hub is None:
            hub = get_hub()

        if timeout is None:
            hub.wait(watcher)
            return

        timeout = Timeout._start_new_or_dummy(
            timeout,
            (timeout_exc
             if timeout_exc is not _NONE or timeout is None
             else _timeout_error('timed out')))

        with timeout:
            hub.wait(watcher)

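The two branches of _primitive_wait (wait indefinitely when the timeout is None, otherwise wait under a timer that raises a "timed out" error) can be mimicked with the standard library. This is only a sketch under stated assumptions: `wait_readable` is a hypothetical name, and `select`'s own timeout argument replaces both the Timeout object and hub.wait().

```python
import select
import socket


def wait_readable(sock, timeout=None):
    """Block until sock is readable, mirroring the two branches above.

    Hypothetical stand-in: gevent arms a Timeout and calls hub.wait();
    here select()'s timeout argument does both jobs.
    """
    if timeout is None:
        # No timeout configured: wait for as long as it takes.
        select.select([sock], [], [])
        return
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        # Same message the listing passes to its timeout error.
        raise socket.timeout("timed out")


reader, writer = socket.socketpair()

# Nothing has been written yet, so a short wait must time out.
try:
    wait_readable(reader, timeout=0.05)
    timed_out = False
except socket.timeout:
    timed_out = True

# Once data is pending, the same call returns without raising.
writer.sendall(b"x")
wait_readable(reader, timeout=0.05)
became_ready = True

reader.close()
writer.close()
```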
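This ends up calling hub.wait(). That method is defined on the WaitOperationsGreenlet class (in the gevent source tree this class lives in _hub_primitives.py, and Hub in hub.py inherits from it):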

    class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable

        def wait(self, watcher):
            """
            Wait until the *watcher* (which must not be started) is ready.

            The current greenlet will be unscheduled during this time.
            """
            waiter = Waiter(self) # pylint:disable=undefined-variable
            watcher.start(waiter.switch, waiter)
            try:
                result = waiter.get()
                if result is not waiter:
                    raise InvalidSwitchError(
                        'Invalid switch into %s: got %r (expected %r; waiting on %r with %r)' % (
                            getcurrent(), # pylint:disable=undefined-variable
                            result,
                            waiter,
                            self,
                            watcher
                        )
                    )
            finally:
                watcher.stop()

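WaitOperationsGreenlet is a base class of Hub. Its wait() method creates a Waiter object and calls watcher.start(waiter.switch, waiter). The watcher here is the self._read_event that recv passed in at the very beginning; watchers are a concept from libev, the event library underneath gevent. The waiter is similar to a future in Python: it has a switch() method and a get() method, and calling waiter.get() before a result is ready suspends the current coroutine. get() is defined as follows: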

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            getcurrent().throw(*self._exception) # pylint:disable=undefined-variable
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent() # pylint:disable=undefined-variable
            try:
                return self.hub.switch()
            finally:
                self.greenlet = None

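The key call in get() is self.hub.switch(), which transfers control to the hub, and the hub then carries on running. This completes the analysis of how a worker coroutine that would block while reading from the network avoids blocking and switches to the hub instead. When and how control switches back to the worker coroutine is left for a later analysis.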
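The future-like behaviour of the waiter can be imitated with plain generators. The sketch below is a loose analogy, not gevent's Waiter: `ToyWaiter` and `worker` are hypothetical names, a generator `yield` stands in for self.hub.switch(), and the driving code at the bottom plays the hub. One simplification to note: here the "hub" has to resume the worker explicitly with `next()` after the value has been stored.

```python
_NONE = object()  # sentinel meaning "no result yet", in the spirit of gevent's _NONE


class ToyWaiter:
    """Future-like holder; a hypothetical, simplified analogue of Waiter."""

    def __init__(self):
        self.value = _NONE

    def switch(self, value):
        # Called from the event-loop side once the result is ready.
        self.value = value

    def get(self):
        # Generator: give up control (to the "hub") until a value is stored.
        while self.value is _NONE:
            yield "suspended"
        return self.value


def worker(waiter, log):
    log.append("worker: waiting")
    result = yield from waiter.get()
    log.append("worker: got %r" % (result,))


log = []
waiter = ToyWaiter()
task = worker(waiter, log)

state = next(task)              # runs the worker until get() suspends it
log.append("hub: worker is %s" % state)
waiter.switch("ready")          # what the watcher callback would do
try:
    next(task)                  # the "hub" resumes the worker
except StopIteration:
    pass                        # the worker ran to completion
```

After this runs, `log` records the worker waiting, the hub observing the suspension, and the worker receiving the value, in that order.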
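Summary
Keep in mind one important idea in gevent: a coroutine switch is not a function call but a transfer of control. Control moves from a coroutine that might block to the hub, and the hub, at a suitable moment, switches to another coroutine that is ready to continue. This is how gevent raises the throughput of I/O-bound applications.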
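To make the idea of transferring control concrete, here is a toy model built only on the standard library. It is an illustration under loud assumptions, not how gevent is implemented: generators stand in for greenlets, `ToyHub` and `relay_task` are hypothetical names, and the `selectors` module takes the place of libev watchers. A task that would block yields its socket to the hub, and the hub resumes whichever task has a readable socket.

```python
import selectors
import socket


def relay_task(name, sock, log, forward_to=None):
    """A 'coroutine' that yields to the hub whenever recv() would block."""
    while True:
        try:
            data = sock.recv(1024)
        except BlockingIOError:
            log.append("%s: would block, switching to hub" % name)
            yield sock  # hand control to the hub, naming the socket we wait on
            continue
        log.append("%s: received %r" % (name, data))
        if forward_to is not None:
            forward_to.sendall(data)  # makes another task runnable
        return


class ToyHub:
    """Hypothetical miniature hub: one selector, many suspended tasks."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()

    def spawn(self, task):
        self._resume(task)

    def _resume(self, task):
        try:
            sock = next(task)  # run the task until it yields or finishes
        except StopIteration:
            return
        # The task is suspended; remember which socket should wake it up.
        self.selector.register(sock, selectors.EVENT_READ, task)

    def run(self):
        # Keep going while at least one task is still waiting on a socket.
        while self.selector.get_map():
            for key, _events in self.selector.select():
                self.selector.unregister(key.fileobj)
                self._resume(key.data)
        self.selector.close()


a_read, a_write = socket.socketpair()
b_read, b_write = socket.socketpair()
a_read.setblocking(False)
b_read.setblocking(False)

log = []
hub = ToyHub()
hub.spawn(relay_task("A", a_read, log))
hub.spawn(relay_task("B", b_read, log, forward_to=a_write))

b_write.sendall(b"go")  # only B becomes runnable at first
hub.run()

for s in (a_read, a_write, b_read, b_write):
    s.close()
```

Both tasks first give up control because nothing is readable. The hub then resumes B, whose data arrived first, and B's forwarded bytes are what make A runnable afterwards.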
Original article: https://www.cnblogs.com/lit10050528/p/13549532.html