I am using Tornado, the non-blocking I/O Python server. I have a class of GET requests which may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests, so subsequent fast requests are held up until the slow request completes.
I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues: I was unable to get control reliably back to the ioloop while another thread was doing the "heavy lifting". (I assume this was due to the GIL and the fact that the heavy_lifting task has a high CPU load and keeps pulling control away from the main ioloop, but that's a guess.)
So I have been prototyping how to solve this by doing the "heavy lifting" tasks within these slow GET requests in a separate process and then placing a callback back into the Tornado ioloop when the process is done, to finish the request. This frees up the ioloop to handle other requests.
I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.
My question is two-fold: How can this current approach be simplified? What pitfalls potentially exist with it?
The Approach
- Utilize Tornado's builtin asynchronous decorator, which allows a request to stay open and the ioloop to continue.
- Spawn a separate process for "heavy lifting" tasks using Python's multiprocessing module. I first attempted to use the threading module but was unable to get any reliable relinquishing of control back to the ioloop. It also appears that multiprocessing would take advantage of multiple cores.
- Start a "watcher" thread in the main ioloop process using the threading module, whose job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task when it completes. This was needed because I needed a way to know that the heavy_lifting task had completed while still being able to notify the ioloop that this request was now finished.
- Be sure that the "watcher" thread relinquishes control to the main ioloop often with time.sleep(0) calls so that other requests continue to get readily processed.
- When there is a result in the queue, add a callback from the "watcher" thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented to be the only safe way to call ioloop instances from other threads.
- Be sure to then call finish() in the callback to complete the request and hand over a reply.
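The watcher and callback steps above can be sketched without the time.sleep(0) polling loop, because a blocking get() on the queue parks the watcher thread until a result arrives. This is only a rough sketch under stated assumptions: it uses the standard library's asyncio loop, queue.Queue and a plain thread as stand-ins for Tornado's ioloop, multiprocessing.Queue and the worker process, and loop.call_soon_threadsafe plays the role that add_callback plays in Tornado. The names watch and slow_request are made up for the illustration.

```python
import asyncio
import queue
import threading


def heavy_lifting(q):
    # Stand-in for the CPU-bound task: it puts its result on the queue.
    q.put(sum(k * k for k in range(1000)))


def watch(q, loop, future):
    # Runs in the watcher thread. get() blocks until a result arrives,
    # so no polling loop and no time.sleep(0) are needed.
    result = q.get()
    # The loop is only touched through its thread-safe entry point.
    loop.call_soon_threadsafe(future.set_result, result)


async def slow_request():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    q = queue.Queue()
    threading.Thread(target=heavy_lifting, args=(q,)).start()
    threading.Thread(target=watch, args=(q, loop, future)).start()
    # The event loop stays free for other work while this request waits.
    return await future
```

A multiprocessing.Queue also offers a blocking get(), so the same watcher shape should carry over when the worker is a real process.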
Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown for running both with and without the threading turned on.
When running with "no threading", the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through in between some of the slow requests within the ioloop (I am not totally sure how that occurs, but it could be an artifact of running both the server and the client test script on the same machine). The point here is that all of the fast requests are held up to varying degrees.
When running with threading enabled, the 20 fast requests all complete first, immediately, and the three slow requests complete at about the same time afterwards, as they have each been running in parallel. This is the desired behavior. The three slow requests take 2.5 seconds to complete in parallel, whereas in the non-threaded case the three slow requests take about 3.5 seconds in total. So there is about a 35% speedup overall (I assume due to multicore sharing). But more importantly, the fast requests were handled immediately instead of waiting behind the slow ones.
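For reference, the comparison can be redone from the per-request timings printed in the Test Results section. This is a small worked calculation, assuming the reported compute times approximate wall-clock time, that blocking requests add up, and that parallel requests finish when the slowest one does.

```python
# Per-request times reported in the two test runs.
blocking = [1.338, 1.169, 1.130]   # "slow": handled one after another
parallel = [2.485, 2.491, 2.517]   # "slow_threaded": handled concurrently

serial_total = sum(blocking)       # requests queue up, so the times add
parallel_total = max(parallel)     # requests overlap, so the slowest wins

saved = (serial_total - parallel_total) / serial_total
speedup = serial_total / parallel_total

print("serial %.3f s, parallel %.3f s" % (serial_total, parallel_total))
print("time saved %.1f%%, speedup %.2fx" % (100 * saved, speedup))
```

Depending on whether you count saved time (about 31%) or throughput gain (about 1.44x), the rough 35% estimate sits in between.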
I do not have a lot of experience with multithreaded programming, so while this seemingly works here I am curious to learn:
Is there a simpler way to accomplish this? What monsters may lurk within this approach?
(Note: A future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what I will be running multiple instances with a load balancer - but I am concerned about just throwing hardware at this problem since it seems that the hardware is so directly coupled to the problem in terms of the blocking.)
Sample Code
multi_tornado.py (sample server):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)

    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note: This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()

            # start watching for process to finish
            threading.Thread(target=self._watcher).start()

        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start the worker process and a thread to watch for its result
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()  # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])

if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()
call_multi.py (client tester):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()


if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)
Test Results
By running python call_multi.py slow (the blocking behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
By running python call_multi.py slow_threaded (the desired behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
3 Answers
#1
29
If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.
Here's an example:
import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen


def f(a, b, c, blah=None):
    # runs in the worker process
    print("got %s %s %s and %s" % (a, b, c, blah))
    time.sleep(5)
    return "hey there"


@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()


if __name__ == "__main__":
    # the guard keeps worker processes from re-running this on import
    IOLoop.instance().run_sync(test_it)
Output:
running it asynchronously
got 1 2 3 and ok
it returned hey there
ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
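To connect this back to the fast/slow scenario in the question, here is a rough sketch of how requests interleave when the slow work goes through an executor. It assumes a Python 3 asyncio event loop (which recent Tornado releases run on) rather than Tornado's own handlers, and the names slow_request, fast_request and serve are invented for the illustration. The executor is passed in as a parameter, so the same code runs with a ThreadPoolExecutor for a quick check or a ProcessPoolExecutor for real CPU-bound work.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def heavy_lifting(n):
    # CPU-bound stand-in for the slow GET handler's work.
    return sum(k * k for k in range(n))


async def slow_request(executor, n):
    loop = asyncio.get_running_loop()
    # Hand the call to the pool and suspend until the result is ready.
    return await loop.run_in_executor(executor, heavy_lifting, n)


async def fast_request(i):
    return "fast result %d" % i


async def serve(executor):
    finished = []  # labels in completion order

    async def record(label, request):
        await request
        finished.append(label)

    await asyncio.gather(
        record("slow", slow_request(executor, 100000)),
        *[record("fast %d" % i, fast_request(i)) for i in range(3)]
    )
    return finished

# In a server you would create one long-lived pool at startup, e.g.
# executor = ProcessPoolExecutor(max_workers=4), and reuse it per request.
```

Even though the slow request is started first, the fast ones finish ahead of it because the event loop is never blocked by heavy_lifting.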
#2
16
multiprocessing.Pool can be integrated into the tornado I/O loop, but it's a bit messy. A much cleaner integration can be done using concurrent.futures (see my other answer for details), but if you're stuck on Python 2.x and can't install the concurrent.futures backport, here is how you can do it strictly using multiprocessing:
The multiprocessing.Pool.apply_async and multiprocessing.Pool.map_async methods both have an optional callback parameter, which means that both can potentially be plugged into a tornado.gen.Task. So in most cases, running code asynchronously in a sub-process is as simple as this:
import multiprocessing
from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial


def worker():
    print("async work here")


@gen.coroutine
def async_run(func, *args, **kwargs):
    # gen.Task supplies the callback keyword argument to apply_async
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    raise Return(result)


if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    func = partial(async_run, worker)
    IOLoop().run_sync(func)
As I mentioned, this works well in most cases. But if worker() throws an exception, callback is never called, which means the gen.Task never finishes, and you hang forever. Now, if you know that your work will never throw an exception (because you wrapped the whole thing in a try/except, for example), you can happily use this approach. However, if you want to let exceptions escape from your worker, the only solution I found was to subclass some multiprocessing components, and make them call callback even if the worker sub-process raised an exception:
from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing


class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]


class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

...

if __name__ == "__main__":
    pool = TornadoPool(multiprocessing.cpu_count())
    ...
With these changes, the exception object will be returned by the gen.Task, rather than the gen.Task hanging indefinitely. I also updated my async_run method to re-raise the exception when it's returned, and made some other changes to provide better tracebacks for exceptions thrown in the worker sub-processes. Here's the full code:
import multiprocessing
import sys
import time
import traceback
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps

import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen


class WrapException(Exception):
    def __init__(self):
        # capture the active exception and its formatted traceback as text
        exc_type, exc_value, exc_tb = sys.exc_info()
        self.exception = exc_value
        self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))

    def __str__(self):
        return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)


class TornadoApplyResult(ApplyResult):
    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]


class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        assert self._state == RUN
        result = TornadoApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result


@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    if isinstance(result, Exception):
        raise result
    raise Return(result)


def handle_exceptions(func):
    """ Raise a WrapException so we get a more meaningful traceback"""
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            raise WrapException()
    return inner


# Test worker functions
@handle_exceptions
def test2(x):
    raise Exception("eeee")


@handle_exceptions
def test(x):
    print(x)
    time.sleep(2)
    return "done"


class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield async_run(test, "inside get")
            self.write("%s\n" % result)
            result = yield async_run(test2, "hi2")
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
        finally:
            self.finish()


app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool(4)
    app.listen(8888)
    IOLoop.instance().start()
Here's how it behaves for the client:
dan@dan:~$ curl localhost:8888/test
done
Caught an exception:
Original traceback:
Traceback (most recent call last):
  File "./mutli.py", line 123, in inner
    return func(*args, **kwargs)
  File "./mutli.py", line 131, in test2
    raise Exception("eeee")
Exception: eeee
And if I send two simultaneous curl requests, we can see they're handled asynchronously on the server-side:
dan@dan:~$ ./mutli.py
inside get
inside get
caught exception inside get
caught exception inside get
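The try/except route mentioned earlier in this answer, where the worker never lets an exception escape so the pool's callback always fires, could look roughly like the sketch below. The names capture_exceptions, unwrap and risky are made up for illustration. Unlike handle_exceptions above, the wrapper returns the exception object as an ordinary result instead of raising, and the caller decides whether to re-raise it.

```python
from functools import wraps


def capture_exceptions(func):
    """Return the exception instead of raising it, so a result always exists."""
    @wraps(func)
    def inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            return exc
    return inner


def unwrap(result):
    """Re-raise a captured exception on the caller's side."""
    if isinstance(result, Exception):
        raise result
    return result


@capture_exceptions
def risky(x):
    # Hypothetical worker function used only for this sketch.
    if x < 0:
        raise ValueError("negative input")
    return x * 2
```

The trade-off is that the original traceback is not carried along with the returned exception, which is the gap WrapException fills in the full code above.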
Edit:
Note that this code becomes simpler with Python 3, because it introduces an error_callback keyword argument to all asynchronous multiprocessing.Pool methods. This makes it much easier to integrate with Tornado:
class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.

        '''
        # return the AsyncResult so callers can still use it directly
        return super().apply_async(func, args, kwds, callback=callback,
                                   error_callback=callback)


@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.

    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    # error_callback delivers the exception object as the result
    if isinstance(result, Exception):
        raise result
    raise Return(result)
All we need to do in our overridden apply_async is call the parent with the error_callback keyword argument, in addition to the callback kwarg. No need to override ApplyResult.
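The same callback/error_callback pairing can be tried outside Tornado. The sketch below bridges apply_async into an awaitable future, assuming Python 3 and an asyncio event loop. It uses multiprocessing.pool.ThreadPool so that it runs without spawning processes; multiprocessing.Pool takes the same apply_async arguments. The helper name apply_in_pool and the two worker functions are invented for the example.

```python
import asyncio
from multiprocessing.pool import ThreadPool


def apply_in_pool(pool, func, *args):
    """Submit func(*args) to the pool and return an awaitable future."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Pool callbacks fire in a pool helper thread, so results are handed
    # back to the event loop through call_soon_threadsafe.
    def on_result(value):
        loop.call_soon_threadsafe(future.set_result, value)

    def on_error(exc):
        loop.call_soon_threadsafe(future.set_exception, exc)

    pool.apply_async(func, args, callback=on_result, error_callback=on_error)
    return future


def double(x):
    return x * 2


def boom(x):
    raise ValueError("eeee")


async def main():
    with ThreadPool(2) as pool:
        good = await apply_in_pool(pool, double, 21)
        try:
            await apply_in_pool(pool, boom, 1)
        except ValueError as exc:
            # The worker's exception surfaces at the await, no hang.
            return good, str(exc)
```

Running asyncio.run(main()) exercises both paths: the successful call resolves the future, and the failing call raises at the await instead of leaving it pending.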
We can get even fancier by using a metaclass in our TornadoPool, to allow its *_async methods to be called directly as if they were coroutines:
import time
from functools import wraps
from multiprocessing.pool import Pool

import tornado.web
from tornado import gen
from tornado.gen import Arguments  # namedtuple used by _argument_adapter
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future


def _argument_adapter(callback):
    def wrapper(*args, **kwargs):
        if kwargs or len(args) > 1:
            callback(Arguments(args, kwargs))
        elif args:
            callback(args[0])
        else:
            callback(None)
    return wrapper


def PoolTask(func, *args, **kwargs):
    """ Task function for use with multiprocessing.Pool methods.

    This is very similar to tornado.gen.Task, except it sets the
    error_callback kwarg in addition to the callback kwarg. This
    way exceptions raised in pool worker methods get raised in the
    parent when the Task is yielded from.

    """
    future = Future()

    def handle_exception(typ, value, tb):
        if future.done():
            return False
        future.set_exc_info((typ, value, tb))
        return True

    def set_result(result):
        if future.done():
            return
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)

    with stack_context.ExceptionStackContext(handle_exception):
        cb = _argument_adapter(set_result)
        func(*args, callback=cb, error_callback=cb)
    return future


def coro_runner(func):
    """ Wraps the given func in a PoolTask and returns it. """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return PoolTask(func, *args, **kwargs)
    return wrapper


class MetaPool(type):
    """ Wrap all *_async methods in Pool with coro_runner. """
    def __new__(cls, clsname, bases, dct):
        pdct = bases[0].__dict__
        for attr in pdct:
            if attr.endswith("async") and not attr.startswith('_'):
                setattr(bases[0], attr, coro_runner(pdct[attr]))
        return super().__new__(cls, clsname, bases, dct)


class TornadoPool(Pool, metaclass=MetaPool):
    pass


# Test worker functions
def test2(x):
    print("hi2")
    raise Exception("eeee")


def test(x):
    print(x)
    time.sleep(2)
    return "done"


class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield pool.apply_async(test, ("inside get",))
            self.write("%s\n" % result)
            result = yield pool.apply_async(test2, ("hi2",))
            self.write("%s\n" % result)
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
            raise
        finally:
            self.finish()


app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool()
    app.listen(8888)
    IOLoop.instance().start()
#3
0
If your GET requests are taking that long, then Tornado is the wrong framework.
I suggest you use nginx to route the fast GETs to Tornado and the slower ones to a different server.
PeterBe has an interesting article where he runs multiple Tornado servers and sets one of them to be "the slow one" for handling the long-running requests (see: worrying-about-io-blocking). I would try this method.
#1
29
If you're willing to use concurrent.futures.ProcessPoolExecutor
instead of multiprocessing
, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future
, so they'll play nicely together out of the box. concurrent.futures
is included in Python 3.2+, and has been backported to Python 2.x.
如果你愿意使用concurrent.futures。ProcessPoolExecutor而不是多处理,这实际上非常简单。Tornado应用已经支持concurrent.futures。未来,所以他们会很好地合作。并发。期货包含在Python 3.2+中,并已被反向移植到Python 2.x中。
Here's an example:
这里有一个例子:
import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen
def f(a, b, c, blah=None):
print "got %s %s %s and %s" % (a, b, c, blah)
time.sleep(5)
return "hey there"
@gen.coroutine
def test_it():
pool = ProcessPoolExecutor(max_workers=1)
fut = pool.submit(f, 1, 2, 3, blah="ok") # This returns a concurrent.futures.Future
print("running it asynchronously")
ret = yield fut
print("it returned %s" % ret)
pool.shutdown()
IOLoop.instance().run_sync(test_it)
Output:
输出:
running it asynchronously
got 1 2 3 and ok
it returned hey there
ProcessPoolExecutor
has a more limited API than multiprocessing.Pool
, but if you don't need the more advanced features of multiprocessing.Pool
, it's worth using because the integration is so much simpler.
ProcessPoolExecutor比多处理具有更有限的API。如果不需要多处理的更高级的特性,可以使用Pool。池是值得使用的,因为集成要简单得多。
#2
16
multiprocessing.Pool
can be integrated into the tornado
I/O loop, but it's a bit messy. A much cleaner integration can be done using concurrent.futures
(see my other answer for details), but if you're stuck on Python 2.x and can't install the concurrent.futures
backport, here is how you can do it strictly using multiprocessing
:
多处理。池可以集成到tornado I/O循环中,但是有点混乱。可以使用concurrent实现更清晰的集成。期货(详见我的另一个答案),但是如果你被困在Python 2中。不能安装并行程序。期货backport,这里是你如何严格使用多处理:
The multiprocessing.Pool.apply_async
and multiprocessing.Pool.map_async
methods both have an optional callback
parameter, which means that both can potentially be plugged into a tornado.gen.Task
. So in most cases, running code asynchronously in a sub-process is as simple as this:
multiprocessing.Pool。apply_async multiprocessing.Pool。map_async方法都有一个可选的回调参数,这意味着这两个方法都可能被插入到tornado.gen.Task中。因此,在大多数情况下,在子进程中异步运行代码是如此简单:
import multiprocessing
import contextlib
from tornado import gen
from tornado.gen import Return
from tornado.ioloop import IOLoop
from functools import partial
def worker():
print "async work here"
@gen.coroutine
def async_run(func, *args, **kwargs):
result = yield gen.Task(pool.apply_async, func, args, kwargs)
raise Return(result)
if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count())
func = partial(async_run, worker)
IOLoop().run_sync(func)
As I mentioned, this works well in most cases. But if worker()
throws an exception, callback
is never called, which means the gen.Task
never finishes, and you hang forever. Now, if you know that your work will never throw an exception (because you wrapped the whole thing in a try
/except
, for example), you can happily use this approach. However, if you want to let exceptions escape from your worker, the only solution I found was to subclass some multiprocessing components, and make them call callback
even if the worker sub-process raised an exception:
正如我提到的,这在大多数情况下都很有效。但是,如果worker()抛出一个异常,回调就不会被调用,这意味着gen.Task永远不会完成,而您将永远挂起。现在,如果您知道您的工作永远不会抛出异常(因为您在try/except中包装了整个内容),您可以很高兴地使用这种方法。但是,如果您想让异常从worker中逃逸,我找到的唯一解决方案是对一些多处理组件进行子类化,并让它们调用callback,即使worker子进程引发了异常:
from multiprocessing.pool import ApplyResult, Pool, RUN
import multiprocessing
class TornadoApplyResult(ApplyResult):
def _set(self, i, obj):
self._success, self._value = obj
if self._callback:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
'''
assert self._state == RUN
result = TornadoApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
...
if __name__ == "__main__":
pool = TornadoPool(multiprocessing.cpu_count())
...
With these changes, the exception object will be returned by the gen.Task
, rather than the gen.Task
hanging indefinitely. I also updated my async_run
method to re-raise the exception when its returned, and made some other changes to provide better tracebacks for exceptions thrown in the worker sub-processes. Here's the full code:
通过这些更改,异常对象将由gen.Task返回,而不是无限期挂起的gen.Task。我还更新了async_run方法,以便在异常返回时重新引发异常,并做了其他一些更改,以便为在worker子进程中抛出的异常提供更好的回溯。完整的代码:
import multiprocessing
from multiprocessing.pool import Pool, ApplyResult, RUN
from functools import wraps
import tornado.web
from tornado.ioloop import IOLoop
from tornado.gen import Return
from tornado import gen
class WrapException(Exception):
def __init__(self):
exc_type, exc_value, exc_tb = sys.exc_info()
self.exception = exc_value
self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
def __str__(self):
return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted)
class TornadoApplyResult(ApplyResult):
def _set(self, i, obj):
self._success, self._value = obj
if self._callback:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
class TornadoPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
''' Asynchronous equivalent of `apply()` builtin
This version will call `callback` even if an exception is
raised by `func`.
'''
assert self._state == RUN
result = TornadoApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
@gen.coroutine
def async_run(func, *args, **kwargs):
""" Runs the given function in a subprocess.
This wraps the given function in a gen.Task and runs it
in a multiprocessing.Pool. It is meant to be used as a
Tornado co-routine. Note that if func returns an Exception
(or an Exception sub-class), this function will raise the
Exception, rather than return it.
"""
result = yield gen.Task(pool.apply_async, func, args, kwargs)
if isinstance(result, Exception):
raise result
raise Return(result)
def handle_exceptions(func):
""" Raise a WrapException so we get a more meaningful traceback"""
@wraps(func)
def inner(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
raise WrapException()
return inner
# Test worker functions
@handle_exceptions
def test2(x):
raise Exception("eeee")
@handle_exceptions
def test(x):
print x
time.sleep(2)
return "done"
class TestHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
try:
result = yield async_run(test, "inside get")
self.write("%s\n" % result)
result = yield async_run(test2, "hi2")
except Exception as e:
print("caught exception in get")
self.write("Caught an exception: %s" % e)
finally:
self.finish()
app = tornado.web.Application([
(r"/test", TestHandler),
])
if __name__ == "__main__":
pool = TornadoPool(4)
app.listen(8888)
IOLoop.instance().start()
Here's how it behaves for the client:
以下是它对客户的行为:
dan@dan:~$ curl localhost:8888/test
done
Caught an exception:
Original traceback:
Traceback (most recent call last):
File "./mutli.py", line 123, in inner
return func(*args, **kwargs)
File "./mutli.py", line 131, in test2
raise Exception("eeee")
Exception: eeee
And if I send two simultaneous curl requests, we can see they're handled asynchronously on the server-side:
如果我同时发送两个curl请求,我们可以看到它们在服务器端是异步处理的:
dan@dan:~$ ./mutli.py
inside get
inside get
caught exception inside get
caught exception inside get
Edit:
编辑:
Note that this code becomes simpler with Python 3, because it introduces an error_callback
keyword argument to all asynchronous multiprocessing.Pool
methods. This makes it much easier to integrate with Tornado:
class TornadoPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        ''' Asynchronous equivalent of `apply()` builtin

        This version will call `callback` even if an exception is
        raised by `func`.
        '''
        # Route both success and failure to the same callback.
        return super().apply_async(func, args, kwds, callback=callback,
                                   error_callback=callback)

@gen.coroutine
def async_run(func, *args, **kwargs):
    """ Runs the given function in a subprocess.

    This wraps the given function in a gen.Task and runs it
    in a multiprocessing.Pool. It is meant to be used as a
    Tornado co-routine. Note that if func returns an Exception
    (or an Exception sub-class), this function will raise the
    Exception, rather than return it.
    """
    result = yield gen.Task(pool.apply_async, func, args, kwargs)
    # error_callback delivers the exception object as the result,
    # so re-raise it here to match the docstring.
    if isinstance(result, Exception):
        raise result
    raise Return(result)

All we need to do in our overridden apply_async is call the parent with the error_callback keyword argument, in addition to the callback kwarg. No need to override ApplyResult.
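
To see the single-callback trick in isolation, here is a small standard-library sketch, independent of Tornado. It assumes `multiprocessing.pool.ThreadPool` as a stand-in for `Pool`: `ThreadPool` exposes the same `apply_async` signature and lets the snippet run without spawning processes. The names `ok`, `boom`, and `on_done` are made up for the example.

```python
from multiprocessing.pool import ThreadPool


def ok(x):
    return x + 1


def boom(x):
    raise RuntimeError("eeee")


results = []


def on_done(value):
    # Receives either the return value or the exception instance.
    results.append(value)


with ThreadPool(2) as pool:
    # The same function is passed as callback and error_callback,
    # as in the overridden apply_async above.
    r1 = pool.apply_async(ok, (1,), callback=on_done, error_callback=on_done)
    r2 = pool.apply_async(boom, (1,), callback=on_done, error_callback=on_done)
    r1.wait()
    r2.wait()

for value in results:
    print(type(value).__name__, value)
```

The caller then has to tell the two cases apart, which is why `async_run` checks `isinstance(result, Exception)` before returning.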
We can get even fancier by using a metaclass in our TornadoPool, to allow its *_async methods to be called directly as if they were coroutines:
import time
from functools import wraps
from multiprocessing.pool import Pool

import tornado.web
from tornado import gen
# Arguments is needed by _argument_adapter below; it lives in tornado.gen
# in the older Tornado releases this code targets.
from tornado.gen import Arguments, Return
from tornado import stack_context
from tornado.ioloop import IOLoop
from tornado.concurrent import Future

def _argument_adapter(callback):
    def wrapper(*args, **kwargs):
        if kwargs or len(args) > 1:
            callback(Arguments(args, kwargs))
        elif args:
            callback(args[0])
        else:
            callback(None)
    return wrapper

def PoolTask(func, *args, **kwargs):
    """ Task function for use with multiprocessing.Pool methods.

    This is very similar to tornado.gen.Task, except it sets the
    error_callback kwarg in addition to the callback kwarg. This
    way exceptions raised in pool worker methods get raised in the
    parent when the Task is yielded from.
    """
    future = Future()
    def handle_exception(typ, value, tb):
        if future.done():
            return False
        future.set_exc_info((typ, value, tb))
        return True
    def set_result(result):
        if future.done():
            return
        if isinstance(result, Exception):
            future.set_exception(result)
        else:
            future.set_result(result)
    with stack_context.ExceptionStackContext(handle_exception):
        cb = _argument_adapter(set_result)
        func(*args, callback=cb, error_callback=cb)
    return future

def coro_runner(func):
    """ Wraps the given func in a PoolTask and returns it. """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return PoolTask(func, *args, **kwargs)
    return wrapper

class MetaPool(type):
    """ Wrap all *_async methods in Pool with coro_runner. """
    def __new__(cls, clsname, bases, dct):
        pdct = bases[0].__dict__
        for attr in pdct:
            if attr.endswith("async") and not attr.startswith('_'):
                # Note: this patches the base class (Pool) itself.
                setattr(bases[0], attr, coro_runner(pdct[attr]))
        return super().__new__(cls, clsname, bases, dct)

class TornadoPool(Pool, metaclass=MetaPool):
    pass

# Test worker functions
def test2(x):
    print("hi2")
    raise Exception("eeee")

def test(x):
    print(x)
    time.sleep(2)
    return "done"

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        try:
            result = yield pool.apply_async(test, ("inside get",))
            self.write("%s\n" % result)
            result = yield pool.apply_async(test2, ("hi2",))
            self.write("%s\n" % result)
        except Exception as e:
            print("caught exception in get")
            self.write("Caught an exception: %s" % e)
            raise
        finally:
            self.finish()

app = tornado.web.Application([
    (r"/test", TestHandler),
])

if __name__ == "__main__":
    pool = TornadoPool()
    app.listen(8888)
    IOLoop.instance().start()

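
The core of PoolTask, a future that is resolved from either `callback` or `error_callback`, can also be sketched with only the standard library. The version below is an illustration under stated assumptions, not the answer's code: it uses `asyncio` in place of Tornado's `Future` and `stack_context`, `ThreadPool` in place of a process pool so it runs anywhere, and the hypothetical names `pool_task`, `slow_ok`, and `slow_fail`.

```python
import asyncio
from multiprocessing.pool import ThreadPool


def pool_task(pool, func, *args):
    """Return an asyncio future that the pool's callbacks resolve."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_result(value):
        # Pool callbacks run on a helper thread, so hop back to the loop thread.
        loop.call_soon_threadsafe(future.set_result, value)

    def on_error(exc):
        loop.call_soon_threadsafe(future.set_exception, exc)

    pool.apply_async(func, args, callback=on_result, error_callback=on_error)
    return future


def slow_ok(x):
    return x * 2


def slow_fail(x):
    raise ValueError("eeee")


async def main():
    with ThreadPool(2) as pool:
        ok = await pool_task(pool, slow_ok, 21)
        caught = None
        try:
            await pool_task(pool, slow_fail, 1)
        except ValueError as exc:
            # The worker's exception is raised at the await point.
            caught = str(exc)
    return ok, caught


RESULT = asyncio.run(main())
print(RESULT)
```

As with the metaclass version, the caller simply awaits (or yields) the pool call and handles failures with an ordinary `try`/`except`.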
#3
If your GET requests are taking that long, then Tornado is the wrong framework.
I suggest using nginx to route the fast GET requests to Tornado and the slower ones to a different server.
PeterBe has an interesting article in which he runs multiple Tornado servers and designates one of them as 'the slow one' for handling long-running requests; see: worrying-about-io-blocking. I would try this method.