TPL Dataflow: how to throttle an entire pipeline?

Date: 2020-12-04 15:37:42

I want to limit the number of items in flight in a Dataflow pipeline. The limit depends on the production environment. The items are images and consume a large amount of memory, so I would like to post a new one only once the last block of the pipeline has finished with a previous one.

I tried using a SemaphoreSlim to throttle the producer, releasing it in the last block of the pipeline. It works, but if an exception is thrown during processing, the program waits forever and the exception is never caught.

Here is a sample that resembles our code. How can I do this?

static void Main(string[] args)
{
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);

    var downloadString = new TransformBlock<string, string>(uri =>
    {
        Console.WriteLine("Downloading '{0}'...", uri);
        return new WebClient().DownloadString(uri);
    });

    var createWordList = new TransformBlock<string, string[]>(text =>
    {
        Console.WriteLine("Creating word list...");

        char[] tokens = text.ToArray();
        for (int i = 0; i < tokens.Length; i++)
        {
            if (!char.IsLetter(tokens[i]))
                tokens[i] = ' ';
        }
        text = new string(tokens);

        return text.Split(new char[] { ' ' },
           StringSplitOptions.RemoveEmptyEntries);
    });

    var filterWordList = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Filtering word list...");
        throw new InvalidOperationException("ouch !"); // explicit for test
        return words.Where(word => word.Length > 3).OrderBy(word => word)
           .Distinct().ToArray();
    });

    var findPalindromes = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Finding palindromes...");

        var palindromes = new ConcurrentQueue<string>();

        Parallel.ForEach(words, word =>
        {
            string reverse = new string(word.Reverse().ToArray());

            if (Array.BinarySearch<string>(words, reverse) >= 0 &&
                word != reverse)
            {
                palindromes.Enqueue(word);
            }
        });

        return palindromes.ToArray();
    });

    var printPalindrome = new ActionBlock<string[]>(palindromes =>
    {
        try
        {
            foreach (string palindrome in palindromes)
            {
                Console.WriteLine("Found palindrome {0}/{1}",
                   palindrome, new string(palindrome.Reverse().ToArray()));
            }
        }
        finally
        {
            semaphore.Release();
        }
    });

    downloadString.LinkTo(createWordList);
    createWordList.LinkTo(filterWordList);
    filterWordList.LinkTo(findPalindromes);
    findPalindromes.LinkTo(printPalindrome);


    downloadString.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)createWordList).Fault(t.Exception);
        else createWordList.Complete();
    });
    createWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)filterWordList).Fault(t.Exception);
        else filterWordList.Complete();
    });
    filterWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)findPalindromes).Fault(t.Exception); // execution enters here when an exception is thrown
        else findPalindromes.Complete();
    });
    findPalindromes.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted)
            ((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not caught
        else printPalindrome.Complete();
    });

    try
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(i);

            downloadString.Post("http://www.google.com");
            semaphore.Wait(); // blocks here forever once an exception has been thrown
        }

        downloadString.Complete();

        printPalindrome.Completion.Wait();
    }
    catch (AggregateException agg)
    {
        Console.WriteLine("An error has occurred: " + agg);
    }
    Console.WriteLine("Done");
    Console.ReadKey();
}

2 answers

#1


2  

You should simply wait on the semaphore and the completion task together. That way, if the block ends prematurely (by exception or cancellation), the wait returns instead of hanging, and the exception is rethrown when you wait on the Completion task after the loop. Otherwise you wait on the semaphore until there is room to post more.

You can do that with Task.WhenAny and SemaphoreSlim.WaitAsync:

for (int i = 0; i < 10; i++)
{
    Console.WriteLine(i);
    downloadString.Post("http://www.google.com");

    if (printPalindrome.Completion.IsCompleted)
    {
        break;
    }

    Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait();
}

Note: using Task.Wait is only appropriate in this case as it's Main. Usually this should be an async method and you should await the task returned from Task.WhenAny.
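
For the async case mentioned in the note, a minimal sketch might look like the following. It is not the code from the question: the class name ThrottledProducer, the method RunAsync and its parameters (count, maxInFlight, failOn) are hypothetical, the pipeline is reduced to two blocks, and the producer acquires a slot before posting rather than after. It also assumes the System.Threading.Tasks.Dataflow library is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledProducer
{
    // Posts `count` items while keeping at most `maxInFlight` of them in the
    // pipeline. `failOn` is a hypothetical knob: the item with that value makes
    // the first block throw (pass -1 for no failure).
    public static async Task<int> RunAsync(int count, int maxInFlight, int failOn)
    {
        var semaphore = new SemaphoreSlim(maxInFlight, maxInFlight);

        var work = new TransformBlock<int, int>(i =>
        {
            if (i == failOn) throw new InvalidOperationException("ouch !");
            return i * 2;
        });

        // The last block frees one slot per item it has finished with.
        var last = new ActionBlock<int>(i =>
        {
            semaphore.Release();
        });

        // PropagateCompletion forwards both completion and faults downstream.
        work.LinkTo(last, new DataflowLinkOptions { PropagateCompletion = true });

        int posted = 0;
        for (int i = 0; i < count; i++)
        {
            // Wait for a free slot OR for the pipeline to end, whichever comes first.
            Task slot = semaphore.WaitAsync();
            Task first = await Task.WhenAny(slot, last.Completion);

            // Stop producing if the pipeline ended or the block refuses the item.
            if (first == last.Completion || !work.Post(i)) break;
            posted++;
        }

        work.Complete();
        await last.Completion; // rethrows here if any block faulted
        return posted;
    }
}
```

Calling await ThrottledProducer.RunAsync(10, 2, -1) should return 10, while passing a failOn value between 0 and 9 should make the call throw instead of hanging. Depending on how the fault was propagated, the exception may arrive wrapped in an AggregateException.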

#2


0  

This is how I handled throttling, allowing only 10 items in the source block at any one time. You could change this to 1. Make sure you also bound every other block in the pipeline; otherwise the source block may hold 1 item while the next block buffers many more.

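// BufferBlock's constructor takes DataflowBlockOptions; BoundedCapacity is what throttles.
// (SingleProducerConstrained is an ExecutionDataflowBlockOptions setting, so it is dropped here.)
var sourceBlock = new BufferBlock<string>(
    new DataflowBlockOptions { BoundedCapacity = 10 });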

Then the producer does this:

sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken);

If you're using async / await, just await the SendAsync call.
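
As a rough illustration of bounding every block and awaiting SendAsync, here is a small sketch. The class name BoundedPipeline, the method RunAsync, its parameters and the trivial block bodies are all hypothetical stand-ins, and it assumes the System.Threading.Tasks.Dataflow library is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedPipeline
{
    // Sends `count` strings through a pipeline in which every block holds at
    // most `capacity` items, and returns how many reached the last block.
    public static async Task<int> RunAsync(
        int count, int capacity, CancellationToken shutdownToken = default)
    {
        var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = capacity };
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var source = new BufferBlock<string>(
            new DataflowBlockOptions { BoundedCapacity = capacity });
        var transform = new TransformBlock<string, int>(s => s.Length, bounded);

        int processed = 0;
        // Default MaxDegreeOfParallelism is 1, so the counter is not raced.
        var sink = new ActionBlock<int>(n => { processed++; }, bounded);

        source.LinkTo(transform, link);
        transform.LinkTo(sink, link);

        for (int i = 0; i < count; i++)
        {
            // SendAsync waits asynchronously until the bounded block has room;
            // it yields false if the block declines the item.
            bool accepted = await source.SendAsync("value " + i, shutdownToken);
            if (!accepted) break;
        }

        source.Complete();
        await sink.Completion;
        return processed;
    }
}
```

Note that bounding alone does not report a downstream fault back to the producer: if a later block faults, the source can fill up and SendAsync may never complete. Watching the last block's Completion task alongside the send is one way to cover that case.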

