我 们知道并行编程模型两种:一种是基于消息式的,第二种是基于共享内存式的。 前段时间项目中遇到了第二种 使用多线程开发并行程序共享资源的问题 ,今天以实际案例出发对.net里的共享内存式的线程同步机制做个总结,由于某些类库的应用属于基础,所以本次不对基本使用做出讲解,基本使用 MSDN是最好的教程。
一、volatile关键字
基本介绍: 封装了 Thread.VolatileWrite() 和 Thread.VolatileRead()的实现 ,主要作用是强制刷新高速缓存。
使用场景: 适用于在多核多CPU的机器上 解决变量在内存和高速缓存同步不及时的问题。
案例:参考下文 二、原子操作的 案例 或者 System.Collections.Concurrent命名空间下的 ConcurrentQueue ,ConcurrentDictionary 等并发集合的实现方式。
二、原子操作(Interlock)
基本介绍: 原 子操作是 实现Spinlock,Monitor,ReadWriterLock锁的基础,其实现原理是在计算机总线上标志一个信号来表示资源已经被占用 如果其他指令进行修改则等待本次操作完成后才能进行,因为原子操作是在硬件上实现的 所以速度非常快,大约在50个时钟周期。其实原子操作也可以看做一种锁。
使用场景:性 能要求较高的场合,需要对字段进行快速的同步或者对变量进行原子形式的跟新操作(例如:int b=0; b=b+1 实际分解为多条汇编指令,在多线程情况下 多条汇编指令并行的执行可能导致错误的结果,所以要保证执行 b=b+1 生成的汇编指令是一个原子形式执行 ),例如实现一个并行队列,异步队列等。
案例:一个基于事件触发机制队列的实现
001.
/// <summary>
002.
/// 表示一个实时处理队列
003.
/// </summary>
004.
public
class
ProcessQueue<T>
005.
{
006.
#region [成员]
007.
008.
private
ConcurrentQueue<IEnumerable<T>> queue;
009.
010.
private
Action<IEnumerable<T>> PublishHandler;
011.
012.
//指定处理的线程数
013.
private
int
core = Environment.ProcessorCount;
014.
015.
//正在运行的线程数
016.
private
int
runingCore = 0;
017.
018.
public
event
Action<Exception> OnException;
019.
020.
//队列是否正在处理数据
021.
private
int
isProcessing=0;
022.
023.
//队列是否可用
024.
private
bool
enabled =
true
;
025.
026.
#endregion
027.
028.
#region 构造函数
029.
030.
public
ProcessQueue(Action<IEnumerable<T>> handler)
031.
{
032.
033.
queue =
new
ConcurrentQueue<IEnumerable<T>>();
034.
035.
PublishHandler = handler;
036.
this
.OnException += ProcessException.OnProcessException;
037.
}
038.
039.
#endregion
040.
041.
#region [方法]
042.
043.
/// <summary>
044.
/// 入队
045.
/// </summary>
046.
/// <param name="items">数据集合</param>
047.
public
void
Enqueue(IEnumerable<T> items)
048.
{
049.
if
(items !=
null
)
050.
{
051.
queue.Enqueue(items);
052.
}
053.
054.
//判断是否队列有线程正在处理
055.
if
(enabled && Interlocked.CompareExchange(
ref
isProcessing, 1, 0) == 0)
056.
{
057.
if
(!queue.IsEmpty)
058.
{
059.
ThreadPool.QueueUserWorkItem(ProcessItemLoop);
060.
}
061.
else
062.
{
063.
Interlocked.Exchange(
ref
isProcessing, 0);
064.
}
065.
}
066.
}
067.
068.
/// <summary>
069.
/// 开启队列数据处理
070.
/// </summary>
071.
public
void
Start()
072.
{
073.
Thread process_Thread =
new
Thread(PorcessItem);
074.
process_Thread.IsBackground =
true
;
075.
process_Thread.Start();
076.
}
077.
078.
/// <summary>
079.
/// 循环处理数据项
080.
/// </summary>
081.
/// <param name="state"></param>
082.
private
void
ProcessItemLoop(
object
state)
083.
{
084.
//表示一个线程递归 当处理完当前数据时 则开起线程处理队列中下一条数据 递归终止条件是队列为空时
085.
//但是可能会出现 队列有数据但是没有线程去处理的情况 所有一个监视线程监视队列中的数据是否为空,如果为空
086.
//并且没有线程去处理则开启递归线程
087.
088.
if
(!enabled && queue.IsEmpty)
089.
{
090.
Interlocked.Exchange(
ref
isProcessing, 0);
091.
return
;
092.
}
093.
094.
//处理的线程数 是否小于当前CPU核数
095.
if
(Thread.VolatileRead(
ref
runingCore) <= core * 2*)
096.
{
097.
IEnumerable<T> publishFrame;
098.
//出队以后交给线程池处理
099.
if
(queue.TryDequeue(
out
publishFrame))
100.
{
101.
Interlocked.Increment(
ref
runingCore);
102.
try
103.
{
104.
PublishHandler(publishFrame);
105.
106.
if
(enabled && !queue.IsEmpty)
107.
{
108.
ThreadPool.QueueUserWorkItem(ProcessItemLoop);
109.
}
110.
else
111.
{
112.
Interlocked.Exchange(
ref
isProcessing, 0);
113.
}
114.
115.
}
116.
catch
(Exception ex)
117.
{
118.
OnProcessException(ex);
119.
}
120.
121.
finally
122.
{
123.
Interlocked.Decrement(
ref
runingCore);
124.
}
125.
}
126.
}
127.
128.
}
129.
130.
/// <summary>
131.
///定时处理帧 线程调用函数
132.
///主要是监视入队的时候线程 没有来的及处理的情况
133.
/// </summary>
134.
private
void
PorcessItem(
object
state)
135.
{
136.
int
sleepCount=0;
137.
int
sleepTime = 1000;
138.
while
(enabled)
139.
{
140.
//如果队列为空则根据循环的次数确定睡眠的时间
141.
if
(queue.IsEmpty)
142.
{
143.
if
(sleepCount == 0)
144.
{
145.
sleepTime = 1000;
146.
}
147.
else
if
(sleepCount == 3)
148.
{
149.
sleepTime = 1000 * 3;
150.
}
151.
else
if
(sleepCount == 5)
152.
{
153.
sleepTime = 1000 * 5;
154.
}
155.
else
if
(sleepCount == 8)
156.
{
157.
sleepTime = 1000 * 8;
158.
}
159.
else
if
(sleepCount == 10)
160.
{
161.
sleepTime = 1000 * 10;
162.
}
163.
else
164.
{
165.
sleepTime = 1000 * 50;
166.
}
167.
sleepCount++;
168.
Thread.Sleep(sleepTime);
169.
}
170.
else
171.
{
172.
//判断是否队列有线程正在处理
173.
if
(enabled && Interlocked.CompareExchange(
ref
isProcessing, 1, 0) == 0)
174.
{
175.
if
(!queue.IsEmpty)
176.
{
177.
ThreadPool.QueueUserWorkItem(ProcessItemLoop);
178.
}
179.
else
180.
{
181.
Interlocked.Exchange(
ref
isProcessing, 0);
182.
}
183.
sleepCount = 0;
184.
sleepTime = 1000;
185.
}
186.
}
187.
}
188.
}
189.
190.
/// <summary>
191.
/// 停止队列
192.
/// </summary>
193.
public
void
Stop()
194.
{
195.
this
.enabled =
false
;
196.
197.
}
198.
199.
/// <summary>
200.
/// 触发异常处理事件
201.
/// </summary>
202.
/// <param name="ex">异常</param>
203.
private
void
OnProcessException(Exception ex)
204.
{
205.
var tempException = OnException;
206.
Interlocked.CompareExchange(
ref
tempException,
null
,
null
);
207.
208.
if
(tempException !=
null
)
209.
{
210.
OnException(ex);
211.
}
212.
}
213.
214.
#endregion
215.
216.
}
三、自旋锁(Spinlock)
基本介绍: 在原子操作基础上实现的锁,用户态的锁,缺点是线程一直不释放CPU时间片。操作系统进行一次线程用户态到内核态的切换大约需要500个时钟周期,可以根据这个进行参考我们的线程是进行用户等待还是转到内核的等待.。
使用场景:线程等待资源时间较短的情况下使用。
案例: 和最常用的Monitor 使用方法一样 这里就不举例了,在实际场景中应该优先选择使用Monitor,除非是线程等待资源的时间特别的短。
四、监视器(Monitor)
基本介绍: 原子操作基础上实现的锁,开始处于用户态,自旋一段时间进入内核态的等待释放CPU时间片,缺点使用不当容易造成死锁 c#实现的关键字是Lock。
使用场景: 所有需要加锁的场景都可以使用。
案例: 案例太多了,这里就不列出了。
五、读写锁(ReadWriterLock)
原理分析: 原子操作基础上实现的锁,
使用场景:适用于写的次数少,读的频率高的情况。
案例:一个线程安全的缓存实现(.net 4.0 可以使用基础类库中的 ConcurrentDictionary<K,V>) 注意:老版本ReaderWriterLock已经被淘汰,新版的是ReaderWriterLockSlim
01.
class
CacheManager<K, V>
02.
{
03.
#region [成员]
04.
05.
private
ReaderWriterLockSlim readerWriterLockSlim;
06.
07.
private
Dictionary<K, V> containter;
08.
09.
#endregion
10.
11.
#region [构造函数]
12.
13.
public
CacheManager()
14.
{
15.
this
.readerWriterLockSlim =
new
ReaderWriterLockSlim();
16.
this
.containter =
new
Dictionary<K, V>();
17.
}
18.
19.
#endregion
20.
21.
#region [方法]
22.
23.
public
void
Add(K key, V value)
24.
{
25.
readerWriterLockSlim.EnterWriteLock();
26.
27.
try
28.
{
29.
containter.Add(key, value);
30.
}
31.
32.
finally
33.
{
34.
readerWriterLockSlim.ExitWriteLock();
35.
}
36.
}
37.
38.
public
V Get(K key)
39.
{
40.
41.
bool
result =
false
;
42.
V value;
43.
44.
do
45.
{
46.
readerWriterLockSlim.EnterReadLock();
47.
48.
try
49.
{
50.
result = containter.TryGetValue(key,
out
value);
51.
}
52.
53.
finally
54.
{
55.
readerWriterLockSlim.ExitWriteLock();
56.
}
57.
58.
}
while
(!result);
59.
60.
return
value;
61.
}
62.
63.
#endregion
64.
}
.net中还有其他的线程同步机制:ManualResetEventSlim ,AutoResetEvent ,SemaphoreSlim 这里就逐个进行不介绍 具体在《CLR Via C# 》中解释的非常详细,但在具体的实际开发中我还没有使用到。
最好的线程同步机制是没有同步,这取决于良好的设计,当然有些情况下无法避免使用锁。 在性能要求不高的场合基本的lock就能满足要求,但性能要求比较苛刻的情就需求更具实际场景进行选择哪种线程同步机制。