有没有更好的方法来等待排队的线程?

时间:2022-01-12 11:04:29

Is there a better way to wait for queued threads before execute another process?

有没有更好的方法在执行另一个进程之前等待排队的线程?

Currently I'm doing:

目前我在做:

this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable

// Initiate process

foreach (string someString in arrayStrings)
{
     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     Thread.Sleep(100);
}

// Waiting execution for all queued threads
lock (this.workerLocker)  // Global variable (object)
{
     while (this.RunningWorkers > 0)
     {
          Monitor.Wait(this.workerLocker);
     }
}

// Do anything else    
Console.WriteLine("END");

// Method DoSomething() definition
public void DoSomething(object data)
{
    // Do a slow process...
    .
    .
    .

    lock (this.workerLocker)
    {
        this.RunningWorkers--;
        Monitor.Pulse(this.workerLocker);
    }
}

8 个解决方案

#1


You likely want to take a look at AutoResetEvent and ManualResetEvent.

您可能想要查看AutoResetEvent和ManualResetEvent。

These are meant for exactly this situation (waiting for a ThreadPool thread to finish, prior to doing "something").

这些都是针对这种情况的(在做“某事”之前等待ThreadPool线程完成)。

You'd do something like this:

你会做这样的事情:

static void Main(string[] args)
{
    List<ManualResetEvent> resetEvents = new List<ManualResetEvent>();
    foreach (var x in Enumerable.Range(1, WORKER_COUNT))
    {
        ManualResetEvent resetEvent = new ManualResetEvent();
        ThreadPool.QueueUserWorkItem(DoSomething, resetEvent);
        resetEvents.Add(resetEvent);
    }

    // wait for all ManualResetEvents
    WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-)
}

public static void DoSomething(object data)
{
    ManualResetEvent resetEvent = data as ManualResetEvent;

    // Do something

    resetEvent.Set();
}

Edit: Forgot to mention you can wait for a single thread, any thread and so forth as well. Also depending on your situation, AutoResetEvent can simplify things a bit, since it (as the name implies) can signal events automatically :-)

编辑:忘记提及你可以等待单个线程,任何线程等等。另外,根据您的情况,AutoResetEvent可以简化一些事情,因为它(顾名思义)可以自动发出事件信号:-)

#2


How about a Fork and Join that uses just Monitor ;-p

如何使用仅使用Monitor的Fork和Join ;-p

Forker p = new Forker();
foreach (var obj in collection)
{
    var tmp = obj;
    p.Fork(delegate { DoSomeWork(tmp); });
}
p.Join();

Full code shown on this earlier answer.

这个早期答案中显示的完整代码。

Or for a producer/consumer queue of capped size (thread-safe etc), here.

或者对于上限大小的生产者/消费者队列(线程安全等),这里。

#3


In addition to Barrier, pointed out by Henk Holterman (BTW his is a very bad usage of Barrier, see my comment to his answer), .NET 4.0 provides whole bunch of other options (to use them in .NET 3.5 you need to download an extra DLL from Microsoft). I blogged a post that lists them all, but my favorite is definitely Parallel.ForEach:

除了Barrier之外,Henk Holterman指出(BTW他对Barrier的使用非常糟糕,请参阅我的回答),.NET 4.0提供了大量其他选项(在.NET 3.5中使用它们需要下载来自Microsoft的额外DLL)。我在博客上写了一篇文章,列出了所有内容,但我最喜欢的是Parallel.ForEach:

Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Behind the scenes, Parallel.ForEach queues to the new and improved thread pool and waits until all threads are done.

在幕后,Parallel.ForEach排队到新的和改进的线程池,并等待所有线程完成。

#4


I really like the Begin- End- Async Pattern when I have to wait for the tasks to finish.

当我不得不等待任务完成时,我真的很喜欢Begin-End-Async Pattern。

I would advice you to wrap the BeginEnd in a worker class:

我建议你将BeginEnd包装在一个worker类中:

public class StringWorker
{
    private string m_someString;
    private IAsyncResult m_result;

    private Action DoSomethingDelegate;

    public StringWorker(string someString)
    {
        DoSomethingDelegate = DoSomething;
    }

    private void DoSomething()
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginDoSomething()
    {
        if (m_result != null) { throw new InvalidOperationException(); }
        m_result = DoSomethingDelegate.BeginInvoke(null, null);
        return m_result;
    }

    public void EndDoSomething()
    {
        DoSomethingDelegate.EndInvoke(m_result);
    }
}

To do your starting and working use this code snippet:

要开始和工作,请使用以下代码段:

List<StringWorker> workers = new List<StringWorker>();

foreach (var someString in arrayStrings)
{
    StringWorker worker = new StringWorker(someString);
    worker.BeginDoSomething();
    workers.Add(worker);
}

foreach (var worker in workers)
{
    worker.EndDoSomething();
}

Console.WriteLine("END");

And that's it.

就是这样。

Sidenote: If you want to get a result back from the BeginEnd then change the "Action" to Func and change the EndDoSomething to return a type.

旁注:如果要从BeginEnd返回结果,则将“Action”更改为Func并更改EndDoSomething以返回类型。

public class StringWorker
{
    private string m_someString;
    private IAsyncResult m_result;

    private Func<string> DoSomethingDelegate;

    public StringWorker(string someString)
    {
        DoSomethingDelegate = DoSomething;
    }

    private string DoSomething()
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginDoSomething()
    {
        if (m_result != null) { throw new InvalidOperationException(); }
        m_result = DoSomethingDelegate.BeginInvoke(null, null);
        return m_result;
    }

    public string EndDoSomething()
    {
        return DoSomethingDelegate.EndInvoke(m_result);
    }
}

#5


Yes, there is.

就在这里。

Suggested approach

1) a counter and a wait handle

1)计数器和等待句柄

int ActiveCount = 1; // 1 (!) is important
EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual);

2) adding loop

2)添加循环

foreach (string someString in arrayStrings)
{
     Interlocked.Increment(ref ActiveCount);

     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     // Thread.Sleep(100); // you really need this sleep ?
}

PostActionCheck();
ewhAllDone.Wait();

3) DoSomething should look like

3)DoSomething应该是什么样的

{
    try
    {
        // some long executing code
    }
    finally
    {
        // ....
        PostActionCheck();
    }
} 

4) where PostActionCheck is

4)PostActionCheck在哪里

void PostActionCheck()
{
    if (Interlocked.Decrement(ref ActiveCount) == 0)
        ewhAllDone.Set();
}

Idea

ActiveCount is initialized with 1, and then get incremented n times.

ActiveCount初始化为1,然后增加n次。

PostActionCheck is called n + 1 times. The last one will trigger the event.

PostActionCheck被称为n + 1次。最后一个将触发事件。

The benefit of this solution is that is uses a single kernel object (which is an event), and 2 * n + 1 calls of lightweight API's. (Can there be less ?)

这个解决方案的好处是使用单个内核对象(这是一个事件),以及轻量级API的2 * n + 1次调用。 (可以少吗?)

P.S.

I wrote the code here, I might have misspelled some class names.

我在这里写了代码,我可能拼错了一些类名。

#6


.NET 4.0 has a new class for that, Barrier.

.NET 4.0有一个新类,Barrier。

Aside from that your method isn't that bad, and you could optimize a little by only Pulsing if RunningWorkers is 0 after the decrement. That would look like:

除此之外,您的方法并不是那么糟糕,如果在减量后RunningWorkers为0,您可以仅通过Pulsing进行优化。那看起来像是:

this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable

foreach (string someString in arrayStrings)
{
     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     //Thread.Sleep(100);
}

// Waiting execution for all queued threads
Monitor.Wait(this.workerLocker);

// Method DoSomething() definition
public void DoSomething(object data)
{
    // Do a slow process...
    // ...

    lock (this.workerLocker)
    {
        this.RunningWorkers--;
        if (this.RunningWorkers == 0)
           Monitor.Pulse(this.workerLocker);
    }
}

You could use an EventWaithandle or AutoResetEvent but they are all wrapped Win32 API's. Since the Monitor class is pure managed code I would prefer a Monitor for this situation.

您可以使用EventWaithandle或AutoResetEvent,但它们都包含在Win32 API中。由于Monitor类是纯托管代码,因此我更喜欢Monitor。

#7


I'm not sure there is really, I've recently done something similar to scan each IP of a subnet for accepting a particular port.

我不确定是否真的,我最近做了类似的事情来扫描子网的每个IP以接受特定的端口。

A couple of things I can suggest that may improve performance:

我可以建议的一些事情可能会提高性能:

  • Use the SetMaxThreads method of ThreadPool to tweak performance (i.e. balancing having lots of threads running at once, against the locking time on such a large number of threads).

    使用ThreadPool的SetMaxThreads方法来调整性能(即平衡有大量线程一次运行,相对于如此大量线程的锁定时间)。

  • Don't sleep when setting up all the work items, there is no real need (that I am immediately aware of. Do however sleep inside the DoSomething method, perhaps only for a millisecond, to allow other threads to jump in there if need be.

    在设置所有工作项时不要睡觉,没有真正的需要(我立即意识到。但是在DoSomething方法中睡觉,可能只有一毫秒,以允许其他线程在那里跳入,如果需要的话。

I'm sure you could implement a more customised method yourself, but I doubt it would be more efficient than using the ThreadPool.

我相信你自己可以实现更自定义的方法,但我怀疑它比使用ThreadPool更有效。

P.S I'm not 100% clear on the reason for using monitor, since you are locking anyway? Note that the question is asked only because I haven't used the Monitor class previously, not because I'm actually doubting it's use.

P.S我不是100%清楚使用显示器的原因,因为你还在锁定?请注意,问题只是因为我之前没有使用过Monitor类,而不是因为我实际上怀疑它是否正在使用。

#8


Use Spring Threading. It has Barrier implementations built in.

使用Spring Threading。它内置了Barrier实现。

http://www.springsource.org/extensions/se-threading-net

#1


You likely want to take a look at AutoResetEvent and ManualResetEvent.

您可能想要查看AutoResetEvent和ManualResetEvent。

These are meant for exactly this situation (waiting for a ThreadPool thread to finish, prior to doing "something").

这些都是针对这种情况的(在做“某事”之前等待ThreadPool线程完成)。

You'd do something like this:

你会做这样的事情:

static void Main(string[] args)
{
    List<ManualResetEvent> resetEvents = new List<ManualResetEvent>();
    foreach (var x in Enumerable.Range(1, WORKER_COUNT))
    {
        ManualResetEvent resetEvent = new ManualResetEvent();
        ThreadPool.QueueUserWorkItem(DoSomething, resetEvent);
        resetEvents.Add(resetEvent);
    }

    // wait for all ManualResetEvents
    WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-)
}

public static void DoSomething(object data)
{
    ManualResetEvent resetEvent = data as ManualResetEvent;

    // Do something

    resetEvent.Set();
}

Edit: Forgot to mention you can wait for a single thread, any thread and so forth as well. Also depending on your situation, AutoResetEvent can simplify things a bit, since it (as the name implies) can signal events automatically :-)

编辑:忘记提及你可以等待单个线程,任何线程等等。另外,根据您的情况,AutoResetEvent可以简化一些事情,因为它(顾名思义)可以自动发出事件信号:-)

#2


How about a Fork and Join that uses just Monitor ;-p

如何使用仅使用Monitor的Fork和Join ;-p

Forker p = new Forker();
foreach (var obj in collection)
{
    var tmp = obj;
    p.Fork(delegate { DoSomeWork(tmp); });
}
p.Join();

Full code shown on this earlier answer.

这个早期答案中显示的完整代码。

Or for a producer/consumer queue of capped size (thread-safe etc), here.

或者对于上限大小的生产者/消费者队列(线程安全等),这里。

#3


In addition to Barrier, pointed out by Henk Holterman (BTW his is a very bad usage of Barrier, see my comment to his answer), .NET 4.0 provides whole bunch of other options (to use them in .NET 3.5 you need to download an extra DLL from Microsoft). I blogged a post that lists them all, but my favorite is definitely Parallel.ForEach:

除了Barrier之外,Henk Holterman指出(BTW他对Barrier的使用非常糟糕,请参阅我的回答),.NET 4.0提供了大量其他选项(在.NET 3.5中使用它们需要下载来自Microsoft的额外DLL)。我在博客上写了一篇文章,列出了所有内容,但我最喜欢的是Parallel.ForEach:

Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Behind the scenes, Parallel.ForEach queues to the new and improved thread pool and waits until all threads are done.

在幕后,Parallel.ForEach排队到新的和改进的线程池,并等待所有线程完成。

#4


I really like the Begin- End- Async Pattern when I have to wait for the tasks to finish.

当我不得不等待任务完成时,我真的很喜欢Begin-End-Async Pattern。

I would advice you to wrap the BeginEnd in a worker class:

我建议你将BeginEnd包装在一个worker类中:

public class StringWorker
{
    private string m_someString;
    private IAsyncResult m_result;

    private Action DoSomethingDelegate;

    public StringWorker(string someString)
    {
        DoSomethingDelegate = DoSomething;
    }

    private void DoSomething()
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginDoSomething()
    {
        if (m_result != null) { throw new InvalidOperationException(); }
        m_result = DoSomethingDelegate.BeginInvoke(null, null);
        return m_result;
    }

    public void EndDoSomething()
    {
        DoSomethingDelegate.EndInvoke(m_result);
    }
}

To do your starting and working use this code snippet:

要开始和工作,请使用以下代码段:

List<StringWorker> workers = new List<StringWorker>();

foreach (var someString in arrayStrings)
{
    StringWorker worker = new StringWorker(someString);
    worker.BeginDoSomething();
    workers.Add(worker);
}

foreach (var worker in workers)
{
    worker.EndDoSomething();
}

Console.WriteLine("END");

And that's it.

就是这样。

Sidenote: If you want to get a result back from the BeginEnd then change the "Action" to Func and change the EndDoSomething to return a type.

旁注:如果要从BeginEnd返回结果,则将“Action”更改为Func并更改EndDoSomething以返回类型。

public class StringWorker
{
    private string m_someString;
    private IAsyncResult m_result;

    private Func<string> DoSomethingDelegate;

    public StringWorker(string someString)
    {
        DoSomethingDelegate = DoSomething;
    }

    private string DoSomething()
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginDoSomething()
    {
        if (m_result != null) { throw new InvalidOperationException(); }
        m_result = DoSomethingDelegate.BeginInvoke(null, null);
        return m_result;
    }

    public string EndDoSomething()
    {
        return DoSomethingDelegate.EndInvoke(m_result);
    }
}

#5


Yes, there is.

就在这里。

Suggested approach

1) a counter and a wait handle

1)计数器和等待句柄

int ActiveCount = 1; // 1 (!) is important
EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual);

2) adding loop

2)添加循环

foreach (string someString in arrayStrings)
{
     Interlocked.Increment(ref ActiveCount);

     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     // Thread.Sleep(100); // you really need this sleep ?
}

PostActionCheck();
ewhAllDone.Wait();

3) DoSomething should look like

3)DoSomething应该是什么样的

{
    try
    {
        // some long executing code
    }
    finally
    {
        // ....
        PostActionCheck();
    }
} 

4) where PostActionCheck is

4)PostActionCheck在哪里

void PostActionCheck()
{
    if (Interlocked.Decrement(ref ActiveCount) == 0)
        ewhAllDone.Set();
}

Idea

ActiveCount is initialized with 1, and then get incremented n times.

ActiveCount初始化为1,然后增加n次。

PostActionCheck is called n + 1 times. The last one will trigger the event.

PostActionCheck被称为n + 1次。最后一个将触发事件。

The benefit of this solution is that is uses a single kernel object (which is an event), and 2 * n + 1 calls of lightweight API's. (Can there be less ?)

这个解决方案的好处是使用单个内核对象(这是一个事件),以及轻量级API的2 * n + 1次调用。 (可以少吗?)

P.S.

I wrote the code here, I might have misspelled some class names.

我在这里写了代码,我可能拼错了一些类名。

#6


.NET 4.0 has a new class for that, Barrier.

.NET 4.0有一个新类,Barrier。

Aside from that your method isn't that bad, and you could optimize a little by only Pulsing if RunningWorkers is 0 after the decrement. That would look like:

除此之外,您的方法并不是那么糟糕,如果在减量后RunningWorkers为0,您可以仅通过Pulsing进行优化。那看起来像是:

this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable

foreach (string someString in arrayStrings)
{
     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     //Thread.Sleep(100);
}

// Waiting execution for all queued threads
Monitor.Wait(this.workerLocker);

// Method DoSomething() definition
public void DoSomething(object data)
{
    // Do a slow process...
    // ...

    lock (this.workerLocker)
    {
        this.RunningWorkers--;
        if (this.RunningWorkers == 0)
           Monitor.Pulse(this.workerLocker);
    }
}

You could use an EventWaithandle or AutoResetEvent but they are all wrapped Win32 API's. Since the Monitor class is pure managed code I would prefer a Monitor for this situation.

您可以使用EventWaithandle或AutoResetEvent,但它们都包含在Win32 API中。由于Monitor类是纯托管代码,因此我更喜欢Monitor。

#7


I'm not sure there is really, I've recently done something similar to scan each IP of a subnet for accepting a particular port.

我不确定是否真的,我最近做了类似的事情来扫描子网的每个IP以接受特定的端口。

A couple of things I can suggest that may improve performance:

我可以建议的一些事情可能会提高性能:

  • Use the SetMaxThreads method of ThreadPool to tweak performance (i.e. balancing having lots of threads running at once, against the locking time on such a large number of threads).

    使用ThreadPool的SetMaxThreads方法来调整性能(即平衡有大量线程一次运行,相对于如此大量线程的锁定时间)。

  • Don't sleep when setting up all the work items, there is no real need (that I am immediately aware of. Do however sleep inside the DoSomething method, perhaps only for a millisecond, to allow other threads to jump in there if need be.

    在设置所有工作项时不要睡觉,没有真正的需要(我立即意识到。但是在DoSomething方法中睡觉,可能只有一毫秒,以允许其他线程在那里跳入,如果需要的话。

I'm sure you could implement a more customised method yourself, but I doubt it would be more efficient than using the ThreadPool.

我相信你自己可以实现更自定义的方法,但我怀疑它比使用ThreadPool更有效。

P.S I'm not 100% clear on the reason for using monitor, since you are locking anyway? Note that the question is asked only because I haven't used the Monitor class previously, not because I'm actually doubting it's use.

P.S我不是100%清楚使用显示器的原因,因为你还在锁定?请注意,问题只是因为我之前没有使用过Monitor类,而不是因为我实际上怀疑它是否正在使用。

#8


Use Spring Threading. It has Barrier implementations built in.

使用Spring Threading。它内置了Barrier实现。

http://www.springsource.org/extensions/se-threading-net