转自:http://www.elias.cn/Python/PyConcurrency
1. 前言
偶然看到Erlang vs. Stackless python: a first benchmark,对Erlang和Stackless Python的并发处理性能进行了实验比较,基本结论认为二者有比较相近的性能。我看完产生的问题是,Stackless Python与Python的其他并发实现机制性能又会有多大区别呢,比如线程和进程。因此我采用与这篇文章相同的办法来对Stackless Python、普通Python的thread模块、普通Python的threading模块、普通Python的processing模块这四种并发实现方案进行了性能实验,并将实验过程和基本结果记录在这里。
后来看到了基于greenlet实现的高性能网络框架Eventlet,因而更新了实验方案,将greenlet也加入了比较,虽然greenlet并非是一种真正意义上的并发处理,而是在单个线程下对程序块进行切换轮流执行。
2. 实验方案
实验方案与Erlang vs. Stackless python: a first benchmark是相同的,用每种方案分别给出如下问题的实现,记录完成整个处理过程的总时间来作为评判性能的依据:
- 由n个节点组成一个环状网络,在上面传送共m个消息。
- 将每个消息(共m个),逐个发送给1号节点。
- 第1到n-1号节点在接收到消息后,都转发给下一号节点。
- 第n号节点每次收到消息后,不再继续转发。
- 当m个消息都从1号逐个到达第n号节点时,认为全部处理结束。
2.1 硬件平台
Macbook Pro 3,1上的Vmware Fusion 1.0虚拟机中,注意这里给虚拟机只启用了cpu的单个核心:
- 原始Cpu:Core 2 Duo,2.4 GHz,2核心,4 MB L2 缓存,总线速度800 MHz
- 分配给虚拟机的内存:796M
2.2 软件平台
Vmware Fusion 1.0下的Debian etch:
- 原始Python:Debian发行版自带Python 2.4.4
- Python 2.4.4 Stackless 3.1b3 060516
- processing-0.52-py2.4-linux-i686.egg
- 原始Python下的greenlet实现:py lib 0.9.2
3. 实验过程及结果
各方案的实现代码见后文。实验时使用time指令记录每次运行的总时间,选用的都是不做任何输出的no_io实现(Python的print指令还是挺耗资源的,如果不注释掉十有八九得影响测试结果),每次执行时设定n=300,m=10000(Erlang vs. Stackless python: a first benchmark文章中认为n可以设置为300,m则可以取10000到90000之间的数值分别进行测试)。
3.1 Stackless Python的实验结果
real0m1.651s
user0m1.628s
sys0m0.020s
即使将m扩大到30000,实验结果仍然很突出:
real0m4.749s
user0m4.716s
sys0m0.028s
3.2 使用thread模块的实验结果
real1m13.009s
user0m2.476s
sys0m59.028s
3.3 使用threading模块配合Queue模块的实验结果
不太稳定,有时候这样:
real1m9.222s
user0m34.418s
sys0m34.622s
也有时这样:
real2m14.016s
user0m6.644s
sys2m7.260s
3.4 使用processing模块配合Queue模块的实验结果
real3m43.539s
user0m15.345s
sys3m27.953s
3.5 greenlet模块的实验结果
real0m9.225s
user0m0.644s
sys0m8.581s
3.6 eventlet模块的实验结果
注意!eventlet 的这个实验结果是后来增补的,硬件平台没变,但是是直接在 OSX 自带 Python 2.5 环境下执行出来的,同时系统中还有 Firefox 等很多程序也在争夺系统资源。因此只能作为大致参考,不能与其他几组数据作直接对比。(其中 eventlet 的版本是 0.9.5)
real 0m21.610s
user 0m20.713s
sys 0m0.215s
4. 结论与分析
4.1 Stackless Python
毫无疑问,Stackless Python几乎有匪夷所思的并发性能,比其他方案快上几十倍,而且借助Stackless Python提供的channel机制,实现也相当简单。也许这个结果向我们部分揭示了沈仙人基于Stackless Python实现的Eurasia3能够提供相当于c语言效果的恐怖并发性能的原因。
4.2 Python线程
从道理上来讲,thread模块似乎应该和threading提供基本相同的性能,毕竟threading只是对thread的一种封装嘛,后台机制应该是一致的。或许threading由于本身类实例维护方面的开销,应该会比直接用thread慢一点。从实验结果来看,二者性能也确实差不多。只是不大明白为何threading方案的测试结果不是很稳定,即使对其他方案的测试运行多次,误差也不会像threading这么飘。从代码实现体验来说,用threading配合Queue比直接用thread实在是轻松太多了,并且出错的机会也要少很多。
4.3 Python进程
processing模块给出的进程方案大致比thread线程要慢一倍,并且这是在我特意调整虚拟机给它预备了足够空闲内存、避免使用交换分区的情况下取得的(特意分给虚拟机700多M内存就是为了这个)。而其他方案仅仅占用数M内存,完全无需特意调大可用内存总量。当然,如果给虚拟机多启用几个核心的话,processing也许会占上点便宜,毕竟目前thread模块是不能有效利用多cpu资源的(经实验,Stackless Python在开启双核的情况下表现的性能和单核是一样的,说明也是不能有效利用多cpu)。因此一种比较合理的做法是根据cpu的数量,启用少量几个进程,而在进程内部再开启线程进行实际业务处理,这也是目前Python社区推荐的有效利用多cpu资源的办法。好在processing配合其自身提供的Queue模块,编程体验还是比较轻松的。
4.4 greenlet超轻量级方案
基于greenlet的实现则性能仅次于Stackless Python,大致比Stackless Python慢一倍,比其他方案快接近一个数量级。其实greenlet不是一种真正的并发机制,而是在同一线程内,在不同函数的执行代码块之间切换,实施“你运行一会、我运行一会”,并且在进行切换时必须指定何时切换以及切换到哪。greenlet的接口是比较简单易用的,但是使用greenlet时的思考方式与其他并发方案存在一定区别。线程/进程模型在大逻辑上通常从并发角度开始考虑,把能够并行处理的并且值得并行处理的任务分离出来,在不同的线程/进程下运行,然后考虑分离过程可能造成哪些互斥、冲突问题,将互斥的资源加锁保护来保证并发处理的正确性。greenlet则是要求从避免阻塞的角度来进行开发,当出现阻塞时,就显式切换到另一段没有被阻塞的代码段执行,直到原先的阻塞状况消失以后,再人工切换回原来的代码段继续处理。因此,greenlet本质是一种合理安排了的串行,实验中greenlet方案能够得到比较好的性能表现,主要也是因为通过合理的代码执行流程切换,完全避免了死锁和阻塞等情况(执行带屏幕输出的ring_greenlet.py我们会看到脚本总是一个一个地处理消息,把一个消息在环上从头传到尾之后,再开始处理下一个消息)。因为greenlet本质是串行,因此在没有进行显式切换时,代码的其他部分是无法被执行到的,如果要避免代码长时间占用运算资源造成程序假死,那么还是要将greenlet与线程/进程机制结合使用(每个线程、进程下都可以建立多个greenlet,但是跨线程/进程时greenlet之间无法切换或通讯)。
Stackless则比较特别,对很多资源从底层进行了并发改造,并且提供了channel等更适合“并发”的通讯机制实现,使得资源互斥冲突的可能性大大减小,并发性能自然得以提高。粗糙来讲,greenlet是“阻塞了我就先干点儿别的,但是程序员得明确告诉greenlet能先干点儿啥以及什么时候回来”;Stackless则是“东西我已经改造好了,你只要用我的东西,并发冲突就不用操心,只管放心大胆地并发好了”。greenlet应该是学习了Stackless的上下文切换机制,但是对底层资源没有进行适合并发的改造。并且实际上greenlet也没有必要改造底层资源的并发性,因为它本质是串行的单线程,不与其他并发模型混合使用的话是无法造成对资源的并发访问的。
greenlet 封装后的 eventlet 方案
eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其他 Python 线程、进程模型非常相似的 api,并且提供了对 Python 发行版自带库及其他模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。并且这个解决方案源自著名虚拟现实游戏“第二人生”,可以说是久经考验的新兴并发处理模型。其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其他 greenlet 执行,这样来保证资源的有效利用。需要注意的是:
- eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然需要把调用模块的代码封装在 Python 标准线程调用中,之后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协作。
- 再有,虽然 eventlet 把 api 封装成了非常类似标准线程库的形式,但两者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,否则当前正在执行的 eventlet 永远不会把 cpu 交给其他的 eventlet,而标准线程则是无论是否出现阻塞,总是由所有线程一起争夺运行资源。所有 eventlet 对 I/O 阻塞无关的大运算量耗时操作基本没有什么帮助。
在性能测试结果方面,eventlet 消耗的运行时间大致是 greenlet 方案的 3 到 5 倍,而 Python 标准线程模型的 thread 方式消耗的运行时间大致是 eventlet 测试代码的 8 到 10 倍。其中前者可能是因为我们在 eventlet 的测试代码中,使用队列机制来完成所有的消息传递,而队列上的访问互斥保护可能额外消耗了一些运算资源。总体而言,eventlet 模型的并发性能虽然比 Stackless Python 和直接使用 greenlet 有一定差距,但仍然比标准线程模型有大约一个数量级的优势,这也就不奇怪近期很多强调并发性能的网络服务器实现采取 eventlet 、线程、进程三者组合使用的实现方案。
5. 实验代码
实验代码下载:
- 版本3 下载:增加了 eventlet 方案的实验代码。
- 版本2 下载:增加了 greenlet 方案的实验代码。
- 版本1 下载:包括 Stackless Python 、 thread 、 threading 、 processing 四种方案的实验代码。
为方便阅读,将实验中用到的几个脚本的代码粘贴如下,其中Stackless Python方案的代码实现直接取自Erlang vs. Stackless python: a first benchmark:
5.1 ring_no_io_slp.py
# !/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import stackless as SL
def run_benchmark(n, m):
# print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
firstP = cin = SL.channel()
for s in xrange( 1 , n):
seqn = s
cout = SL.channel()
# # print("*> s = %d" % (seqn, ))
t = SL.tasklet(loop)(seqn, cin, cout)
cin = cout
else :
seqn = s + 1
# # print("$> s = %d" % (seqn, ))
t = SL.tasklet(mloop)(seqn, cin)
for r in xrange(m - 1 , - 1 , - 1 ):
# # print("+ sending Msg# %d" % r)
firstP.send(r)
SL.schedule()
def loop(s, cin, cout):
while True:
r = cin.receive()
cout.send(r)
if r > 0:
# print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
break
def mloop(s, cin):
while True:
r = cin.receive()
if r > 0:
# print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
break
def pid(): return repr(SL.getcurrent()).split()[ - 1 ][ 2 : - 1 ]
if __name__ == ' __main__ ' :
run_benchmark(int(sys.argv[ 1 ]), int(sys.argv[ 2 ]))
# !/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys, time
import thread
SLEEP_TIME = 0.0001
def run_benchmark(n, m):
# print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
locks = [thread.allocate_lock() for i in xrange(n)]
firstP = cin = []
cin_lock_id = 0
for s in xrange( 1 , n):
seqn = s
cout = []
cout_lock_id = s
# print("*> s = %d" % (seqn, ))
thread.start_new_thread(loop, (seqn, locks, cin, cin_lock_id, cout, cout_lock_id))
cin = cout
cin_lock_id = cout_lock_id
else :
seqn = s + 1
# print("$> s = %d" % (seqn, ))
thread.start_new_thread(mloop, (seqn, locks, cin, cin_lock_id))
for r in xrange(m - 1 , - 1 , - 1 ):
# print("+ sending Msg# %d" % r)
lock = locks[0]
lock.acquire()
firstP.append(r)
lock.release()
time.sleep(SLEEP_TIME)
try :
while True:
time.sleep(SLEEP_TIME)
except :
pass
def loop(s, locks, cin, cin_lock_id, cout, cout_lock_id):
while True:
lock = locks[cin_lock_id]
lock.acquire()
if len(cin) > 0:
r = cin.pop(0)
lock.release()
else :
lock.release()
time.sleep(SLEEP_TIME)
continue
lock = locks[cout_lock_id]
lock.acquire()
cout.append(r)
lock.release()
if r > 0:
# print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
break
def mloop(s, locks, cin, cin_lock_id):
while True:
lock = locks[cin_lock_id]
lock.acquire()
if len(cin) > 0:
r = cin.pop(0)
lock.release()
else :
lock.release()
time.sleep(SLEEP_TIME)
continue
if r > 0:
# print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
break
thread.interrupt_main()
def pid(): return thread.get_ident()
if __name__ == ' __main__ ' :
run_benchmark(int(sys.argv[ 1 ]), int(sys.argv[ 2 ]))
# !/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import threading, Queue
def run_benchmark(n, m):
# print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
firstP = cin = Queue.Queue()
for s in xrange( 1 , n):
seqn = s
cout = Queue.Queue()
# print("*> s = %d" % (seqn, ))
t = Loop(seqn, cin, cout)
t.setDaemon(False)
t.start()
cin = cout
else :
seqn = s + 1
# print("$> s = %d" % (seqn, ))
t = MLoop(seqn, cin)
t.setDaemon(False)
t.start()
for r in xrange(m - 1 , - 1 , - 1 ):
# print("+ sending Msg# %d" % r)
firstP.put(r)
class Loop(threading.Thread):
def __init__ (self, s, cin, cout):
threading.Thread. __init__ (self)
self.cin = cin
self.cout = cout
self.s = s
def run(self):
while True:
r = self.cin.get()
self.cout.put(r)
if r > 0:
# print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r))
pass
else :
# print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), self.s))
break
class MLoop(threading.Thread):
def __init__ (self, s, cin):
threading.Thread. __init__ (self)
self.cin = cin
self.s = s
def run(self):
while True:
r = self.cin.get()
if r > 0:
# print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), self.s, r))
pass
else :
# print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), self.s))
break
def pid(): return threading.currentThread()
if __name__ == ' __main__ ' :
run_benchmark(int(sys.argv[ 1 ]), int(sys.argv[ 2 ]))
# !/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import processing, Queue
def run_benchmark(n, m):
# print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
firstP = cin = processing.Queue()
for s in xrange( 1 , n):
seqn = s
cout = processing.Queue()
# print("*> s = %d" % (seqn, ))
p = processing.Process(target = loop, args = [seqn, cin, cout])
p.start()
cin = cout
else :
seqn = s + 1
# print("$> s = %d" % (seqn, ))
p = processing.Process(target = mloop, args = [seqn, cin])
p.start()
for r in xrange(m - 1 , - 1 , - 1 ):
# print("+ sending Msg# %d" % r)
firstP.put(r)
p.join()
def loop(s, cin, cout):
while True:
r = cin.get()
cout.put(r)
if r > 0:
# print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
break
def mloop(s, cin):
while True:
r = cin.get()
if r > 0:
# print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
break
def pid(): return processing.currentProcess()
if __name__ == ' __main__ ' :
run_benchmark(int(sys.argv[ 1 ]), int(sys.argv[ 2 ]))
# !/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
from py.magic import greenlet
def run_benchmark(n, m):
# print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
glets = [greenlet.getcurrent()]
for s in xrange( 1 , n):
seqn = s
glets.append(greenlet(loop))
# print("*> s = %d" % (seqn, ))
else :
seqn = s + 1
glets.append(greenlet(mloop))
# print("$> s = %d" % (seqn, ))
glets[ - 1 ].switch(seqn, glets)
for r in xrange(m - 1 , - 1 , - 1 ):
# print("+ sending Msg# %d" % r)
glets[ 1 ].switch(r)
def loop(s, glets):
previous = glets[s - 1 ]
next = glets[s + 1 ]
if s > 1 :
r = previous.switch(s - 1 , glets)
else :
r = previous.switch()
while True:
if r > 0:
# print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("loop", s), s, r))
pass
else :
# print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid("loop", s), s))
break
next.switch(r)
r = previous.switch()
next.switch(r)
def mloop(s, glets):
previous = glets[s - 1 ]
r = previous.switch(s - 1 , glets)
while True:
if r > 0:
# print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid("mloop", s), s, r))
pass
else :
# print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid("mloop", s), s))
break
r = previous.switch()
def pid(func, s): return " <<%s(Greenlet-%d, started)>> " % (func, s)
if __name__ == ' __main__ ' :
run_benchmark(int(sys.argv[ 1 ]), int(sys.argv[ 2 ]))
# !/Library/Frameworks/Python.framework/Versions/2.5/bin/python
# encoding: utf-8
import sys
import eventlet
def run_benchmark(n, m):
# print(">> Python 2.5.1, stackless 3.1b3 here (N=%d, M=%d)!\n" % (n, m))
firstP = cin = eventlet.Queue()
for s in xrange( 1 , n):
seqn = s
cout = eventlet.Queue()
# print("*> s = %d" % (seqn, ))
eventlet.spawn_n(loop, seqn, cin, cout)
cin = cout
else :
seqn = s + 1
# print("$> s = %d" % (seqn, ))
for r in xrange(m - 1 , - 1 , - 1 ):
# print("+ sending Msg# %d" % r)
firstP.put(r)
mloop(seqn, cin)
def loop(s, cin, cout):
while True:
r = cin.get()
cout.put(r)
if r > 0:
# print(": Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("* Proc: <%s>, Seq#: %s, Msg#: terminate!" % (pid(), s))
break
def mloop(s, cin):
while True:
r = cin.get()
if r > 0:
# print("> Proc: <%s>, Seq#: %s, Msg#: %s .." % (pid(), s, r))
pass
else :
# print("@ Proc: <%s>, Seq#: %s, ring terminated." % (pid(), s))
break
def pid(): return eventlet.greenthread.getcurrent()
if __name__ == ' __main__ ' :
run_benchmark(int(sys.argv[ 1 ]), int(sys.argv[ 2 ]))