How can I get the value 'foo' that is returned from the thread's target function?
from threading import Thread

def foo(bar):
    print 'hello {}'.format(bar)
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret
The one obvious way to do it, shown above, returns None.
18 solutions
#1
173
FWIW, the multiprocessing module has a nice interface for this using the Pool class. And if you want to stick with threads rather than processes, you can just use the multiprocessing.pool.ThreadPool class as a drop-in replacement.
def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo'))  # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
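For readers on Python 3, the same idea can be sketched roughly as follows. This is a minimal sketch rather than part of the original answer; it assumes Python 3.3 or later, where the pool can be used as a context manager, and it fetches the result inside the with block because leaving the block terminates the pool.

```python
from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print('hello {0}'.format(bar))
    return 'foo' + baz

# The pool is shut down when the with block exits,
# so the result is collected before leaving it.
with ThreadPool(processes=1) as pool:
    async_result = pool.apply_async(foo, ('world', 'foo'))  # tuple of args for foo
    # other work could happen here while foo runs in the worker thread
    return_val = async_result.get()  # blocks until foo has returned

print(return_val)
```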
#2
125
One way I've seen is to pass a mutable object, such as a list or a dictionary, to the thread's constructor, along with an index or other identifier of some sort. The thread can then store its result in its dedicated slot in that object. For example:
def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?
If you really want join() to return the return value of the called function, you can do this with a Thread subclass like the following:
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join()   # prints foo
That gets a little hairy because of some name mangling, and it accesses "private" data structures that are specific to the Thread implementation... but it works.
#3
43
Jake's answer is good, but if you don't want to use a thread pool (you don't know how many threads you'll need, but create them as needed), then a good way to transmit information between threads is the built-in Queue.Queue class, as it offers thread safety.
I created the following decorator to make it act in a similar fashion to the thread pool:
import Queue
import threading

def threaded(f, daemon=False):

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''
        q = Queue.Queue()
        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q
        return t

    return wrap
Then you just use it as:
@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result
The decorated function creates a new thread each time it's called and returns a Thread object that contains the queue that will receive the result.
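A Python 3 port of this decorator might look roughly like the sketch below. It is an adaptation, not the answer author's code: it assumes the module is named queue (as it is in Python 3) and passes daemon to the Thread constructor, which is accepted from Python 3.3 on. The sleep is shortened only to keep the example quick.

```python
import queue
import threading
import time

def threaded(f, daemon=False):
    def wrapped_f(q, *args, **kwargs):
        # call the decorated function and put its result in the queue
        q.put(f(*args, **kwargs))

    def wrap(*args, **kwargs):
        # start wrapped_f in a new thread and attach the result queue to it
        q = queue.Queue()
        t = threading.Thread(target=wrapped_f, args=(q,) + args,
                             kwargs=kwargs, daemon=daemon)
        t.start()
        t.result_queue = q
        return t

    return wrap

@threaded
def long_task(x):
    time.sleep(0.1)
    return x + 5

y = long_task(10)              # does not block, returns the Thread object
result = y.result_queue.get()  # blocks until the result is available
print(result)
```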
#4
14
Another solution that doesn't require changing your existing code:
import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result
It can also be easily adjusted to a multi-threaded environment:
import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result
#5
12
I stole kindall's answer and cleaned it up just a little bit.
The key part is adding *args and **kwargs to join() in order to handle the timeout.
from threading import Thread

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)
        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)
        return self._return
#6
11
Parris / kindall's join/return answer, ported to Python 3:
from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join())   # prints foo
Note: the Thread class is implemented differently in Python 3.
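The Python 3 port above drops the timeout argument of join(). A variant that forwards it could be sketched as follows. This is only a sketch: like the answers it builds on, it relies on the private CPython attributes _target, _args and _kwargs, which are implementation details rather than documented API. If the timeout expires before the thread finishes, join() returns the initial None.

```python
from threading import Thread

class ThreadWithReturnValue(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        # _target, _args and _kwargs are private CPython attributes
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, timeout=None):
        super().join(timeout)
        # still None if the thread has not finished within the timeout
        return self._return

def foo(bar):
    return 'hello {0}'.format(bar)

twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
result = twrv.join(timeout=5.0)
print(result)
```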
#7
6
My solution to the problem is to wrap the function and thread in a class. It does not require using pools, queues, or C-style variable passing. It is also non-blocking: you check the status instead. See the example of how to use it at the end of the code.
import threading

class ThreadWorker():
    '''
    The basic idea is: given a function, create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it, check its status, and get data out of the function.
    '''
    def __init__(self, func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self, func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data = func(*args, **kwargs)
        return new_func

    def start(self, params):
        self.data = None
        if self.thread is not None:
            if self.thread.is_alive():
                return 'running'  # could raise exception here
        # unless thread exists and is alive, start or restart it
        self.thread = threading.Thread(target=self.func, args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.is_alive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started'  # could return exception
        else:
            if self.thread.is_alive():
                return 'running'
            else:
                return self.data

def add(x, y):
    return x + y

add_worker = ThreadWorker(add)
print add_worker.start((1, 2,))
print add_worker.status()
print add_worker.get_results()
#8
3
Using Queue:
import threading, queue

def calc_square(num, out_queue1):
    l = []
    for x in num:
        l.append(x*x)
    out_queue1.put(l)

arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1 = queue.Queue()
t1 = threading.Thread(target=calc_square, args=(arr, out_queue1))
t1.start()
t1.join()
print(out_queue1.get())
#9
2
You can use Pool as a pool of worker processes as below:
from multiprocessing import Pool

def f1(x, y):
    return x*y

if __name__ == '__main__':
    with Pool(processes=10) as pool:
        result = pool.apply(f1, (2, 3))
        print(result)
#10
1
join always returns None; I think you should subclass Thread to handle return values and so on.
#11
1
You can define a mutable object in a scope enclosing the threaded function, and add the result to that. (I also modified the code to be Python 3 compatible.)
returns = {}

def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)
This prints {'world!': 'foo'}
If you use the function input as the key to your results dict, every unique input is guaranteed to give an entry in the results.
#12
1
Taking into consideration @iman's comment on @JakeBiesinger's answer, I have reworked it to use a variable number of threads:
from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo')))  # tuple of args for foo

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()
Cheers,
Guy.
#13
0
Define your target to
1) take an argument q
2) replace any statement return foo with q.put(foo); return
so a function

def func(a):
    ans = a * a
    return ans

would become

def func(a, q):
    ans = a * a
    q.put(ans)
    return

and then you would proceed as such:
from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]
threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
# read from ans_q, the queue every thread wrote to
results = [ans_q.get() for _ in xrange(len(threads))]
And you can use function decorators/wrappers to make it so you can use your existing functions as target without modifying them, but follow this basic scheme.
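One way such a wrapper could look is sketched below, in Python 3. The helper name put_result is hypothetical and not from the answer; it leaves func unmodified and only redirects its return value into the queue. The results are sorted at the end because the order in which threads finish is not guaranteed.

```python
from queue import Queue
from threading import Thread

def func(a):
    # the original, unmodified function
    ans = a * a
    return ans

def put_result(f, q):
    """Hypothetical helper: wrap f so that its return value is put into q."""
    def wrapper(*args, **kwargs):
        q.put(f(*args, **kwargs))
    return wrapper

ans_q = Queue()
threads = [Thread(target=put_result(func, ans_q), args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# one get() per thread; sorted because completion order is arbitrary
results = sorted(ans_q.get() for _ in threads)
print(results)
```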
#14
0
I'm using this wrapper, which comfortably turns any function into one that runs in a Thread, taking care of its return value or exception. It doesn't add Queue overhead.
import sys
import threading

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2]  # py2
                ##raise th.exc[1]  #py3
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start
Usage Examples
def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")
Notes on the threading module
Comfortable return value and exception handling for a threaded function is a frequent "Pythonic" need and should indeed already be offered by the threading module, possibly directly in the standard Thread class. ThreadPool has way too much overhead for simple tasks: 3 managing threads, lots of bureaucracy. Unfortunately, Thread's layout was originally copied from Java, which you can see e.g. from the still useless 1st (!) constructor parameter, group.
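Since the three-argument raise in the wrapper is Python 2 syntax, a Python 3-only version might be sketched as below. This is an adaptation under assumptions, not the author's code: it stores the exception object itself instead of sys.exc_info(), and it initialises th.ret to None so that get() has something to return if the timeout expires first.

```python
import threading

def threading_func(f):
    """Run f in a thread; get() returns its value or re-raises its exception."""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except BaseException as e:  # mirrors the bare except of the original
                th.exc = e
        def get(timeout=None):
            th.join(timeout)
            if th.exc is not None:
                raise th.exc
            return th.ret
        th = threading.Thread(target=run)
        th.ret = None
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

def f(x):
    return 2.5 * x

value = threading_func(f)(4).get(timeout=5.0)

@threading_func
def th_mul(a, b):
    return a * b

try:
    th_mul("text", 2.5).get()
    raised = False
except TypeError:
    raised = True  # the worker's exception surfaced in the calling thread
```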
#15
0
One common solution is to wrap your function foo with a wrapper like
result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))
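Then the whole code may look like this:
import queue
import threading

def run_threads(target, args_list, max_num):  # function name is illustrative
    result = queue.Queue()

    def task_wrapper(*args):
        result.put(target(*args))

    threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]
    for t in threads:
        t.start()
        # throttle: wait until fewer than max_num threads (main thread included) are alive
        while True:
            if len(threading.enumerate()) < max_num:
                break
    for t in threads:
        t.join()
    return result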
Note
One important issue is that the return values may be unordered. (In fact, the return value does not necessarily have to be saved to a queue, since you can choose an arbitrary thread-safe data structure.)
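If the original order matters, one possible remedy is to tag each result with the index of its input and sort afterwards. The sketch below assumes a hypothetical target function square and a hypothetical args_list; neither comes from the answer.

```python
import queue
import threading

def square(x):  # hypothetical target function
    return x * x

args_list = [(3,), (1,), (2,)]  # hypothetical inputs
result = queue.Queue()

def task_wrapper(index, *args):
    # store the input position next to the value
    result.put((index, square(*args)))

threads = [threading.Thread(target=task_wrapper, args=(i,) + args)
           for i, args in enumerate(args_list)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pairs = [result.get() for _ in threads]          # arrival order is arbitrary
ordered = [value for _, value in sorted(pairs)]  # restore the input order
print(ordered)
```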
#16
0
Why not just use a global variable?
import threading

class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)

results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
#17
0
A very simple way to get this done, for dummies like me:
import queue
import threading

# creating queue instance
q = queue.Queue()

# having our test target function
def test():
    # do something in this function:
    result = 3 + 2
    # and put result to a queue instance
    q.put(result)

# creating threading class
class AnyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        # in this class and function we will put our test target function
        test()

for i in range(3):  # running our thread class 3 times (just for example)
    t = AnyThread()  # a thread object can only be started once, so create a new one
    t.start()        # start() runs run() in a new thread
    t.join()
    output = q.get()  # here we get output from queue instance
    print(output)

>>> 5
>>> 5
>>> 5
The main thing here is the queue module. We create a queue.Queue() instance and use it in our function. We feed it our result, which we later read outside the thread.
Here is one more example, with arguments passed to our test function:
import queue
import threading

# creating queue instance
q = queue.Queue()

# having our test target function
def test(a, b):
    # do something in this function:
    result = a + b
    # and put result to a queue instance
    q.put(result)

# creating threading class
class AnyThread(threading.Thread):
    def __init__(self, a, b):
        threading.Thread.__init__(self)
        # keep the arguments, because start() calls run() without any
        self.a = a
        self.b = b
    def run(self):
        # in this class and function we will put our execution test function
        test(self.a, self.b)

for i in range(3):  # running our thread class 3 times (just for example)
    t = AnyThread(3+i, 2+i)
    t.start()
    t.join()
    output = q.get()  # here we get output from queue instance
    print(output)

>>> 5
>>> 7
>>> 9
#18
0
As mentioned, the multiprocessing pool is much slower than basic threading. Using queues, as proposed in some answers here, is a very effective alternative. I have used it with dictionaries in order to be able to run a lot of small threads and collect multiple answers by combining them into a dictionary:
#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)