如何防止任务上的同步延续?

时间:2022-04-19 02:23:12

I have some library (socket networking) code that provides a Task-based API for pending responses to requests, based on TaskCompletionSource<T>. However, there's an annoyance in the TPL in that it seems to be impossible to prevent synchronous continuations. What I would like to be able to do is either:

我有一些库(套接字网络)代码,基于TaskCompletionSource ,为请求的挂起响应提供了基于任务的API。但是,TPL中有一个麻烦,似乎不可能阻止同步延续。我想做的是:

  • tell a TaskCompletionSource<T> that is should not allow callers to attach with TaskContinuationOptions.ExecuteSynchronously, or
  • 告诉一个TaskCompletionSource 是不允许调用者附加TaskContinuationOptions的。ExecuteSynchronously或
  • set the result (SetResult / TrySetResult) in a way that specifies that TaskContinuationOptions.ExecuteSynchronously should be ignored, using the pool instead
  • 以指定TaskContinuationOptions的方式设置结果(SetResult / TrySetResult)。应该忽略可执行性,而应该使用池

Specifically, the issue I have is that the incoming data is being processed by a dedicated reader, and if a caller can attach with TaskContinuationOptions.ExecuteSynchronously they can stall the reader (which affects more than just them). Previously, I have worked around this by some hackery that detects whether any continuations are present, and if they are it pushes the completion onto the ThreadPool, however this has significant impact if the caller has saturated their work queue, as the completion will not get processed in a timely fashion. If they are using Task.Wait() (or similar), they will then essentially deadlock themselves. Likewise, this is why the reader is on a dedicated thread rather than using workers.

具体地说,我遇到的问题是,传入的数据正在由一个专用阅读器处理,如果调用者可以附加TaskContinuationOptions的话。可执行地,他们可以使读者停滞不前(这不仅仅影响他们)。以前,我一直在通过一些牛车,检测是否延续存在,如果他们将完成到ThreadPool,然而这如果调用者有重大影响饱和工作队列,当完成不及时处理。如果他们正在使用Task.Wait(或类似的),那么他们实际上会自己死锁。同样,这也是为什么读者是在一个专用线程而不是使用工人。

So; before I try and nag the TPL team: am I missing an option?

所以;在我试图对TPL团队唠叨之前:我是否错过了一个选项?

Key points:

重点:

  • I don't want external callers to be able to hijack my thread
  • 我不希望外部调用者能够劫持我的线程
  • I can't use the ThreadPool as an implementation, as it needs to work when the pool is saturated
  • 我不能使用ThreadPool作为实现,因为当池饱和时它需要工作

The example below produces output (ordering may vary based on timing):

下面的示例产生输出(按时间顺序排列):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

The problem is the fact that a random caller managed to get a continuation on "Main thread". In the real code, this would be interrupting the primary reader; bad things!

问题在于,随机调用者设法在“主线程”上获得延续。在真正的代码中,这将打断主要读者;不好的事情!

Code:

代码:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

6 个解决方案

#1


46  

New in .NET 4.6:

新在。net 4.6:

.NET 4.6 contains a new TaskCreationOptions: RunContinuationsAsynchronously.

. net 4.6包含一个新的TaskCreationOptions: runcontinuations异步。


Since you're willing to use Reflection to access private fields...

既然您愿意使用反射来访问私有字段……

You can mark the TCS's Task with the TASK_STATE_THREAD_WAS_ABORTED flag, which would cause all continuations not to be inlined.

您可以使用TASK_STATE_THREAD_WAS_ABORTED标志来标记TCS的任务,这将导致所有延续没有内联。

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Edit:

编辑:

Instead of using Reflection emit, I suggest you use expressions. This is much more readable and has the advantage of being PCL-compatible:

我建议您使用表达式,而不是使用反射发射。这更易于阅读,并且具有与pcl兼容的优点:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Without using Reflection:

不使用反射:

If anyone's interested, I've figured out a way to do this without Reflection, but it is a bit "dirty" as well, and of course carries a non-negligible perf penalty:

如果有人感兴趣的话,我已经想出了一种不用思考就能做到的方法,但它也有点“肮脏”,当然还有不可忽视的惩罚:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

#2


10  

I don't think there's anything in TPL which would provides explicit API control over TaskCompletionSource.SetResult continuations. I decided to keep my initial answer for controlling this behavior for async/await scenarios.

我不认为TPL中有任何东西可以对TaskCompletionSource提供显式的API控制。SetResult延续。我决定保留对异步/等待场景控制这种行为的最初答案。

Here is another solution which imposes asynchronous upon ContinueWith, if the tcs.SetResult-triggered continuation takes place on the same thread the SetResult was called on:

如果是tcs,这是另一个将异步强加给ContinueWith的解决方案。SetResult触发的延续发生在同一个线程上,SetResult被调用:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Updated to address the comment:

更新后的评论:

I don't control the caller - I can't get them to use a specific continue-with variant: if I could, the problem would not exist in the first place

我不能控制调用者——我不能让他们使用特定的延续——与变量:如果我可以,问题将首先不存在

I wasn't aware you don't control the caller. Nevertheless, if you don't control it, you're probably not passing the TaskCompletionSource object directly to the caller, either. Logically, you'd be passing the token part of it, i.e. tcs.Task. In which case, the solution might be even easier, by adding another extension method to the above:

我不知道你不能控制打电话的人。然而,如果您不控制它,您可能也不会将TaskCompletionSource对象直接传递给调用者。逻辑上,您将传递它的标记部分,即tcs.Task。在这种情况下,通过在上面添加另一种扩展方法,解决方案可能更容易:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Use:

使用:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

This actually works for both await and ContinueWith (fiddle) and is free of reflection hacks.

这实际上适用于wait和ContinueWith (fiddle),并且不需要反射技巧。

#3


3  

What about instead of doing

与其去做,不如去做!

var task = source.Task;

you do this instead

你这样做不是

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Thus you are always adding one continuation which will be executed asynchronously and then it doesn't matter if the subscribers want a continuation in the same context. It's sort of currying the task, isn't it?

因此,您总是添加一个将异步执行的延续,然后无论订阅者是否希望在相同的上下文中有一个延续。这有点像是在执行任务,不是吗?

#4


3  

if you can and are ready to use reflection, this should do it;

如果您能够并且已经准备好使用反射,那么应该这样做;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}

#5


3  

Updated, I posted a separate answer to deal with ContinueWith as opposed to await (because ContinueWith doesn't care about the current synchronization context).

更新后,我发布了一个单独的答案来处理ContinueWith,而不是wait(因为ContinueWith并不关心当前的同步上下文)。

You could use a dumb synchronization context to impose asynchrony upon continuation triggered by calling SetResult/SetCancelled/SetException on TaskCompletionSource. I believe the current synchronization context (at the point of await tcs.Task) is the criteria TPL uses to decide whether to make such continuation synchronous or asynchronous.

您可以使用哑同步上下文在TaskCompletionSource上调用SetResult/ setcancel /SetException触发的延续时施加异步。我相信当前的同步上下文(在wait tcs.Task的点上)是TPL用来决定是否使这种延续同步或异步的标准。

The following works for me:

以下是我的作品:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync is implemented like this:

SetResultAsync是这样实现的:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext is very cheap in terms of the overhead it adds. In fact, a very similar approach is taken by the implementation of WPF Dispatcher.BeginInvoke.

SynchronizationContext。SetSynchronizationContext在它添加的开销方面非常便宜。实际上,WPF Dispatcher.BeginInvoke方法与此类似。

TPL compares the target synchronization context at the point of await to that of the point of tcs.SetResult. If the synchronization context is the same (or there is no synchronization context at both places), the continuation is called directly, synchronously. Otherwise, it's queued using SynchronizationContext.Post on the target synchronization context, i.e., the normal await behavior. What this approach does is always impose the SynchronizationContext.Post behavior (or a pool thread continuation if there's no target synchronization context).

TPL将目标同步上下文与tcs.SetResult的点进行比较。如果同步上下文是相同的(或者两个地方都没有同步上下文),则会直接、同步地调用延续。否则,它将使用SynchronizationContext进行排队。在目标同步上下文中,即,正常的等待行为。这种方法的作用是始终强制同步上下文。Post行为(如果没有目标同步上下文,则使用池线程延续)。

Updated, this won't work for task.ContinueWith, because ContinueWith doesn't care about the current synchronization context. It however works for await task (fiddle). It also does work for await task.ConfigureAwait(false).

更新后,这对任务无效。继续,因为ContinueWith不关心当前的同步上下文。但它适用于等待任务(fiddle)。它也适用于wait . configure - wait(false)。

OTOH, this approach works for ContinueWith.

OTOH,这种方法适用于ContinueWith。

#6


3  

The simulate abort approach looked really good, but led to the TPL hijacking threads in some scenarios.

模拟中止方法看起来非常好,但是在某些场景中导致了TPL劫持线程。

I then had an implementation that was similar to checking the continuation object, but just checking for any continuation since there are actually too many scenarios for the given code to work well, but that meant that even things like Task.Wait resulted in a thread-pool lookup.

然后我有了一个类似于检查continuation对象的实现,但只是检查任何延续,因为实际上有太多的场景可以让给定的代码正常工作,但这意味着即使是像Task这样的事情。等待导致线程池查找。

Ultimately, after inspecting lots and lots of IL, the only safe and useful scenario is the SetOnInvokeMres scenario (manual-reset-event-slim continuation). There are lots of other scenarios:

最后,在检查了大量IL之后,唯一安全且有用的场景是SetOnInvokeMres场景(manual-reset-event-slim continuation)。还有很多其他的情况:

  • some aren't safe, and lead to thread hijacking
  • 有些是不安全的,会导致线程劫持
  • the rest aren't useful, as they ultimately lead to the thread-pool
  • 其余的并不有用,因为它们最终会导致线程池

So in the end, I opted to check for a non-null continuation-object; if it is null, fine (no continuations); if it is non-null, special-case check for SetOnInvokeMres - if it is that: fine (safe to invoke); otherwise, let the thread-pool perform the TrySetComplete, without telling the task to do anything special like spoofing abort. Task.Wait uses the SetOnInvokeMres approach, which is the specific scenario we want to try really hard not to deadlock.

最后,我选择检查一个非空连续对象;如果它是空的,很好(没有延续);如果它是非null,则对SetOnInvokeMres进行特殊情况检查——如果它是:很好(可以安全地调用);否则,让线程池执行TrySetComplete,而不告诉任务做任何特殊的事情,比如欺骗中止。的任务。Wait使用SetOnInvokeMres方法,这是我们希望尽量避免死锁的特定场景。

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

#1


46  

New in .NET 4.6:

新在。net 4.6:

.NET 4.6 contains a new TaskCreationOptions: RunContinuationsAsynchronously.

. net 4.6包含一个新的TaskCreationOptions: runcontinuations异步。


Since you're willing to use Reflection to access private fields...

既然您愿意使用反射来访问私有字段……

You can mark the TCS's Task with the TASK_STATE_THREAD_WAS_ABORTED flag, which would cause all continuations not to be inlined.

您可以使用TASK_STATE_THREAD_WAS_ABORTED标志来标记TCS的任务,这将导致所有延续没有内联。

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Edit:

编辑:

Instead of using Reflection emit, I suggest you use expressions. This is much more readable and has the advantage of being PCL-compatible:

我建议您使用表达式,而不是使用反射发射。这更易于阅读,并且具有与pcl兼容的优点:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Without using Reflection:

不使用反射:

If anyone's interested, I've figured out a way to do this without Reflection, but it is a bit "dirty" as well, and of course carries a non-negligible perf penalty:

如果有人感兴趣的话,我已经想出了一种不用思考就能做到的方法,但它也有点“肮脏”,当然还有不可忽视的惩罚:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

#2


10  

I don't think there's anything in TPL which would provides explicit API control over TaskCompletionSource.SetResult continuations. I decided to keep my initial answer for controlling this behavior for async/await scenarios.

我不认为TPL中有任何东西可以对TaskCompletionSource提供显式的API控制。SetResult延续。我决定保留对异步/等待场景控制这种行为的最初答案。

Here is another solution which imposes asynchronous upon ContinueWith, if the tcs.SetResult-triggered continuation takes place on the same thread the SetResult was called on:

如果是tcs,这是另一个将异步强加给ContinueWith的解决方案。SetResult触发的延续发生在同一个线程上,SetResult被调用:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Updated to address the comment:

更新后的评论:

I don't control the caller - I can't get them to use a specific continue-with variant: if I could, the problem would not exist in the first place

我不能控制调用者——我不能让他们使用特定的延续——与变量:如果我可以,问题将首先不存在

I wasn't aware you don't control the caller. Nevertheless, if you don't control it, you're probably not passing the TaskCompletionSource object directly to the caller, either. Logically, you'd be passing the token part of it, i.e. tcs.Task. In which case, the solution might be even easier, by adding another extension method to the above:

我不知道你不能控制打电话的人。然而,如果您不控制它,您可能也不会将TaskCompletionSource对象直接传递给调用者。逻辑上,您将传递它的标记部分,即tcs.Task。在这种情况下,通过在上面添加另一种扩展方法,解决方案可能更容易:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Use:

使用:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

This actually works for both await and ContinueWith (fiddle) and is free of reflection hacks.

这实际上适用于wait和ContinueWith (fiddle),并且不需要反射技巧。

#3


3  

What about instead of doing

与其去做,不如去做!

var task = source.Task;

you do this instead

你这样做不是

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Thus you are always adding one continuation which will be executed asynchronously and then it doesn't matter if the subscribers want a continuation in the same context. It's sort of currying the task, isn't it?

因此,您总是添加一个将异步执行的延续,然后无论订阅者是否希望在相同的上下文中有一个延续。这有点像是在执行任务,不是吗?

#4


3  

if you can and are ready to use reflection, this should do it;

如果您能够并且已经准备好使用反射,那么应该这样做;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}

#5


3  

Updated, I posted a separate answer to deal with ContinueWith as opposed to await (because ContinueWith doesn't care about the current synchronization context).

更新后,我发布了一个单独的答案来处理ContinueWith,而不是wait(因为ContinueWith并不关心当前的同步上下文)。

You could use a dumb synchronization context to impose asynchrony upon continuation triggered by calling SetResult/SetCancelled/SetException on TaskCompletionSource. I believe the current synchronization context (at the point of await tcs.Task) is the criteria TPL uses to decide whether to make such continuation synchronous or asynchronous.

您可以使用哑同步上下文在TaskCompletionSource上调用SetResult/ setcancel /SetException触发的延续时施加异步。我相信当前的同步上下文(在wait tcs.Task的点上)是TPL用来决定是否使这种延续同步或异步的标准。

The following works for me:

以下是我的作品:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync is implemented like this:

SetResultAsync是这样实现的:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext is very cheap in terms of the overhead it adds. In fact, a very similar approach is taken by the implementation of WPF Dispatcher.BeginInvoke.

SynchronizationContext。SetSynchronizationContext在它添加的开销方面非常便宜。实际上,WPF Dispatcher.BeginInvoke方法与此类似。

TPL compares the target synchronization context at the point of await to that of the point of tcs.SetResult. If the synchronization context is the same (or there is no synchronization context at both places), the continuation is called directly, synchronously. Otherwise, it's queued using SynchronizationContext.Post on the target synchronization context, i.e., the normal await behavior. What this approach does is always impose the SynchronizationContext.Post behavior (or a pool thread continuation if there's no target synchronization context).

TPL将目标同步上下文与tcs.SetResult的点进行比较。如果同步上下文是相同的(或者两个地方都没有同步上下文),则会直接、同步地调用延续。否则,它将使用SynchronizationContext进行排队。在目标同步上下文中,即,正常的等待行为。这种方法的作用是始终强制同步上下文。Post行为(如果没有目标同步上下文,则使用池线程延续)。

Updated, this won't work for task.ContinueWith, because ContinueWith doesn't care about the current synchronization context. It however works for await task (fiddle). It also does work for await task.ConfigureAwait(false).

更新后,这对任务无效。继续,因为ContinueWith不关心当前的同步上下文。但它适用于等待任务(fiddle)。它也适用于wait . configure - wait(false)。

OTOH, this approach works for ContinueWith.

OTOH,这种方法适用于ContinueWith。

#6


3  

The simulate abort approach looked really good, but led to the TPL hijacking threads in some scenarios.

模拟中止方法看起来非常好,但是在某些场景中导致了TPL劫持线程。

I then had an implementation that was similar to checking the continuation object, but just checking for any continuation since there are actually too many scenarios for the given code to work well, but that meant that even things like Task.Wait resulted in a thread-pool lookup.

然后我有了一个类似于检查continuation对象的实现,但只是检查任何延续,因为实际上有太多的场景可以让给定的代码正常工作,但这意味着即使是像Task这样的事情。等待导致线程池查找。

Ultimately, after inspecting lots and lots of IL, the only safe and useful scenario is the SetOnInvokeMres scenario (manual-reset-event-slim continuation). There are lots of other scenarios:

最后,在检查了大量IL之后,唯一安全且有用的场景是SetOnInvokeMres场景(manual-reset-event-slim continuation)。还有很多其他的情况:

  • some aren't safe, and lead to thread hijacking
  • 有些是不安全的,会导致线程劫持
  • the rest aren't useful, as they ultimately lead to the thread-pool
  • 其余的并不有用,因为它们最终会导致线程池

So in the end, I opted to check for a non-null continuation-object; if it is null, fine (no continuations); if it is non-null, special-case check for SetOnInvokeMres - if it is that: fine (safe to invoke); otherwise, let the thread-pool perform the TrySetComplete, without telling the task to do anything special like spoofing abort. Task.Wait uses the SetOnInvokeMres approach, which is the specific scenario we want to try really hard not to deadlock.

最后,我选择检查一个非空连续对象;如果它是空的,很好(没有延续);如果它是非null,则对SetOnInvokeMres进行特殊情况检查——如果它是:很好(可以安全地调用);否则,让线程池执行TrySetComplete,而不告诉任务做任何特殊的事情,比如欺骗中止。的任务。Wait使用SetOnInvokeMres方法,这是我们希望尽量避免死锁的特定场景。

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));