关于gevent的一些理解(一)

时间:2023-12-13 23:46:50

前言:gevent是python的一个并发框架,以微线程greenlet为核心,使用了epoll事件监听机制以及诸多其他优化而变得高效.而且其中有个monkey类,

将现有基于Python线程直接转化为greenlet(类似于打patch).他和线程框架性能比高大概4倍(看下图,是gevent和paste的对比):

关于gevent的一些理解(一)

工作暂时没有用gevent的地方,这里就简单的对http://sdiehl.github.com/gevent-tutorial的一些例子和内容翻译:

1 同步和异步

import gevent
def foo():
print('Running in foo')
gevent.sleep(0) #让当前的greenlet睡眠N秒,这0标识控制其它协程而不会让其它进程睡眠
print('Explicit context switch to foo again') def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar') gevent.joinall([ #gevent.Greenlet实例,直到这个greenlet完成或者超时
gevent.spawn(foo), #spawn可以实现一个grennlet实例并且加到队列并且启动,效果类似于gevent.Greenlet(foo).start()
gevent.spawn(bar),
])

执行结果的效果图:

关于gevent的一些理解(一)

dongwm@localhost ~ $ python test.py
Explicit context to bar
Running in foo
Explicit context switch to foo again
Implicit context switch back to bar

import time

import gevent
from gevent import select #类似于内置的select.select()实现(请关注http://www.dongwm.com/archives/guanyuselectyanjiu/),只是将线程操作改成了greenlet start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start) def gr1():
print('Started Polling: ', tic())
select.select([], [], [], 2) #参数分别是,等待的可读列表,等待的可写列表,等待的可执行列表,超时时间(这里是2秒)
print('Ended Polling: ', tic()) def gr2():
print('Started Polling: ', tic())
select.select([], [], [], 2)
print('Ended Polling: ', tic()) def gr3():
print("Hey lets do some stuff while the greenlets poll, at", tic())
gevent.sleep(1) gevent.joinall([
gevent.spawn(gr1),
gevent.spawn(gr2),
gevent.spawn(gr3),
])

执行结果:

dongwm@localhost ~ $ python test.py
(‘Hey lets do some stuff while the greenlets poll, at’, ‘at 0.0 seconds’) #因为gr1和gr2开始是阻塞的,gr3直接打印
(‘Started Polling: ‘, ‘at 0.0 seconds’)
(‘Started Polling: ‘, ‘at 0.0 seconds’)
(‘Ended Polling: ‘, ‘at 2.0 seconds’)
(‘Ended Polling: ‘, ‘at 2.0 seconds’)

import gevent
import random def task(pid):
gevent.sleep(random.randint(0,2)*0.001)
print('Task', pid, 'done') def synchronous(): #同步
for i in range(1,10):
task(i) def asynchronous(): #异步
threads = [gevent.spawn(task, i) for i in xrange(10)]
gevent.joinall(threads) print('Synchronous:')
synchronous() print('Asynchronous:')
asynchronous()

执行结果:

dongwm@localhost ~ $ python test.py
Synchronous: #协程不会控制其它进程睡眠,所以挨个执行
(‘Task’, 1, ‘done’)
(‘Task’, 2, ‘done’)
(‘Task’, 3, ‘done’)
(‘Task’, 4, ‘done’)
(‘Task’, 5, ‘done’)
(‘Task’, 6, ‘done’)
(‘Task’, 7, ‘done’)
(‘Task’, 8, ‘done’)
(‘Task’, 9, ‘done’)
Asynchronous: #他们放在grennlet里面,sleep的时间是随机的,完成顺序也就不同了
(‘Task’, 2, ‘done’)
(‘Task’, 3, ‘done’)
(‘Task’, 5, ‘done’)
(‘Task’, 7, ‘done’)
(‘Task’, 9, ‘done’)
(‘Task’, 6, ‘done’)
(‘Task’, 1, ‘done’)
(‘Task’, 0, ‘done’)
(‘Task’, 8, ‘done’)
(‘Task’, 4, ‘done’)

import gevent
from gevent import Greenlet def foo(message, n):
gevent.sleep(n)
print(message) thread1 = Greenlet.spawn(foo, "Hello", 1) #实例化Greenlet
thread2 = gevent.spawn(foo, "I live!", 2) #实例化gevent,其实也是创建Greenlet实例,只是包装了一下
thread3 = gevent.spawn(lambda x: (x+1), 2) #一个lambda表达式 threads = [thread1, thread2, thread3]
gevent.joinall(threads) #等待所有greenlet完成

执行结果:

dongwm@localhost ~ $ python test.py
Hello
I live! #打印出来效果不明显,事实上等待一秒打印第一行,再等待一秒打印第二行,然后马上完成(lambda没有显示)

import gevent
from gevent import Greenlet class MyGreenlet(Greenlet): #重载Greenlet类 def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n def _run(self): #重写_run方法
print(self.message)
gevent.sleep(self.n) g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

import gevent

def win():
return 'You win!' def fail():
raise Exception('You fail at failing.') winner = gevent.spawn(win)
loser = gevent.spawn(fail) print(winner.started) # started表示的Greenlet是否已经开始,返回布尔值
print(loser.started) # True try:
gevent.joinall([winner, loser])
except Exception as e:
print('This will never be reached') print(winner.value) # value表示greenlet实例返回值:'You win!'
print(loser.value) # None print(winner.ready()) # 是否已停止Greenlet的布尔值,True
print(loser.ready()) # True print(winner.successful()) # 表示的Greenlet是否已成功停止,而不是抛出异常,True
print(loser.successful()) # False
print(loser.exception) #打印异常的报错信息

执行结果:

dongwm@localhost ~ $ python test.py
True
True
Traceback (most recent call last):
File “/usr/lib/python2.7/site-packages/gevent-1.0dev-py2.7-linux-i686.egg/gevent/greenlet.py”, line 328, in run
result = self._run(*self.args, **self.kwargs)
File “test.py”, line 7, in fail
raise Exception(‘You fail at failing.’)
Exception: You fail at failing.
<Greenlet at 0xb73cd39cL: fail> failed with Exception

You win!
None
True
True
True
False
You fail at failing.

import gevent
from gevent import Timeout seconds = 10 timeout = Timeout(seconds)
timeout.start() def wait():
gevent.sleep(10) try:
gevent.spawn(wait).join()
except Timeout:
print 'Could not complete'

上面的例子是可以执行完成的,但是假如修改seconds = 5,让数值少入sleep,那么就会有超时被捕捉到

还可以使用with关键字处理上下文:

import gevent
from gevent import Timeout time_to_wait = 5 # seconds class TooLong(Exception):
pass with Timeout(time_to_wait, TooLong):
gevent.sleep(10)

以及其他的方式的:

import gevent
from gevent import Timeout def wait():
gevent.sleep(2) timer = Timeout(1).start()
thread1 = gevent.spawn(wait) #这种超时类型前面讲过 try:
thread1.join(timeout=timer)
except Timeout:
print('Thread 1 timed out') timer = Timeout.start_new(1) #start_new是一个快捷方式
thread2 = gevent.spawn(wait) try:
thread2.get(timeout=timer) #get返回greenlet的结果,包含异常
except Timeout:
print('Thread 2 timed out') try:
gevent.with_timeout(1, wait) #如果超时前返回异常,取消这个方法
except Timeout:
print('Thread 3 timed out')

2 数据结构

import gevent
from gevent.event import AsyncResult a = AsyncResult() #保存一个值或者一个异常的事件实例 def setter():
gevent.sleep(3) #3秒后唤起所有线程的a的值
a.set() #保存值,唤起等待线程 def waiter():
a.get() # 3秒后get方法不再阻塞,返回存贮的值或者异常
print 'I live!' gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])

更清晰的例子:

import gevent
from gevent.event import AsyncResult
a = AsyncResult() def setter():
gevent.sleep(3)
a.set('Hello!') def waiter():
print a.get() gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])
import gevent
from gevent.queue import Queue #类似于内置的Queue tasks = Queue() #队列实例 def worker(n):
while not tasks.empty():
task = tasks.get()
print('Worker %s got task %s' % (n, task))
gevent.sleep(0) print('Quitting time!') def boss():
for i in xrange(1,25):
tasks.put_nowait(i) #非阻塞的把数据放到队列里面 gevent.spawn(boss).join() gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])

执行结果:

[root@248_STAT ~]# python !$
python test.py
Worker steve got task 1 #3个用户循环的取出数据
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!

一个更复杂的例子:

import gevent
from gevent.queue import Queue, Empty tasks = Queue(maxsize=3) #限制队列的长度 def worker(n):
try:
while True:
task = tasks.get(timeout=1) # 减少队列,超时为1秒
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
except Empty:
print('Quitting time!') def boss():
"""
Boss will wait to hand out work until a individual worker is
free since the maxsize of the task queue is 3.
""" for i in xrange(1,10):
tasks.put(i) #这里boss没有盲目的不停放入数据,而是在当最大三个队列数有空余才放入数据,事实上方法转换过程中,boss放入三个数据,worker取出三个数据,boss再放入数据....
print('Assigned all work in iteration 1') for i in xrange(10,20):
tasks.put(i)
print('Assigned all work in iteration 2') gevent.joinall([
gevent.spawn(boss),
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'bob'),
])

import gevent

from gevent.pool import Group
def talk(msg):
for i in xrange(3):
print(msg) g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz') group = Group() #保持greenlet实例的组运行,连接到没个项目,在其完成后删除
group.add(g1)
group.add(g2)
group.join() group.add(g3)
group.join()

看更加明确的例子:

import gevent
from gevent import getcurrent
from gevent.pool import Group group = Group() def hello_from(n):
print('Size of group', len(group))
print('Hello from Greenlet %s' % id(getcurrent())) #获取当前gevent实例的id group.map(hello_from, xrange(3)) #map迭代方法,参数为方法和其参数 def intensive(n):
gevent.sleep(3 - n)
return 'task', n print('Ordered') ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)): #相当于 itertools.imap,返回一个迭代器, 它是调用了一个其值在输入迭代器上的函数, 返回结果. 它类似于函数 map() , 只是前者在
#任意输入迭代器结束后就停止(而不是插入None值来补全所有的输入)
print(i) print('Unordered') igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
print(i)

执行结果:

[root@248_STAT ~]# python test.py
(‘Size of group’, 3)
Hello from Greenlet 314818960
(‘Size of group’, 3)
Hello from Greenlet 314819280
(‘Size of group’, 3)
Hello from Greenlet 314819440
Ordered
(‘task’, 0)
(‘task’, 1)
(‘task’, 2)
Unordered
(‘task’, 2)
(‘task’, 1)
(‘task’, 0)

还能限制pool池的大小

import gevent
from gevent import getcurrent
from gevent.pool import Pool pool = Pool(2) def hello_from(n):
print('Size of pool', len(pool)) pool.map(hello_from, xrange(3))

返回结果:

[root@248_STAT ~]# python test.py
(‘Size of pool’, 2)
(‘Size of pool’, 2)
(‘Size of pool’, 1) #因为上面的pool容纳不了第三个,这是一个新的pool

以下是作者写的一个pool操作类:

from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
self.pool = Pool(1000) #设置池容量1000
self.pool.start() def listen(self, socket):
while True:
socket.recv() def add_handler(self, socket):
if self.pool.full(): #容量慢报错
raise Exception("At maximum pool size")
else: #否则执行在新的grenlet里面执行listen方法
self.pool.spawn(self.listen, socket) def shutdown(self):
self.pool.kill() #关闭pool

from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore sem = BoundedSemaphore(2) #设定对共享资源的访问数量 def worker1(n):
sem.acquire() #获取资源
print('Worker %i acquired semaphore' % n)
sleep(0)
sem.release() #释放资源
print('Worker %i released semaphore' % n) def worker2(n):
with sem: #使用with关键字
print('Worker %i acquired semaphore' % n)
sleep(0)
print('Worker %i released semaphore' % n) pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))

执行结果:

[root@248_STAT ~]# python test.py
Worker 0 acquired semaphore
Worker 1 acquired semaphore #因为pool能容纳这2个请求,所以同时获取,再释放
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore #因为只能接收2个,那么5就要到下一轮
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

一个gevent教材上面说过的ping pong的那个协程例子的另一个实现:

import gevent

from gevent.queue import Queue
from gevent import Greenlet class Actor(gevent.Greenlet): #自定义actor类 def __init__(self):
self.inbox = Queue() #收件箱作为一个队列
Greenlet.__init__(self) def receive(self, message):
raise NotImplemented() #内置常量,表面意为没有实施 def _run(self): #
self.running = True while self.running:
message = self.inbox.get() #获取队列数据
self.receive(message) class Pinger(Actor):
def receive(self, message): #重写方法
print message
pong.inbox.put('ping') #当获取收件箱有数据,获取数据,再放入数据(注意:是ping中放pong数据),其中pong是一个局部变量,它是Ponger的实例,以下的同理
gevent.sleep(0) class Ponger(Actor):
def receive(self, message):
print message
ping.inbox.put('pong')
gevent.sleep(0) ping = Pinger()
pong = Ponger() ping.start()
pong.start() ping.inbox.put('start') #最开始都是阻塞的,给一个触发
gevent.joinall([ping, pong])

from:http://www.dongwm.com/archives/guanyugeventdeyixielijieyi-2/

英文原版:http://sdiehl.github.io/gevent-tutorial/