The right way to limit the maximum number of threads running at once?

Time: 2022-04-06 20:43:27

I'd like to create a program that runs multiple light threads, but limits itself to a constant, predefined number of concurrent running tasks, like this (but with no risk of race condition):

import threading

def f(arg):
    global running
    running += 1
    print("Spawned a thread. running=%s, arg=%s" % (running, arg))
    for i in range(100000):
        pass
    running -= 1
    print("Done")

running = 0
while True:
    if running < 8:
        arg = get_task()
        threading.Thread(target=f, args=[arg]).start()

What's the safest/fastest way to implement this?

5 Answers

#1


26  

It sounds like you want to implement the producer/consumer pattern with eight workers. Python has a Queue class for this purpose, and it is thread-safe.

Each worker should call get() on the queue to retrieve a task. This call will block if no tasks are available, causing the worker to go idle until one becomes available. Then the worker should execute the task and finally call task_done() on the queue.

You would put tasks in the queue by calling put() on the queue.

From the main thread, you can call join() on the queue to wait until all pending tasks have been completed.

This approach has the benefit that you are not creating and destroying threads, which is expensive. The worker threads will run continuously, but will be asleep when no tasks are in the queue, using zero CPU time.

(The linked documentation page has an example of this very pattern.)
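
A minimal sketch of that pattern, assuming eight workers and some placeholder tasks (the worker body and the dummy task list stand in for your real work):

import queue
import threading

NUM_WORKERS = 8
task_queue = queue.Queue()

def worker():
    while True:
        arg = task_queue.get()        # blocks while the queue is empty
        try:
            print("Processing", arg)  # do the real work here
        finally:
            task_queue.task_done()    # mark this task as finished

# Start the fixed pool of daemon workers once; they sleep while the queue is empty.
for _ in range(NUM_WORKERS):
    threading.Thread(target=worker, daemon=True).start()

# Producer: put tasks on the queue from the main thread.
for task in range(20):
    task_queue.put(task)

task_queue.join()  # block until every queued task has been marked done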

#2


14  

A semaphore is a variable or abstract data type that is used to control access to a common resource by multiple processes in a concurrent system, such as a multiprogramming operating system; it can help you here.

import threading

maximumNumberOfThreads = 8  # assumed limit; set this to whatever upper bound you need
threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

class MyThread(threading.Thread):

    def run(self):
        threadLimiter.acquire()
        try:
            self.Executemycode()
        finally:
            threadLimiter.release()

    def Executemycode(self):
        print(" Hello World!") 
        # <your code here>

This way you can easily limit the number of threads that will execute concurrently during program execution. The variable 'maximumNumberOfThreads' defines the upper limit on how many threads may run at the same time.
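
For example, a sketch of how you might drive it: start many MyThread instances at once, and the bounded semaphore ensures that at most maximumNumberOfThreads of them execute Executemycode at the same time.

# Start 100 threads; only maximumNumberOfThreads run Executemycode concurrently.
threads = [MyThread() for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for all of them to finish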

credits

#3


3  

I've most commonly seen this written as:

threads = [threading.Thread(target=f) for _ in range(8)]
for thread in threads:
    thread.start()
...
for thread in threads:
    thread.join()

If you want to maintain a fixed-size pool of running threads that process short-lived tasks and then ask for new work, consider a solution built around Queues, like "How to wait until only the first thread is finished in Python".

#4


2  

It would be much easier to implement this as a thread pool or executor, using either multiprocessing.dummy.Pool, or concurrent.futures.ThreadPoolExecutor (or, if using Python 2.x, the backport futures). For example:

import concurrent.futures

def f(arg):
    print("Started a task. running=%s, arg=%s" % (running, arg))
    for i in range(100000):
        pass
    print("Done")

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    while True:
        arg = get_task()
        executor.submit(f, arg)

Of course if you can change the pull-model get_task to a push-model get_tasks that, e.g., yields tasks one at a time, this is even simpler:

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    for arg in get_tasks():
        executor.submit(f, arg)
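
Here get_tasks stands for whatever supplies your work; a hypothetical generator that yields tasks one at a time might look like:

def get_tasks():
    # Hypothetical task source: yield one task at a time until the work runs dry.
    for arg in ["task-1", "task-2", "task-3"]:
        yield arg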

When you run out of tasks (e.g., get_task raises an exception, or get_tasks runs dry), this will automatically tell the executor to stop after it drains the queue, wait for it to stop, and clean up everything.
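
For comparison, a rough sketch of the multiprocessing.dummy.Pool variant mentioned above, assuming the same f and get_tasks:

from multiprocessing.dummy import Pool  # Pool backed by threads rather than processes

with Pool(8) as pool:
    pool.map(f, get_tasks())  # blocks until every task has been processed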

#5


-1  

To apply a limit on thread creation, follow this example (it really works):

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        print "%s: %s" % (thread_num, time.ctime(time.time()))
        print 'number of alive threads:{}'.format(threading.active_count())


def create_thread():
    try:
        for i in range(1, 555):  # trying to spawn 555 threads.
            thread = threading.Thread(target=some_process, args=(i,))
            thread.start()

            if threading.active_count() == 100:  # set maximum threads.
                thread.join()

            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print "Error: unable to start thread {}".format(e)


if __name__ == '__main__':
    create_thread()

Or:

Another way is to use a thread-count check as a mutex/lock, as in the example below:

import threading
import time


def some_process(thread_num):
    count = 0
    while count < 5:
        time.sleep(0.5)
        count += 1
        # print("%s: %s" % (thread_num, time.ctime(time.time())))
        print('number of alive threads:{}'.format(threading.active_count()))


def create_thread2(number_of_desire_thread):
    try:
        for i in range(1, 555):
            threading.Thread(target=some_process, args=(i,)).start()

            while number_of_desire_thread <= threading.active_count():
                '''mutex for avoiding to additional thread creation.'''
                pass

            print('unlock')
            print(threading.active_count())  # number of alive threads.

    except Exception as e:
        print "Error: unable to start thread {}".format(e)


if __name__ == '__main__':
    create_thread2(100)
