How do I limit the number of active threads in Python?

Posted: 2022-06-24 23:14:36

I'm new to Python and making some headway with threading. I'm doing some music file conversion and want to be able to utilize the multiple cores on my machine (one active conversion thread per core).

import subprocess
import threading

class EncodeThread(threading.Thread):
    # this is hacked together a bit, but should give you an idea;
    # self.src and self.dest are set when each thread is constructed
    def run(self):
        # decode the FLAC file to stdout and pipe it straight into the lame encoder
        decode = subprocess.Popen(["flac", "--decode", "--stdout", self.src],
                                  stdout=subprocess.PIPE)
        encode = subprocess.Popen(["lame", "--quiet", "-", self.dest],
                                  stdin=decode.stdout)
        encode.communicate()

# some other code puts these threads with various src/dest pairs in a list

for proc in threads: # `threads` is my list of `threading.Thread` objects
    proc.start()

Everything works, all the files get encoded, bravo! ... however, all the processes spawn immediately, yet I only want to run two at a time (one for each core). As soon as one finishes, I want it to move on to the next in the list, and once the list is done, continue with the rest of the program.

How do I do this?

(I've looked at the thread pool and queue functions but I can't find a simple answer.)

Edit: maybe I should add that each of my threads is using subprocess.Popen to run a separate command line decoder (flac) piped to stdout which is fed into a command line encoder (lame/mp3).

6 Answers

#1 (score: 4)

"Each of my threads is using subprocess.Popen to run a separate command line [process]".

Why have a bunch of threads manage a bunch of processes? That's exactly what an OS does for you. Why micro-manage what the OS already manages?

Rather than fool around with threads overseeing processes, just fork off processes. Your process table probably can't handle 2000 processes, but it can handle a few dozen (maybe a few hundred) pretty easily.

You want to have more work queued up than your CPUs can possibly handle. The real question is one of memory -- not processes or threads. If the sum of all the active data for all the processes exceeds physical memory, then data has to be swapped, and that will slow you down.

If your processes have a fairly small memory footprint, you can have lots and lots running. If your processes have a large memory footprint, you can't have very many running.

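As a rough sketch of that approach (no threads at all; the encode_all helper and its pairs argument, a list of (src, dest) tuples, are hypothetical names standing in for the question's own list-building code), you could just launch every flac-to-lame pipeline as plain child processes and let the OS schedule them:

import subprocess

def encode_all(pairs):
    # launch every flac -> lame pipeline as child processes; the OS schedules them all
    procs = []
    for src, dest in pairs:
        decode = subprocess.Popen(["flac", "--decode", "--stdout", src],
                                  stdout=subprocess.PIPE)
        encode = subprocess.Popen(["lame", "--quiet", "-", dest],
                                  stdin=decode.stdout)
        decode.stdout.close()  # so flac sees a broken pipe if lame exits early
        procs.append(encode)
    for p in procs:
        p.wait()               # just wait for everything to finish

As the answer says, whether that is acceptable is mostly a memory question, since every pair spawns two processes at once.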

#2 (score: 30)

If you want to limit the number of parallel threads, use a semaphore:

import threading

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

class EncodeThread(threading.Thread):

    def run(self):
        threadLimiter.acquire()
        try:
            <your code here>
        finally:
            threadLimiter.release()

Start all threads at once. All but maximumNumberOfThreads will wait in threadLimiter.acquire() and a waiting thread will only continue once another thread goes through threadLimiter.release().

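Applied to the EncodeThread from the question, a hedged sketch might look like this (assuming a limit of two for a dual-core machine; the pairs list is a stand-in for however the src/dest pairs actually get built):

import subprocess
import threading

threadLimiter = threading.BoundedSemaphore(2)  # at most two active conversions

class EncodeThread(threading.Thread):
    def __init__(self, src, dest):
        threading.Thread.__init__(self)
        self.src = src
        self.dest = dest

    def run(self):
        threadLimiter.acquire()
        try:
            decode = subprocess.Popen(["flac", "--decode", "--stdout", self.src],
                                      stdout=subprocess.PIPE)
            encode = subprocess.Popen(["lame", "--quiet", "-", self.dest],
                                      stdin=decode.stdout)
            encode.communicate()
        finally:
            threadLimiter.release()

pairs = []  # (src, dest) tuples, built as in the question
threads = [EncodeThread(src, dest) for src, dest in pairs]
for t in threads:
    t.start()   # start everything at once; the semaphore keeps only two converting
for t in threads:
    t.join()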

#3 (score: 1)

If you're using the default "cpython" interpreter then threads won't help you here, because only one thread can execute Python code at a time; look up the Global Interpreter Lock. Instead, I'd suggest looking at the multiprocessing module in Python 2.6 -- it makes parallel programming a cinch. You can create a Pool object with 2*num_threads processes and give it a bunch of tasks to do. It will execute up to 2*num_threads tasks at a time, until all are done.

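A hedged sketch of that idea (the convert helper and pairs list are hypothetical names; the pool size follows the answer's two-tasks-per-processor suggestion):

import multiprocessing
import subprocess

def convert(pair):
    # one task: decode a FLAC file and pipe it into the lame encoder
    src, dest = pair
    decode = subprocess.Popen(["flac", "--decode", "--stdout", src],
                              stdout=subprocess.PIPE)
    encode = subprocess.Popen(["lame", "--quiet", "-", dest],
                              stdin=decode.stdout)
    encode.communicate()
    return dest

if __name__ == "__main__":
    pairs = []  # (src, dest) tuples, built as in the question
    pool = multiprocessing.Pool(processes=2 * multiprocessing.cpu_count())
    pool.map(convert, pairs)  # blocks until every conversion is done
    pool.close()
    pool.join()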

At work I have recently migrated a bunch of Python XML tools (a differ, xpath grepper, and bulk xslt transformer) to use this, and have had very nice results with two processes per processor.

#4 (score: 1)

It looks to me that what you want is a pool of some sort, and in that pool you would like to have n threads where n == the number of processors on your system. You would then have another thread whose only job was to feed jobs into a queue, which the worker threads could pick up and process as they became free (so for a dual-core machine, you'd have three threads but the main thread would be doing very little); a rough sketch follows.

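A rough sketch of that layout (Python 3 module names; pairs is again a stand-in for the question's src/dest list):

import queue
import subprocess
import threading

NUM_WORKERS = 2        # n == number of processors, here a dual-core machine
jobs = queue.Queue()

def worker():
    while True:
        item = jobs.get()
        if item is None:   # sentinel: no more work for this worker
            break
        src, dest = item
        decode = subprocess.Popen(["flac", "--decode", "--stdout", src],
                                  stdout=subprocess.PIPE)
        encode = subprocess.Popen(["lame", "--quiet", "-", dest],
                                  stdin=decode.stdout)
        encode.communicate()

workers = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()
pairs = []             # (src, dest) tuples, built as in the question
for pair in pairs:
    jobs.put(pair)     # the feeder; workers pick jobs up as they become free
for _ in workers:
    jobs.put(None)     # one sentinel per worker so each one exits
for w in workers:
    w.join()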

As you are new to Python, though, I'll assume you don't know about the GIL and its side-effects with regard to threading. If you read the article I linked you will soon understand why traditional multithreading solutions are not always the best in the Python world. Instead you should consider using the multiprocessing module (new in Python 2.6; in 2.5 you can use this backport) to achieve the same effect. It side-steps the issue of the GIL by using multiple processes as if they were threads within the same application. There are some restrictions about how you share data (you are working in different memory spaces) but actually that's no bad thing: it just encourages good practice such as minimising the contact points between threads (or processes in this case).

In your case you are probably interested in using a pool as specified here.

#5 (score: 1)

Short answer: don't use threads.

For a working example, you can look at something I've recently tossed together at work. It's a little wrapper around ssh which runs a configurable number of Popen() subprocesses. I've posted it at: Bitbucket: classh (Cluster Admin's ssh Wrapper).

As noted, I don't use threads; I just spawn off the children, loop over them calling their .poll() methods, check for timeouts (also configurable), and replenish the pool as I gather the results. I've played with different sleep() values, and in the past I've written a version (before the subprocess module was added to Python) which used the signal module (SIGCHLD and SIGALRM) and the os.fork() and os.execve() functions, and did my own pipe and file descriptor plumbing, etc.

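This is not the classh code, just a hedged sketch of the same poll-and-replenish idea (with a hypothetical pairs list and a fixed limit of two pipelines):

import subprocess
import time

MAX_ACTIVE = 2                    # one pipeline per core

def start(src, dest):
    decode = subprocess.Popen(["flac", "--decode", "--stdout", src],
                              stdout=subprocess.PIPE)
    encode = subprocess.Popen(["lame", "--quiet", "-", dest],
                              stdin=decode.stdout)
    decode.stdout.close()
    return encode

pairs = []                        # (src, dest) tuples, built as in the question
pending = list(pairs)
active = []
while pending or active:
    while pending and len(active) < MAX_ACTIVE:
        active.append(start(*pending.pop(0)))          # replenish the pool
    active = [p for p in active if p.poll() is None]   # reap finished children
    time.sleep(0.1)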

In my case I'm incrementally printing results as I gather them ... and remembering all of them to summarize at the end (when all the jobs have completed or been killed for exceeding the timeout).

I ran that, as posted, on a list of 25,000 internal hosts (many of which are down, retired, located internationally, not accessible to my test account etc). It completed the job in just over two hours and had no issues. (There were about 60 of them that were timeouts due to systems in degenerate/thrashing states -- proving that my timeout handling works correctly).

So I know this model works reliably. Running 100 concurrent ssh processes with this code doesn't seem to cause any noticeable impact (it's a moderately old FreeBSD box). I used to run the old (pre-subprocess) version with 100 concurrent processes on my old 512MB laptop without problems, too.

(BTW: I plan to clean this up and add features to it; feel free to contribute or to clone off your own branch of it; that's what Bitbucket.org is for).

#6 (score: 0)

I am not an expert in this, but I have read something about "Lock"s. This article might help you out.

Hope this helps
