手动控制执行任务的顺序

时间:2021-01-21 02:20:21

I have a RabbitMQ queue that I read asyncrounusly in batches, but I must preserve order of these messages. I have a field named ServiceNumber which defines a unique number of the message, and this order I have to keep.

我有一个RabbitMQ队列,我批量读取asyncrounusly,但我必须保留这些消息的顺序。我有一个名为ServiceNumber的字段,它定义了一个唯一的消息号,我必须保留这个命令。

For example

例如

   SN1 SN2 SN1 SN1 SN1 SN2
   1   2   3   4   5   6 

In this case we can process messages 1 and 2 sumultaneously (they have different SNs), then we can process 3 and 6, then 4, then 5.

在这种情况下,我们可以同时处理消息1和2(它们具有不同的SN),然后我们可以处理3和6,然后是4,然后是5。

I tried to implement it via ContinueWith chain in a following manner:

我尝试通过ContinueWith链以下列方式实现它:

private readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

private async Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)
{
    Task taskToAwait;
    await _semaphore.WaitAsync();
    try
    {
        _currentTasks.TryGetValue(serviceNumber, out var task);
        if (task == null)
            task = Task.CompletedTask;

        taskToAwait = _currentTasks[serviceNumber] = task.ContinueWith(_ => taskFunc());
    }
    finally
    {
        _semaphore.Release();
    }

    await taskToAwait.ConfigureAwait(false);
}

void Main()
{
    Task.Run(async () => {
        var task1 = Task.Run(() =>
        {
            return WrapMessageInQueue("10", async () =>
            {
                await Task.Delay(5000);
                Console.WriteLine("first task finished");
            });
        });

        while (task1.Status == TaskStatus.WaitingForActivation) 
        {
            Console.WriteLine("waiting task to be picked by a scheduler. Status = {0}", task1.Status);
            await Task.Delay(100);
        }

        var task2 = Task.Run(() =>
        {
            return WrapMessageInQueue("10", async () =>
            {
                Console.WriteLine("second task finished");
            });
        });

        await Task.WhenAll(new[] {task1, task2});
    }).Wait();
}

The main idea here is that the first RUNNED task should be finished before all the rest start. So I implemented a dictionary, where I store a task, and each subsequent one gets added into ContinueWith chain. Thus, it gets executed strictly after the previos one gets executed. When the 3rd task arrives, it gets its place in the queue, and so on.

这里的主要想法是第一个RUNNED任务应该在所有其他任务开始之前完成。所以我实现了一个字典,在那里我存储了一个任务,每个后续的一个都被添加到了ContinueWith链中。因此,它会在普遍的执行后严格执行。当第3个任务到达时,它将在队列中占据一席之地,依此类推。

But for some reason it doesn't work and the output is

但由于某种原因,它不起作用,输出是

second task finished

第二项任务完成

first task finished

第一项任务完成

What's wrong with this code? Is there any better approach?

这段代码出了什么问题?有没有更好的方法?

3 个解决方案

#1


1  

The problem is that this code fragment

问题是这段代码片段

task.ContinueWith(_ => taskFunc());

doesn't do what you expect. It creates a task, which will just start the continuation, but not wait for it. As a result your both continuations are invoked immediately.

不符合你的期望。它创建了一个任务,它只会启动延续,但不会等待它。结果,您的两个延续都会立即被调用。

All in all you have too many unnecessary tasks there which are partially not properly awaited. I cleaned it up and implemented the continuation function you need to make it work.

总而言之,那里有太多不必要的任务,部分未经正常等待。我清理它并实现了使其工作所需的延续功能。

public static
class TaskExtensions
{
    public static async
    Task ContinueWith(this Task task, Func<Task> continuation)
    {
        await task;
        await continuation();
    }
}

class Program
{
    static readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();

    private static
    Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)
    {
        lock (_currentTasks)
        {
            if (!_currentTasks.TryGetValue(serviceNumber, out var task))
                task = Task.CompletedTask;

            return _currentTasks[serviceNumber] = task.ContinueWith(() => taskFunc());
        }
    }

    public static
    void Main(string[] args)
    {
        Task.Run(async () =>
        {
            var task1 = WrapMessageInQueue("10", async () =>
            {
                await Task.Delay(500);
                Console.WriteLine("first task finished");
            });

            var task2 = WrapMessageInQueue("10", async () =>
            {
                Console.WriteLine("second task finished");
            });

            await Task.WhenAll(new[] { task1, task2 });
        }).Wait();
    }
}

#2


2  

You are using Task.Run to add your test tasks into the queue and have a race condition here — there is no guarantee that task1 will be picked by the thread pool earlier then task2.

您正在使用Task.Run将测试任务添加到队列中并且在此处具有竞争条件 - 无法保证task1将在任务2之前被线程池选中。

Not sure if that's really the case anyway.

无论如何,不​​确定是否真的如此。

You may want to check TPL Dataflow library, I assume that it's pretty suitable for described scenario.

您可能想要检查TPL Dataflow库,我认为它非常适合所描述的场景。

Or even group by SN with use of Reactive extensions and then process.

甚至可以通过SN使用Reactive扩展进行分组,然后进行处理。

#3


0  

Interesting approach. But since the processing of the messages (by SN) has to be sequential anyway, why do you one task per message? It just makes things more complicated, because you need to control the execution order of the tasks.

有趣的方法。但是,由于消息的处理(通过SN)必须是顺序的,为什么每条消息一个任务?它只会使事情变得更复杂,因为您需要控制任务的执行顺序。

Why not have a collector task which sorts the incoming messages into queues (by SN) and starts one task per SN to process the queue?

为什么没有收集器任务将传入的消息排序到队列中(通过SN)并且每个SN启动一个任务来处理队列?

#1


1  

The problem is that this code fragment

问题是这段代码片段

task.ContinueWith(_ => taskFunc());

doesn't do what you expect. It creates a task, which will just start the continuation, but not wait for it. As a result your both continuations are invoked immediately.

不符合你的期望。它创建了一个任务,它只会启动延续,但不会等待它。结果,您的两个延续都会立即被调用。

All in all you have too many unnecessary tasks there which are partially not properly awaited. I cleaned it up and implemented the continuation function you need to make it work.

总而言之,那里有太多不必要的任务,部分未经正常等待。我清理它并实现了使其工作所需的延续功能。

public static
class TaskExtensions
{
    public static async
    Task ContinueWith(this Task task, Func<Task> continuation)
    {
        await task;
        await continuation();
    }
}

class Program
{
    static readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();

    private static
    Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc)
    {
        lock (_currentTasks)
        {
            if (!_currentTasks.TryGetValue(serviceNumber, out var task))
                task = Task.CompletedTask;

            return _currentTasks[serviceNumber] = task.ContinueWith(() => taskFunc());
        }
    }

    public static
    void Main(string[] args)
    {
        Task.Run(async () =>
        {
            var task1 = WrapMessageInQueue("10", async () =>
            {
                await Task.Delay(500);
                Console.WriteLine("first task finished");
            });

            var task2 = WrapMessageInQueue("10", async () =>
            {
                Console.WriteLine("second task finished");
            });

            await Task.WhenAll(new[] { task1, task2 });
        }).Wait();
    }
}

#2


2  

You are using Task.Run to add your test tasks into the queue and have a race condition here — there is no guarantee that task1 will be picked by the thread pool earlier then task2.

您正在使用Task.Run将测试任务添加到队列中并且在此处具有竞争条件 - 无法保证task1将在任务2之前被线程池选中。

Not sure if that's really the case anyway.

无论如何,不​​确定是否真的如此。

You may want to check TPL Dataflow library, I assume that it's pretty suitable for described scenario.

您可能想要检查TPL Dataflow库,我认为它非常适合所描述的场景。

Or even group by SN with use of Reactive extensions and then process.

甚至可以通过SN使用Reactive扩展进行分组,然后进行处理。

#3


0  

Interesting approach. But since the processing of the messages (by SN) has to be sequential anyway, why do you one task per message? It just makes things more complicated, because you need to control the execution order of the tasks.

有趣的方法。但是,由于消息的处理(通过SN)必须是顺序的,为什么每条消息一个任务?它只会使事情变得更复杂,因为您需要控制任务的执行顺序。

Why not have a collector task which sorts the incoming messages into queues (by SN) and starts one task per SN to process the queue?

为什么没有收集器任务将传入的消息排序到队列中(通过SN)并且每个SN启动一个任务来处理队列?

相关文章