I am building a multithreaded application.
I have set up a ThreadPool: a Queue of size N and N Workers that get data from the queue.
When all tasks are done I use
tasks.join()
where tasks is the queue.
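To be clear, I am relying on the usual Queue contract: join() only returns after task_done() has been called once for every item that was put(). A minimal standalone sketch of that contract (the names here are made up, not my actual code):

from Queue import Queue
from threading import Thread

tasks = Queue()

def consume():
    while True:
        item = tasks.get()
        # ... process the item ...
        tasks.task_done()  # one task_done() per get(), otherwise join() never returns

worker = Thread(target=consume)
worker.daemon = True
worker.start()

for i in range(10):
    tasks.put(i)

tasks.join()  # returns once task_done() has been called for all 10 items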
The application seems to run smoothly until suddenly, at some point (after 20 minutes, for example), it terminates with the error
thread.error: can't start new thread
Any ideas?
Edit: The threads are daemon threads and the code looks like this:
while True:
    t0 = time.time()
    keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(100)
    if keyword_statuses.count() == 0:
        DBSession.commit()
        break
    for kw_status in keyword_statuses:
        kw_status.status = 1
    DBSession.commit()
    t0 = time.time()
    w = SWorker(threads_no=32, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')
    w.work()

print 'finished'
When are the daemon threads killed? When the application finishes, or when work() finishes?
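(For illustration only, a minimal standalone sketch of daemon-thread behaviour, separate from the SWorker code above: daemon threads are killed abruptly when the last non-daemon thread, usually the main thread, exits, not when any particular function returns.)

import threading
import time

def background():
    # stand-in busy loop for a worker
    while True:
        time.sleep(1)

t = threading.Thread(target=background)
t.daemon = True
t.start()

time.sleep(2)
print 'main thread exiting'
# the daemon thread is killed here, when the interpreter shuts down,
# not when a particular function such as work() returns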
Look at the thread pool and the worker below (it's from a recipe); a usage sketch follows the class definitions:
from Queue import Queue
from threading import Thread, Event, current_thread
import time

event = Event()

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        '''Start processing tasks from the queue'''
        while True:
            event.wait()
            #time.sleep(0.1)
            try:
                func, args, callback = self.tasks.get()
            except Exception, e:
                print str(e)
                return
            else:
                if callback is None:
                    func(args)
                else:
                    callback(func(args))
                self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, args=None, callback=None):
        '''Add a task to the queue'''
        self.tasks.put((func, args, callback))

    def wait_completion(self):
        '''Wait for completion of all the tasks in the queue'''
        self.tasks.join()

    def broadcast_block_event(self):
        '''blocks running threads'''
        event.clear()

    def broadcast_unblock_event(self):
        '''unblocks running threads'''
        event.set()

    def get_event(self):
        '''returns the event object'''
        return event
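A rough sketch of how such a pool is meant to be driven (fetch_page, on_done and the URL list are made-up placeholders, not the real SWorker internals):

pool = ThreadPool(32)
pool.broadcast_unblock_event()  # workers block on event.wait() until this is set

def fetch_page(url):
    # placeholder for the real per-keyword work
    return len(url)

def on_done(result):
    print 'got', result

for url in ['http://example.com/a', 'http://example.com/b']:
    pool.add_task(fetch_page, url, on_done)

pool.wait_completion()  # same as tasks.join()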
Also, maybe the problem is because I create SWorker objects in a loop? What happens to the old SWorker objects (garbage collection?)?
1 Answer
#1
There is still not enough code to localize the problem, but I'm sure that this is because you don't reuse the threads and start too many of them. Did you see the canonical example from the Queue Python documentation http://docs.python.org/library/queue.html (bottom of the page)?
I can reproduce your problem with the following code:
import threading
import Queue

q = Queue.Queue()

def worker():
    item = q.get(block=True)  # sleeps forever for now
    do_work(item)
    q.task_done()

# creates an infinite number of worker threads and fails
# after some time with "error: can't start new thread"
while True:
    t = threading.Thread(target=worker)
    t.start()

q.join()  # never reached
Instead, you must create a pool with a known number of threads and put your data into the queue, like:
q = Queue()

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # block until all tasks are done
UPD: In case you need to stop some thread, you can add a flag to it or send a special mark that means "stop" to break out of the while loop:
class Worker(Thread):
    break_msg = object()  # just a unique sentinel object

    def __init__(self):
        Thread.__init__(self)
        self.keep_running = True  # 'continue' is a reserved word, so use a different flag name

    def run(self):
        while self.keep_running:  # can stop and destroy the thread (variant 1)
            msg = queue.get(block=True)
            if msg is self.break_msg:
                return  # will stop and destroy the thread (variant 2)
            do_work(msg)
            queue.task_done()

workers = [Worker() for _ in xrange(num_workers)]
for w in workers:
    w.start()

for task in tasks:
    queue.put(task)

for _ in xrange(num_workers):
    queue.put(Worker.break_msg)  # stop the threads after all tasks are done;
                                 # you need as many break messages as you have threads

OR

queue.join()  # wait until all tasks are done
for w in workers:
    w.keep_running = False
    queue.put(None)  # wake up a blocked get() so the flag is re-checked