(译)构建Async同步基元,Part 1 AsyncManualResetEvent

时间:2021-03-29 23:37:01

传送门:异步编程系列目录……

最近在学习.NET4.5关于“并行任务”的使用。“并行任务”有自己的同步机制,没有显示给出类似如旧版本的:事件等待句柄、信号量、lockReaderWriterLock……等同步基元对象,但我们可以沿溪这一编程习惯,那么这系列翻译就是给“并行任务”封装同步基元对象。翻译资源来源《(译)关于Async与Await的FAQ》

1.         构建Async同步基元,Part 1 AsyncManualResetEvent

2.         构建Async同步基元,Part 2 AsyncAutoResetEvent

3.         构建Async同步基元,Part 3 AsyncCountdownEvent

4.         构建Async同步基元,Part 4 AsyncBarrier

5.         构建Async同步基元,Part 5 AsyncSemaphore

6.         构建Async同步基元,Part 6 AsyncLock

7.         构建Async同步基元,Part 7 AsyncReaderWriterLock

 

源码:构建Async同步基元.rar

开始:Async同步基元,Part 1 AsyncManualResetEvent

基于任务异步模式(TAP)不仅仅是关于开始然后异步等待完成的异步操作,更概括的说,任务可以用来指代各种事件,使你能够等待任何事的条件发生。我们甚至可以使用任务来构建简单的同步基元,这些同步基元类似.NET原生提供的非任务版本,但是它们允许等待异步完成。

线程同步基元之一:事件等待句柄,它们存在于.NET FrameworkManualResetEvent AutoResetEvent.NET 4ManualResetEvent新增的优化版本ManualResetEventSlim。事件等待句柄就是一方等待另一方提供信号。比如ManualResetEvent,他会在调用Set()后保持信号直到显示调用Reset()

TaskCompletionSource<TResult>本身基于SpinWait结构与Task的IsCompleted属性实现类似事件等待句柄,仅仅是缺少Reset()方法。

    // 表示未绑定到委托的 System.Threading.Tasks.Task<TResult> 的制造者方,
// 并通过 Tasks.TaskCompletionSource<TResult>.Task属性提供对使用者方的访问。
public class TaskCompletionSource<TResult>
{
public TaskCompletionSource();

// 获取由此 Tasks.TaskCompletionSource<TResult> 创建的 Tasks.Task<TResult>。
public Task<TResult> Task { get; }

// 将基础 Tasks.Task<TResult> 转换为 Tasks.TaskStatus.Canceled状态。
public void SetCanceled();
public bool TrySetCanceled();

// 将基础 Tasks.Task<TResult> 转换为 Tasks.TaskStatus.Faulted状态。
public void SetException(Exception exception);
public void SetException(IEnumerable<Exception> exceptions);
public bool TrySetException(Exception exception);
public bool TrySetException(IEnumerable<Exception> exceptions);

// 尝试将基础 Tasks.Task<TResult> 转换为 TaskStatus.RanToCompletion状态。
public bool TrySetResult(TResult result);
……
}

TaskCompletionSource<TResult>开始于无信号,它指代的任务不能完成,因此,等待这个任务的“异步方法”也不能完成。(Try)Set*方法充当信号,将任务切换到完成状态,这样才能完成等待任务。因此我们可以很容易基于TaskCompletionSource<TResult>来构建一个AsyncManualResetEvent。它可以为我们提供缺失的Reset()能力。接下来我们构建此AsyncManualResetEvent

这是我们将构建的目标类型:

public class AsyncManualResetEvent
{
public Task WaitAsync();
public void Set();
public void Reset();
}

WaitAsync()Set()方法非常简单,直接封装TaskCompletionSource<bool>实例成员,如下:

public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();
public Task WaitAsync() { return m_tcs.Task; }
public void Set() { m_tcs.TrySetResult(true); }

}

剩下的只有Reset()方法了。我们的目标是使随后调用的WaitAsync()中返回的Task无法完成。因为Task最终状态只有完成状态(即,正常完成、取消、异常),所以我们需要切换一个新的TaskCompletionSource<bool>实例。这样做,我们只需要确保如果多个线程同时调用Reset()Set()WaitAsync() WaitAsync()不会返回孤立的Task(即,我们不希望一个线程调用WaitAsync()返回一个不能完成的Task(已经被Reset())后,另一个线程又在新的Task上调用Set())。为了达到此目的,我们将确保如果当前Task已经完成就切换一个新的Task,并且还确保这个切换操作的原子性。(当然,还有其他策略实现此目标,这仅仅是我选择的一个特定例子)

注意:关键字volatile和Interlocked类的使用。

public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();

public Task WaitAsync() { return m_tcs.Task; }
public void Set() { m_tcs.TrySetResult(true); }
public void Reset()
{
while (true)
{
var tcs = m_tcs;
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref m_tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}

到此,我们的AsyncManualResetEvent已经完成。然而,还有一个重要的潜在行为要记住。在之前的文章中,我们谈论过延续任务和他们是如何同步执行,这意味着延续任务将作为任务完成的一部分执行,在同一个线程上同步完成任务。对于TaskCompletionSource<TResult>,这意味着同步延续任务将作为(Try)Set*方法的一部分执行,也就是说,在AsyncManualResetEvent例子中,延续任务将作为Set()方法的一部分执行。根据你的需求,如果你不希望这种事情发生,有一些替代的方法。一种方法是异步运行(Try)Set*方法,并使Set()调用阻塞,直到任务真真完成只是任务本身,不包括任务的延续任务)。Eg

public void Set() 
{
var tcs = m_tcs;
Task.Factory.StartNew(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs
, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
tcs.Task.Wait();
}

当然,还有其他可能的方法,如何实现取决于你的需求。

 

这就是本节要讲的AsyncManualResetEvent

完整源码如下:

    public class AsyncManualResetEvent
{
private volatile TaskCompletionSource<bool> m_tcs = new TaskCompletionSource<bool>();

public Task WaitAsync() { return m_tcs.Task; }

public void Set()
{
var tcs = m_tcs;
Task.Factory.StartNew(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs
, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
tcs.Task.Wait();
}

public void Reset()
{
while (true)
{
var tcs = m_tcs;
// 短逻辑单元 确保如果当前Task已经完成就切换一个新的Task。
if (!tcs.Task.IsCompleted ||
Interlocked.CompareExchange(ref m_tcs, new TaskCompletionSource<bool>(), tcs) == tcs)
return;
}
}
}

下一节,我将实现一个async版本的AutoResetEvent

 

推荐阅读:

                   异步编程:同步基元对象(上)

                   异步编程:同步基元对象(下)

 

感谢你的观看……

原文:Building Async Coordination Primitives, Part 1: AsyncManualResetEvent

作者:Stephen Toub – MSFT