- I have a method that uses a connection (e.g. a method that downloads a page).
- I have to execute this method multiple times (e.g. download 1000 pages).
- Doing it the synchronous and sequential way takes a long time.
- I have limited resources (at most 8 threads and/or 50 simultaneous connections).
- I want to use all resources to accelerate it.
- I know that parallelization (PLINQ, Parallel Extensions, etc.) could solve the problem, but I have already tried it, and this approach fails because of the scarce resources.
- I don't want to reinvent the wheel by writing my own code that parallelizes this kind of task while managing the resources. Someone must have done it before and provided a library or tutorial for it.
Can anyone help?
Update: Things get much more complicated when you start mixing asynchronous calls with parallelization for maximum performance. Several downloaders implement this, such as the Firefox downloader: it runs 2 downloads simultaneously, and when one of them completes it starts the next file, and so on. It may seem very simple to implement, but when I implemented it I had, and still have, trouble making it generic (usable for both WebRequest and DbCommand) and dealing with problems (e.g. timeouts).
Bounty Hunters: The bounty will be granted to the first person who links a reliable and free ($$) .NET library that provides a simple C# way to parallelize async tasks such as HttpWebRequest.BeginGetResponse and SqlCommand.BeginExecuteNonQuery. The parallelization must not wait for N tasks to complete before starting the next N; it must start a new task as soon as one of the initial N finishes. The method must also provide timeout handling.
11 Solutions
#1
Can you give more information on why Parallel LINQ won't work?
In my view, your task is best suited to PLINQ. If you run on an 8-core machine, PLINQ will split the work into 8 tasks and queue all the remaining tasks for you.
Here is draft code:
PagesToDownload.AsParallel().ForAll(DownloadMethodWithLimitConnections);
I don't understand why PLINQ would use up your resources. Based on my tests, PLINQ performs even better than using the ThreadPool directly.
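If the concern is that PLINQ picks its own degree of parallelism, the cap can be stated explicitly. The following is a minimal sketch, not the answerer's code: `FakeDownload` and `DownloadAll` are hypothetical names, and the sleep only simulates a blocking download.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class PlinqThrottleSketch
{
    // Hypothetical stand-in for the real (blocking) download method.
    public static string FakeDownload(string url)
    {
        Thread.Sleep(50); // simulate blocking I/O
        return "content of " + url;
    }

    // Runs FakeDownload over all URLs, asking PLINQ to use at most
    // maxParallel concurrently executing tasks.
    public static ConcurrentDictionary<string, string> DownloadAll(
        IEnumerable<string> urls, int maxParallel)
    {
        var results = new ConcurrentDictionary<string, string>();
        urls.AsParallel()
            .WithDegreeOfParallelism(maxParallel)
            .ForAll(url => results[url] = FakeDownload(url));
        return results;
    }
}
```

Calling `PlinqThrottleSketch.DownloadAll(urls, 8)` would keep the thread count within the 8-thread budget from the question. It does not separately enforce the 50-connection limit, because each PLINQ task here blocks on one download at a time.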
#2
Look into a counting semaphore for the connections. http://en.wikipedia.org/wiki/Semaphore_(programming)
EDIT: To answer your comment, the .NET Framework has one already. http://msdn.microsoft.com/en-us/library/system.threading.semaphore.aspx
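As an illustration of the counting-semaphore idea, here is a hedged sketch that gates simulated async fetches. It assumes a newer runtime than the thread was written for and swaps in `SemaphoreSlim` (whose `WaitAsync` is available from .NET 4.5) instead of the `Semaphore` class linked above. `FakeFetchAsync`, `FetchAllAsync`, and `PeakInFlight` are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottleSketch
{
    private static readonly object Sync = new object();
    private static int _inFlight;

    // Highest number of simulated connections open at the same time.
    public static int PeakInFlight;

    // Hypothetical stand-in for an async, connection-bound operation.
    private static async Task<string> FakeFetchAsync(string url)
    {
        lock (Sync)
        {
            _inFlight++;
            if (_inFlight > PeakInFlight) PeakInFlight = _inFlight;
        }
        await Task.Delay(20); // simulate network latency without blocking a thread
        lock (Sync) { _inFlight--; }
        return "content of " + url;
    }

    // Fetches every URL while allowing at most maxConnections at once.
    // A waiting fetch starts as soon as any running one releases the gate.
    public static async Task<string[]> FetchAllAsync(
        IEnumerable<string> urls, int maxConnections)
    {
        using (var gate = new SemaphoreSlim(maxConnections, maxConnections))
        {
            List<Task<string>> tasks = urls.Select(async url =>
            {
                await gate.WaitAsync();
                try { return await FakeFetchAsync(url); }
                finally { gate.Release(); }
            }).ToList();

            return await Task.WhenAll(tasks);
        }
    }
}
```

The semaphore is released in a `finally` block so that a failed fetch still frees its slot for the next one.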
#3
See the CCR (Concurrency and Coordination Runtime). This is the 'right' way to do it, although you may find the library's learning curve a bit too much...
#4
You could use the .NET System.Threading.ThreadPool class. You can set the maximum number of threads to be active at any one time using ThreadPool.SetMaxThreads().
#5
Here's what I don't get: you say max 50 connections, but only 8 threads. Each connection by definition "occupies" / runs in a thread. I mean, you're not using DMA or any other sort of magic to take the load off the CPU, so each transfer needs an execution context. If you can launch 50 async requests at once, fine, great, do that. You should be able to launch them all from the same thread, since calling an async read function takes essentially no time at all. If, for example, you have 8 cores and want to make sure an entire core is dedicated to each transfer (that would probably be dumb, but it's your code, so...), you can only run 8 transfers at once.
My suggestion is to just launch 50 async requests inside a sync block, so that they all start before you allow any of them to complete (this simplifies the math). Then use a counting semaphore as suggested by Jeremy, or a synchronized Queue as suggested by mbeckish, to keep track of the work remaining. At the end of your async-complete callback, launch the next connection (if appropriate). That is, start 50 connections, then when one finishes, use the "completed" event handler to launch the next one, until all the work is done. This shouldn't need any kind of additional libraries or frameworks.
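The "start N, then launch the next one whenever one completes" scheme can be sketched as follows. This version uses async/await continuations in place of explicit completion callbacks, which is a substitution rather than what the answer literally describes. `FakeTransferAsync` and `RunAsync` are hypothetical names, and the delay stands in for a real transfer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class RollingWindowSketch
{
    // Hypothetical stand-in for one async transfer; returns a fake size.
    private static async Task<int> FakeTransferAsync(string url)
    {
        await Task.Delay(10);
        return url.Length;
    }

    // Opens `window` slots. Each slot, on completing a transfer, immediately
    // pulls the next pending URL, so work never waits for a whole batch.
    public static async Task<IDictionary<string, int>> RunAsync(
        IEnumerable<string> urls, int window)
    {
        var pending = new ConcurrentQueue<string>(urls);
        var results = new ConcurrentDictionary<string, int>();

        async Task SlotAsync()
        {
            while (pending.TryDequeue(out string url))
            {
                results[url] = await FakeTransferAsync(url);
            }
        }

        List<Task> slots = Enumerable.Range(0, window)
            .Select(_ => SlotAsync())
            .ToList();
        await Task.WhenAll(slots);
        return results;
    }
}
```

The thread-safe queue plays the role of the "work remaining" tracker, and the number of slots is the connection limit (50 in the question).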
#6
- Create a data structure to keep track of which pages have been fetched and which still need to be fetched, e.g. a queue.
- Using the Producer/Consumer Queue pattern, dispatch 8 consumer threads to do your fetches. That way, you know that you will never exceed your 8-thread limit.
See here for a good example.
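A producer/consumer arrangement with a fixed number of consumer threads might look like the sketch below. It assumes `BlockingCollection<T>` (available from .NET 4) as the queue. `FakeFetch` and `FetchAll` are hypothetical names, and the sleep simulates a blocking fetch.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ProducerConsumerSketch
{
    // Hypothetical stand-in for a blocking page fetch.
    private static string FakeFetch(string url)
    {
        Thread.Sleep(10);
        return url.ToUpperInvariant();
    }

    // Queues every URL and lets exactly consumerCount dedicated threads
    // drain the queue, so the thread limit can never be exceeded.
    public static List<string> FetchAll(IEnumerable<string> urls, int consumerCount)
    {
        var fetched = new ConcurrentBag<string>();
        using (var queue = new BlockingCollection<string>())
        {
            List<Thread> consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => new Thread(() =>
                {
                    // Blocks while the queue is empty; ends after CompleteAdding.
                    foreach (string url in queue.GetConsumingEnumerable())
                    {
                        fetched.Add(FakeFetch(url));
                    }
                }))
                .ToList();
            consumers.ForEach(t => t.Start());

            foreach (string url in urls) queue.Add(url); // producer side
            queue.CompleteAdding();

            consumers.ForEach(t => t.Join());
        }
        return fetched.ToList();
    }
}
```

Because the consumers are dedicated threads rather than pool threads, long blocking fetches do not tie up the shared thread pool.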
#7
I would strongly recommend staying away from the thread pool except for very short tasks. If you choose to use a semaphore, make sure that you only block in the code that is queuing the work items, not at the start of the work item code, or you will quickly deadlock the thread pool if your (semaphore max count * 2) is greater than the max pool threads.
In practice you can never really safely acquire a lock on a pool thread, nor can you safely make calls to most async APIs (or sync APIs like HttpWebRequest.GetResponse, as it also performs async operations under the covers on the thread pool).
#8
Jeffrey Richter has a Power Threading Library that might help you. It's chock-full of samples and is pretty powerful. I couldn't find a quick sample with connections, but there are plenty of examples that might work for you with regard to coordinating multiple asynchronous operations.
It can be downloaded from here and there are several articles and samples here. Also, this link has a detailed article from Jeffrey explaining concurrent asynchronous operations.
#9
The async WebRequest methods can appear sluggish because they block while performing the DNS lookup, then switch to asynchronous behaviour. Having followed this path myself, it seems inefficient to spin up eight threads to feed requests into an API that already spins up threads to do the bulk of the work. You might reconsider some of your approaches, bearing in mind this shortcoming of the async WebRequest API. Our solution eventually involved using the synchronous API, with each request on its own thread. I'd be interested in anyone commenting on the correctness of this approach.
#10
This is how you'd do it with the base class library in .NET 3.5. The call to SetMinThreads is optional; see what happens with and without it.
You should handle timeouts within your replacement for DoSomethingThatsSlow.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class ThrottledParallelRunnerTest
{
    public static void Main()
    {
        // Since the process is just starting up, we need to boost this.
        ThreadPool.SetMinThreads(10, 10);
        IEnumerable<string> args = from i in Enumerable.Range(1, 100)
                                   select "task #" + i;
        ThrottledParallelRun(DoSomethingThatsSlow, args, 8);
    }

    public static void DoSomethingThatsSlow(string urlOrWhatever)
    {
        Console.Out.WriteLine("{1}: began {0}", urlOrWhatever, DateTime.Now.Ticks);
        Thread.Sleep(500);
        Console.Out.WriteLine("{1}: ended {0}", urlOrWhatever, DateTime.Now.Ticks);
    }

    private static void ThrottledParallelRun<T>(Action<T> action, IEnumerable<T> args, int maxThreads)
    {
        // This thing looks after the throttling.
        Semaphore semaphore = new Semaphore(maxThreads, maxThreads);

        // Wrap the action in a try/finally that releases the semaphore.
        Action<T> releasingAction = a =>
        {
            try
            {
                action(a);
            }
            finally
            {
                semaphore.Release();
            }
        };

        // Store all the IAsyncResults; this prevents the method from returning before completion.
        List<IAsyncResult> results = new List<IAsyncResult>();
        foreach (T a in args)
        {
            // Block the queuing loop (not the work item) until a slot is free.
            semaphore.WaitOne();
            // Delegate BeginInvoke runs on the thread pool. It works on .NET Framework only;
            // on .NET Core and later it throws PlatformNotSupportedException.
            results.Add(releasingAction.BeginInvoke(a, null, null));
        }

        // Now let's make sure everything's returned. Maybe collate exceptions here?
        foreach (IAsyncResult result in results)
        {
            releasingAction.EndInvoke(result);
        }
    }
}
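For timeout handling inside the replacement for DoSomethingThatsSlow, the simplest route is usually the operation's own setting (for instance, HttpWebRequest.Timeout for synchronous requests or SqlCommand.CommandTimeout). Where no such setting exists, a generic wrapper is one option. The sketch below assumes the Task API from newer .NET versions, and `TryRunWithTimeout` is a hypothetical helper name.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Returns true if action(arg) finished within the timeout, false otherwise.
    // Caveat: on timeout the work is abandoned, not aborted. It keeps running
    // in the background, so the action should be safe to leave unfinished.
    // If the action throws before the timeout, Wait rethrows the failure
    // wrapped in an AggregateException.
    public static bool TryRunWithTimeout<T>(Action<T> action, T arg, TimeSpan timeout)
    {
        Task work = Task.Run(() => action(arg));
        return work.Wait(timeout);
    }
}
```

A caller could treat a `false` result as a failed download and log or retry it. Note that the abandoned work still occupies a pool thread until it ends by itself.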
#11
You should take a look at F# asynchronous workflows.
You really want your code to be asynchronous rather than parallel.
Asynchronous refers to programs that perform long-running operations that don't necessarily block the calling thread, for example accessing the network, calling web services, or performing any other I/O operation in general.
This is a very interesting article on this concept explained using C# iterators.
This is a great book about F# and asynchronous programming.
The learning curve is very steep (a lot of odd stuff: F# syntax, the Async<'a> type, monads, etc.), but it is a VERY powerful approach and can be used in real life with great C# interop.
The main idea here is continuations: while you're waiting for some I/O operations, let your threads do something else!
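The continuation idea can also be shown in C# with async/await, which arrived after this thread was written. In this hedged sketch `FakeIoAsync` and `RunAsync` are hypothetical names, and `Task.Delay` stands in for real network latency.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ContinuationSketch
{
    // Hypothetical I/O-bound call. While the delay is pending, no thread
    // is occupied by this operation.
    private static async Task<int> FakeIoAsync(int id)
    {
        await Task.Delay(200); // the calling thread is released here
        return id * 2;         // the continuation resumes here later
    }

    // Starts all operations up front, then awaits them together, so the
    // waits overlap instead of adding up.
    public static async Task<int[]> RunAsync(int count)
    {
        var operations = Enumerable.Range(0, count)
            .Select(i => FakeIoAsync(i))
            .ToList();
        return await Task.WhenAll(operations);
    }
}
```

Fifty such operations should finish in roughly the time of one, because the waiting is not tied to threads.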