Parallel.ForEach keeps spawning new threads

Time: 2022-04-04 20:59:33

While I was using Parallel.ForEach in my program, I found that some threads never seemed to finish. In fact, it kept spawning new threads over and over, a behaviour that I wasn't expecting and definitely don't want.

I was able to reproduce this behaviour with the following code which, just like my 'real' program, both uses processor and memory a lot (.NET 4.0 code):

public class Node
{
    public Node Previous { get; private set; }

    public Node(Node previous)
    {
        Previous = previous;
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        DateTime startMoment = DateTime.Now;
        int concurrentThreads = 0;

        var jobs = Enumerable.Range(0, 2000);
        Parallel.ForEach(jobs, delegate(int jobNr)
        {
            Interlocked.Increment(ref concurrentThreads);

            int heavyness = jobNr % 9;

            //Give the processor and the garbage collector something to do...
            List<Node> nodes = new List<Node>();
            Node current = null;
            for (int y = 0; y < 1024 * 1024 * heavyness; y++)
            {
                current = new Node(current);
                nodes.Add(current);
            }

            TimeSpan elapsed = DateTime.Now - startMoment;
            int threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
            Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining);
        });
    }
}

When run on my quad-core machine, it initially starts off with 4 concurrent threads, just as you would expect. However, over time more and more threads are created. Eventually, the program throws an OutOfMemoryException:

[00:00] Job    0 complete. 3 threads remaining.
[00:01] Job    1 complete. 4 threads remaining.
[00:01] Job    2 complete. 4 threads remaining.
[00:02] Job    3 complete. 4 threads remaining.
[00:05] Job    9 complete. 5 threads remaining.
[00:05] Job    4 complete. 5 threads remaining.
[00:05] Job    5 complete. 5 threads remaining.
[00:05] Job   10 complete. 5 threads remaining.
[00:08] Job   11 complete. 5 threads remaining.
[00:08] Job    6 complete. 5 threads remaining.
...
[00:55] Job   67 complete. 7 threads remaining.
[00:56] Job   81 complete. 8 threads remaining.
...
[01:54] Job  107 complete. 11 threads remaining.
[02:00] Job  121 complete. 12 threads remaining.
..
[02:55] Job  115 complete. 19 threads remaining.
[03:02] Job  166 complete. 21 threads remaining.
...
[03:41] Job  113 complete. 28 threads remaining.
<OutOfMemoryException>

The memory usage graph for the experiment above is as follows:

[Image: task manager graph showing processor usage (top) and memory usage (bottom)]

(The screenshot is in Dutch; the top part represents processor usage, the bottom part memory usage.) As you can see, it looks like a new thread is being spawned almost every time the garbage collector gets in the way (as can be seen in the dips of memory usage).

Can anyone explain why this is happening, and what I can do about it? I just want .NET to stop spawning new threads, and finish the existing threads first...

3 Answers

#1


16  

You can limit the maximum number of threads that get created by specifying a ParallelOptions instance with the MaxDegreeOfParallelism property set:

var jobs = Enumerable.Range(0, 2000);
ParallelOptions po = new ParallelOptions
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(jobs, po, jobNr =>
{
    // ...
});

As to why you're getting the behaviour you're observing: The TPL (which underlies PLINQ) is, by default, at liberty to guess the optimal number of threads to use. Whenever a parallel task blocks, the task scheduler may create a new thread in order to maintain progress. In your case, the blocking might be happening implicitly; for example, through the Console.WriteLine call, or (as you observed) during garbage collection.

From Concurrency Levels Tuning with Task Parallel Library (How Many Threads to Use?):

Since the TPL default policy is to use one thread per processor, we can conclude that TPL initially assumes that the workload of a task is ~100% working and 0% waiting, and if the initial assumption fails and the task enters a waiting state (i.e. starts blocking) - TPL will take the liberty to add threads as appropriate.

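A minimal sketch of the interaction described above (the sleep duration and iteration count are illustrative, not from the original post): each iteration blocks briefly, which is exactly the situation where the scheduler would otherwise inject extra threads, while the MaxDegreeOfParallelism cap keeps the number of concurrent iterations bounded.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ThreadInjectionDemo
{
    public static void Main()
    {
        // Cap concurrent iterations at the core count, so blocking
        // work items cannot cause an ever-growing number of parallel
        // iterations.
        var po = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        Parallel.ForEach(Enumerable.Range(0, 20), po, i =>
        {
            Thread.Sleep(100); // simulated blocking work

            // Print the OS thread count so growth (or its absence)
            // is visible while the loop runs.
            Console.WriteLine("Iteration {0}, process has {1} OS threads",
                i, Process.GetCurrentProcess().Threads.Count);
        });
    }
}
```

Removing the `po` argument reproduces the uncapped behaviour the question describes, where the pool is free to add workers whenever iterations block.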
#2


6  

You should probably read a bit about how the task scheduler works.

http://msdn.microsoft.com/en-us/library/ff963549.aspx (latter half of the page)

"The .NET thread pool automatically manages the number of worker threads in the pool. It adds and removes threads according to built-in heuristics. The .NET thread pool has two main mechanisms for injecting threads: a starvation-avoidance mechanism that adds worker threads if it sees no progress being made on queued items and a hill-climbing heuristic that tries to maximize throughput while using as few threads as possible.

The goal of starvation avoidance is to prevent deadlock. This kind of deadlock can occur when a worker thread waits for a synchronization event that can only be satisfied by a work item that is still pending in the thread pool's global or local queues. If there were a fixed number of worker threads, and all of those threads were similarly blocked, the system would be unable to ever make further progress. Adding a new worker thread resolves the problem.

A goal of the hill-climbing heuristic is to improve the utilization of cores when threads are blocked by I/O or other wait conditions that stall the processor. By default, the managed thread pool has one worker thread per core. If one of these worker threads becomes blocked, there's a chance that a core might be underutilized, depending on the computer's overall workload. The thread injection logic doesn't distinguish between a thread that's blocked and a thread that's performing a lengthy, processor-intensive operation. Therefore, whenever the thread pool's global or local queues contain pending work items, active work items that take a long time to run (more than a half second) can trigger the creation of new thread pool worker threads."

You can mark a task as LongRunning but this has the side effect of allocating a thread for it from outside the thread pool which means that the task cannot be inlined.

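For completeness, a short sketch of the LongRunning hint mentioned above (the work method is a placeholder, not code from the question):

```csharp
using System;
using System.Threading.Tasks;

public class LongRunningDemo
{
    public static void Main()
    {
        // TaskCreationOptions.LongRunning hints the default scheduler
        // to run this task on its own dedicated thread rather than a
        // thread-pool worker, so it never trips the pool's
        // thread-injection heuristics.
        Task heavy = Task.Factory.StartNew(
            SimulatedHeavyWork,
            TaskCreationOptions.LongRunning);

        heavy.Wait();
        Console.WriteLine("Long-running task finished.");
    }

    private static void SimulatedHeavyWork()
    {
        // Placeholder for a lengthy, processor-intensive operation.
        for (int i = 0; i < 100_000_000; i++) { }
    }
}
```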
Remember that Parallel.For/ForEach hands out the work it is given in blocks (chunks), so even if the work in one loop iteration is fairly small, the overall work done by the task invoked for the chunk may appear longer to the scheduler.

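The chunking can also be controlled explicitly with a range partitioner, which fixes the block size up front instead of leaving it to the default chunking heuristics (the range size of 250 here is an arbitrary choice for illustration):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class PartitionerDemo
{
    public static void Main()
    {
        // Partitioner.Create splits the index range [0, 2000) into
        // fixed-size chunks; each task then runs a plain sequential
        // loop over its chunk.
        Parallel.ForEach(Partitioner.Create(0, 2000, 250), range =>
        {
            for (int jobNr = range.Item1; jobNr < range.Item2; jobNr++)
            {
                // per-job work goes here
            }
            Console.WriteLine("Processed jobs {0}..{1}",
                range.Item1, range.Item2 - 1);
        });
    }
}
```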
Most calls to the GC in and of themselves aren't blocking (it runs on a separate thread), but if you wait for the GC to complete then this does block. Remember also that the GC is rearranging memory, so this may have some side effects (and blocking) if you are trying to allocate memory while the GC is running. I don't have specifics here, but I know the PPL has some memory allocation features specifically for concurrent memory management for this reason.

Looking at your code's output, it seems that things are running for many seconds, so I'm not surprised that you are seeing thread injection. However, I seem to remember that the default thread pool size is roughly 30 threads (probably depending on the number of cores on your system). A thread takes up roughly 1 MB of memory before your code allocates any more, so I'm not clear why you could get an out-of-memory exception here.

#3


1  

I've posted the follow-up question "How to count the amount of concurrent threads in .NET application?"

If you count the threads directly, their number during Parallel.For() mostly only increases (very rarely, and insignificantly, decreasing) and is not released after loop completion.

I checked this in both Release and Debug mode, both with

ParallelOptions po = new ParallelOptions
{
  MaxDegreeOfParallelism = Environment.ProcessorCount
};

and without it.

The numbers vary, but the conclusions are the same.

Here is the complete code I was using, in case someone wants to play with it:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Edit4Posting
{
  public class Node
  {
    public Node Previous { get; private set; }

    public Node(Node previous)
    {
      Previous = previous;
    }
  }

  public class Edit4Posting
  {
    public static void Main(string[] args)
    {
      int concurrentThreads = 0;
      int directThreadsCount = 0;
      int diagThreadCount = 0;

      var jobs = Enumerable.Range(0, 160);
      ParallelOptions po = new ParallelOptions
      {
        MaxDegreeOfParallelism = Environment.ProcessorCount
      };
      Parallel.ForEach(jobs, po, delegate(int jobNr)
      //Parallel.ForEach(jobs, delegate(int jobNr)
      {
        int threadsRemaining = Interlocked.Increment(ref concurrentThreads);

        int heavyness = jobNr % 9;

        //Give the processor and the garbage collector something to do...
        List<Node> nodes = new List<Node>();
        Node current = null;
        //for (int y = 0; y < 1024 * 1024 * heavyness; y++)
        for (int y = 0; y < 1024 * 24 * heavyness; y++)
        {
          current = new Node(current);
          nodes.Add(current);
        }
        //*******************************
        directThreadsCount = Process.GetCurrentProcess().Threads.Count;
        //*******************************
        threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
        Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}",
          jobNr, threadsRemaining, directThreadsCount);
      });
      Console.WriteLine("FINISHED");
      Console.ReadLine();
    }
  }
}
