How can I throttle async functions in .NET?

Time: 2021-05-09 01:11:37

I'm using async-await in .NET. How can I limit the number of concurrent asynchronous calls?

4 solutions

#1


6  

One relatively simple way is to (ab)use TPL Dataflow. Something like:


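// Requires: System, System.Collections.Concurrent, System.Collections.Generic,
// System.Threading.Tasks, System.Threading.Tasks.Dataflow
public IEnumerable<TOutput> AsyncThrottle<TInput, TOutput>(
    IEnumerable<TInput> inputs, Func<TInput, Task<TOutput>> asyncFunction,
    int maxDegreeOfParallelism)
{
    var outputs = new ConcurrentQueue<TOutput>();

    // The block runs at most maxDegreeOfParallelism invocations at once.
    var block = new ActionBlock<TInput>(
        async x => outputs.Enqueue(await asyncFunction(x)),
        new ExecutionDataflowBlockOptions
        { MaxDegreeOfParallelism = maxDegreeOfParallelism });

    foreach (var input in inputs)
        block.Post(input);

    // Signal that no more input is coming, then block until everything has finished.
    block.Complete();
    block.Completion.Wait();

    return outputs.ToArray();
}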
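
The method above blocks the calling thread in Completion.Wait(). If the caller is itself async, a non-blocking variant may be preferable. The following is only a sketch under assumptions: the class name DataflowThrottle and method name AsyncThrottleAsync are made up for illustration, and the System.Threading.Tasks.Dataflow package is assumed to be referenced. Results arrive in completion order, not input order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowThrottle
{
    // Hypothetical helper: runs asyncFunction over inputs with at most
    // maxDegreeOfParallelism invocations in flight, without blocking the caller.
    public static async Task<TOutput[]> AsyncThrottleAsync<TInput, TOutput>(
        IEnumerable<TInput> inputs,
        Func<TInput, Task<TOutput>> asyncFunction,
        int maxDegreeOfParallelism)
    {
        var outputs = new ConcurrentQueue<TOutput>();

        var block = new ActionBlock<TInput>(
            async x => outputs.Enqueue(await asyncFunction(x)),
            new ExecutionDataflowBlockOptions
            { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        // SendAsync waits asynchronously if the block cannot accept input yet.
        foreach (var input in inputs)
            await block.SendAsync(input);

        // No more input; await completion instead of calling Wait().
        block.Complete();
        await block.Completion;

        return outputs.ToArray();
    }
}
```

A caller would write something like `var pages = await DataflowThrottle.AsyncThrottleAsync(urls, DownloadAsync, 4);`, where urls and DownloadAsync stand in for the caller's own data and method.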

#2


0  

Depending on the code, the simplest approach might be to use Parallel.For or Parallel.ForEach and specify the maximum degree of parallelism in the parallel options.
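
As a rough illustration of that suggestion, the sketch below caps Parallel.ForEach with ParallelOptions.MaxDegreeOfParallelism. The class name ParallelThrottle and method name Process are invented for the example. Note that this fits synchronous work: an async lambda passed to Parallel.ForEach becomes async void and returns at its first await, so the limit would not cover the asynchronous part.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelThrottle
{
    // Hypothetical helper: applies a synchronous function to every input,
    // running at most maxDegreeOfParallelism calls at the same time.
    public static TOutput[] Process<TInput, TOutput>(
        IEnumerable<TInput> inputs,
        Func<TInput, TOutput> work,
        int maxDegreeOfParallelism)
    {
        var outputs = new ConcurrentBag<TOutput>();
        var options = new ParallelOptions
        { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        // Blocks until every item has been processed.
        Parallel.ForEach(inputs, options, input => outputs.Add(work(input)));

        // ConcurrentBag does not preserve input order.
        return outputs.ToArray();
    }
}
```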

#3


0  

Note: I'm leaving this here for posterity. Don't do it this way: too many tasks end up waiting on WhenAny simultaneously, and the stack gets deep.

Based on this code by Stephen Toub:


const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
    nextIndex++;
}

while(imageTasks.Count > 0)
{
    try
    {
        Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
        imageTasks.Remove(imageTask);

        Bitmap image = await imageTask;
        panel.AddImage(image);
    }
    catch(Exception exc) { Log(exc); }

    if (nextIndex < urls.Length)
    {
        imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
        nextIndex++;
    }
}

I wrote this:


Private ThrottleGroups As New Dictionary(Of Object, List(Of Task))
Public Async Function ThrottleAsync(Of TResult)(ByVal f As Func(Of Task(Of TResult)), GroupId As Object, MaxCount As Integer) As Task(Of TResult)
    If Not ThrottleGroups.ContainsKey(GroupId) Then
        ThrottleGroups.Add(GroupId, New List(Of Task))
    End If
    If ThrottleGroups(GroupId).Count < MaxCount Then
        Dim NewTask As Task(Of TResult) = f()
        ThrottleGroups(GroupId).Add(NewTask)
        Return Await NewTask
    Else
        Dim FinishedTask As Task = Await Task.WhenAny(ThrottleGroups(GroupId))
        ThrottleGroups(GroupId).Remove(FinishedTask)
        Return Await ThrottleAsync(f, GroupId, MaxCount)
    End If
End Function

To use, just replace:


ExampleAsync(param1, param2)

with:


Dim f As Func(Of Task(Of Integer))
f = Function()
        Return ExampleAsync(param1, param2)
    End Function
Const CONCURRENT_TASKS As Integer = 4
Return ThrottleAsync(f, "ExampleAsync", CONCURRENT_TASKS)

Notice that we have to wrap the call in a function f, because calling ExampleAsync directly would start the Task immediately. The second parameter to ThrottleAsync is any object that identifies the "group"; I used a string. All asynchronous tasks in the same "group" are limited to CONCURRENT_TASKS running at once, in this case 4.
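
The difference between calling the async method and wrapping it in a delegate can be seen in a small C# sketch. Everything here is illustrative: DeferredStartDemo, Log, and this stand-in ExampleAsync are made-up names, not part of the answer's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DeferredStartDemo
{
    public static readonly List<string> Log = new List<string>();

    // Hypothetical stand-in for ExampleAsync: records the moment it starts.
    public static async Task<int> ExampleAsync(int value)
    {
        Log.Add("started " + value);
        await Task.Yield();
        return value;
    }

    public static async Task<int> RunDemoAsync()
    {
        // Calling the method starts the work right away.
        Task<int> eager = ExampleAsync(1);

        // Wrapping the call in a delegate starts nothing yet.
        Func<Task<int>> deferred = () => ExampleAsync(2);
        Log.Add("delegate created");

        // The wrapped call only starts when the delegate is invoked,
        // which is what lets a throttler decide when to run it.
        int second = await deferred();
        int first = await eager;
        return first + second;
    }
}
```

After RunDemoAsync completes, Log holds "started 1", "delegate created", "started 2", in that order.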

Here's sample code that shows how only four threads run at a time. "all ready!" displays immediately because the subroutine is asynchronous. Also, even if the threads start or end out of order, the "Output" lines will still be in the same order as the input.

Dim results As New List(Of Task(Of Integer))
For i As Integer = 0 To 20
    Dim j As Integer = i ' copy the loop variable so each lambda captures its own value
    Dim f As Func(Of Task(Of Integer))
    f = Function() As Task(Of Integer)
            Return Task.Run(Function() As Integer
                                Debug.WriteLine(DateTime.Now & " Starting " & j)
                                System.Threading.Thread.Sleep(5000)
                                Debug.WriteLine(DateTime.Now & " Ending " & j)
                                Return j
                            End Function)
        End Function
    Const CONCURRENT_UPLOADS As Integer = 4
    results.Add(ThrottleAsync(f, "PutObjectAsync", CONCURRENT_UPLOADS))
Next
Debug.WriteLine("all ready!")
For Each x As Task(Of Integer) In results
    Debug.WriteLine(DateTime.Now & " Output: " & Await x)
Next

#4


0  

I like this technique better. I'm using TaskCompletionSource to create output tasks for the incoming tasks. This is necessary because I want to return a Task before the work has even started. The class below pairs each input Func(Of Task(Of Object)) with a TaskCompletionSource, puts the pair into a queue, and immediately returns the TaskCompletionSource's Task.

Elements are dequeued into a list of running tasks, and a continuation on each running task sets its TaskCompletionSource. A loop around WhenAny moves elements from the queue to the running list whenever room frees up. There's also a check to make sure that no more than one WhenAny loop is active at a time, though that check might have concurrency issues.
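
To make the idea of handing back a Task before the work starts more concrete, here is a compact C# sketch. It is not the author's class: the name QueueThrottler is invented, and it swaps the WhenAny loop for a running counter guarded by a lock, with a continuation that starts the next queued item. It also forwards exceptions to the TaskCompletionSource, which is an addition of this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class QueueThrottler
{
    private readonly object _gate = new object();
    private readonly Queue<Func<Task>> _waiting = new Queue<Func<Task>>();
    private readonly int _maxCount;
    private int _running;

    public QueueThrottler(int maxCount) { _maxCount = maxCount; }

    // Returns a Task immediately; f is only invoked once a slot is free.
    public Task<T> Run<T>(Func<Task<T>> f)
    {
        var tcs = new TaskCompletionSource<T>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Starts the real work and copies its outcome onto the returned Task.
        Func<Task> start = async () =>
        {
            try { tcs.SetResult(await f()); }
            catch (Exception ex) { tcs.SetException(ex); }
        };

        lock (_gate) { _waiting.Enqueue(start); }
        Pump();
        return tcs.Task;
    }

    // Starts queued items while fewer than _maxCount are running.
    private void Pump()
    {
        while (true)
        {
            Func<Task> next;
            lock (_gate)
            {
                if (_running >= _maxCount || _waiting.Count == 0) return;
                next = _waiting.Dequeue();
                _running++;
            }

            // When this item finishes, free its slot and look for more work.
            next().ContinueWith(_ =>
            {
                lock (_gate) { _running--; }
                Pump();
            }, TaskScheduler.Default);
        }
    }
}
```

Usage would look like `var throttler = new QueueThrottler(4); var t = throttler.Run(() => MyFunctionAsync());`, with MyFunctionAsync standing in for the caller's own method returning a Task of some result type.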

To use, just replace synchronous functions like this:


Task.Run(AddressOf MySyncFunction) 'possibly many of these

with this:


Dim t1 As New Throttler(4)
t1.Run(AddressOf MySyncFunction) 'many of these, but only 4 will run at a time.

For functions that already return a Task, it's important to wrap the call in a function that returns that Task, so that the throttler, not the caller, decides when to start it. Replace:

NewTask = MyFunctionAsync()

with:


NewTask = t1.Run(Function() MyFunctionAsync())

The class below also implements many different signatures for Throttler.Run(), depending on whether the function is synchronous or asynchronous, whether it takes an input, and whether it returns an output. Converting a Task to a Task(Of Object) is especially tricky!

Class Throttler
    Property MaxCount As Integer

    Sub New(Optional MaxCount As Integer = 1)
        Me.MaxCount = MaxCount
    End Sub

    Private Running As New List(Of Task)
    Private Waiting As New Concurrent.ConcurrentQueue(Of System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)))
    Private AlreadyWaiting As Boolean

    Async Sub MakeWaiter()
        If AlreadyWaiting Then Exit Sub
        AlreadyWaiting = True
        Do While Waiting.Count > 0
            Dim CurrentWait As System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)) = Nothing
            Do While Running.Count < MaxCount AndAlso Waiting.TryDequeue(CurrentWait)
                Dim NewFunc As Func(Of Task(Of Object)) = CurrentWait.Item1
                Dim NewTask As Task(Of Object) = NewFunc()
                Dim CurrentTcs As TaskCompletionSource(Of Object) = CurrentWait.Item2
                NewTask.ContinueWith(Sub(t2 As Task(Of Object))
                                         CurrentTcs.SetResult(t2.Result)
                                     End Sub)
                Running.Add(NewTask)
            Loop
            If Waiting.Count > 0 Then
                Dim Waiter As Task(Of Task)
                Waiter = Task.WhenAny(Running)
                Dim FinishedTask As Task = Await Waiter
                Await FinishedTask
                Running.Remove(FinishedTask)
            End If
        Loop
        AlreadyWaiting = False
    End Sub

    Function Run(f As Func(Of Task(Of Object))) As Task(Of Object)
        Dim NewTcs As New TaskCompletionSource(Of Object)
        Waiting.Enqueue(New System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object))(f, NewTcs))
        MakeWaiter()
        Return NewTcs.Task
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Task), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return f(input)
               End Function
        Return Me.Run(NewF)
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Task(Of Object)), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f(input)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(f As Func(Of Task)) As Task
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return f().ContinueWith(Function(t As task) As Object
                                               Return Nothing
                                           End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(Of TInput)(f As Func(Of TInput, Object), input As TInput) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function() As Object
                                       Return f(input)
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(Of TInput)(f As Action(Of TInput), input As TInput) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f(input)
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function

    Function Run(f As Func(Of Object)) As Task(Of Object)
        Dim NewF As Func(Of Task(Of Object))
        NewF = Function() As Task(Of Object)
                   Return Task.Run(Function()
                                       Return f()
                                   End Function)
               End Function
        Return CType(Me.Run(NewF), Task(Of Object))
    End Function

    Function Run(f As Action) As Task
        Dim NewF As Func(Of Task)
        NewF = Function() As Task
                   Return Task.Run(Sub()
                                       f()
                                   End Sub)
               End Function
        Return Me.Run(NewF)
    End Function
End Class
