前言
抛开死锁不谈,只聊性能问题,尽管锁总能粗暴的满足同步需求,但一旦存在竞争关系,意味着一定会有线程被阻塞,竞争越激烈,被阻塞的线程越多,上下文切换次数越多,调度成本越大,显然在高并发的场景下会损害性能。在高并发高性能且要求线程安全的述求下,无锁构造(非阻塞构造)闪亮登场。
参考文档:
C# - 理论与实践中的 C# 内存模型,第 2 部分 | Microsoft Docs
一、非阻塞同步
重排序与缓存
我们观察下面这个例子:
public class Foo
{
private int _answer;
private bool _complete;
void A() //A 1
{
_answer = 10;
_complete = true;
}
void B() //B 2
{
if (_complete) Console.WriteLine(_answer);
}
}
如果方法A
和B
在不同的线程上并发运行,B
可能会打印 “ 0 “ 吗?答案是会的,原因如下:
- 编译器、CLR 或 CPU 可能会对代码/指令进行重排序(reorder)以提高效率。
- 编译器、CLR 或 CPU 可能会进行缓存优化,导致其它线程不能马上看到变量的新值。
请务必重视它们,它们将是幽灵般的存在
int x = 0, y = 0, a = 0, b = 0;
var task1 = Task.Run(() => // A 1
{
a = 1; // 1
x = b; // 2
});
var task2 = Task.Run(() => // B 2
{
b = 2; // 3
y = a; // 4
});
Task.WaitAll(task1, task2);
Console.WriteLine("x:" + x + " y:" + y);
直觉和经验告诉我们,程序至顶向下执行:代码1一定发生在代码2之前,代码3一定发生在代码4之前,然鹅
在一个独立的线程中,每一个语句的执行顺序是可以被保证的,但在不使用lock,waithandle这样的显式同步操作时,我们就没法保证事件在不同的线程中看到的执行顺序是一致的了。尽管线程A中一定需要观察到a=1执行成功之后才会去执行x=b,但它没法确保自己观察得到线程B中对b的写入,所以A还可能会打印出y的一个旧版的值。这就叫指令重排序。
x:0 y:1 #1-2-3-4
x:2 y:0 #3-4-1-2
x:2 y:1 #1-3-2-4
可实际运行时还是有些让我们惊讶的情况:
x:0 y:0 #??
这就是缓存问题,如果两个线程在不同的CPU上执行,每一个核心有自己的缓存,这样一个线程的写入对于其它线程,在主存同步之前就是不可见的了。
C#编译器和CLR运行时会非常小心的保证上述优化不会破坏普通的单线程代码,和正确使用锁的多线程代码。但有时,你仍然需要通过显示的创建内存屏障(memory barrier,也称作内存栅栏 (memory fence))来对抗这些优化,限制指令重排序和读写缓存产生的影响。
内存屏障
处理器支持哪种内存重排序(LoadLoad重排序、LoadStore重排序、StoreStore重排序、StoreLoad重排序),就会提供相对应能够禁止重排序的指令,而这些指令就被称之为内存屏障(LoadLoad屏障、LoadStore屏障、StoreStore屏障、StoreLoad屏障)
屏障名称 | 示例 | 具体作用 |
---|---|---|
StoreLoad | Store1;Store2;Store3;StoreLoad;Load1;Load2;Load3 | 禁止StoreLoad重排序,确保屏障之前任何一个写(如Store2)的结果都会在屏障后任意一个读操作(如Load1)加载之前被写入 |
StoreStore | Store1;Store2;Store3;StoreStore;Store4;Store5;Store6 | 禁止StoreStore重排序,确保屏障之前任何一个写(如Store1)的结果都会在屏障后任意一个写操作(如Store4)之前被写入 |
LoadLoad | Load1;Load2;Load3;LoadLoad;Load4;Load5;Load6 | 禁止LoadLoad重排序,确保屏障之前任何一个读(如Load1)的数据都会在屏障后任意一个读操作(如Load4)之前被加载 |
LoadStore | Load1;Load2;Load3;LoadStore;Store1;Store2;Store3 | 禁止LoadStore重排序,确保屏障之前任何一个读(如Load1)的数据都会在屏障后任意一个写操作(如Store1)的结果被写入高速缓存(或主内存)前被加载 |
读屏障告诉处理器在执行任何的加载前,执行所有已经在失效队列(Invalidte Queues)中的失效(I)指令。即:所有load barrier之前的store指令对之后(本核心和其他核心)的指令都是可见的。
Store Memory Barrier:写屏障,等同于前文的StoreStore Barriers 将store buffer都写入缓存。
写屏障告诉处理器在执行这之后的指令之前,执行所有已经在存储缓存(store buffer)中的修改(M)指令。即:所有store barrier之前的修改(M)指令都是对之后的指令可见。
最简单的内存屏障是完全内存屏障(full memory barrier,或全栅栏(full fence)),它可以阻止所有跨越栅栏的指令进行重排并提交修改和刷新缓存
。内存屏障之前的所有写操作都要写入内存,并将内存中的新值刷到缓存,使得其它CPU核心能够读取到最新值,完全保证了数据的强一致性,进而解决CPU缓存带来的可见性问题。
我们简单修改一下前面的案例
void A()
{
_answer = 10;
Thread.MemoryBarrier(); // 1
_complete = true;
Thread.MemoryBarrier(); // 3
}
void B()
{
Thread.MemoryBarrier(); // 2
if (_complete)
{
_testOutputHelper.WriteLine(_answer.ToString());
}
}
屏障1,3使得这个例子不可能打印出0,屏障2保证如果B在A之后执行,_complete一定读到的是true
内存屏障离我们并不遥远,以下方式都会隐式的使用全栅栏:
-
lock语法糖或
Monitor.Enter
/Monitor.Exit
-
Interlocked
类中的所有方法 -
使用线程池的异步回调,包括异步委托,APM回调,以及任务延续(task continuations)
-
信号构造的等待/复位
-
任何依赖信号同步的情况,比如启动或等待Task,因此下面的代码也是线程安全的
int x = 0; Task t = Task.Factory.StartNew (() => x++); t.Wait(); Console.WriteLine (x); // 1
volatile
另一个(更高级的)解决这个问题的方法是对_complete
字段使用volatile
关键字。
volatile bool _complete;
volatile
关键字通知编译器在每个读这个字段的地方使用一个读栅栏(acquire-fence),并且在每个写这个字段的地方使用一个写栅栏(release-fence)。
这种“半栅栏(half-fences)”比全栅栏更快,因为它给了运行时和硬件更大的优化空间。
读栅栏:也就是读屏障(Store Memory Barrier),等同于前文的LoadLoad Barriers 将Invalidate的 都执行完成。告诉处理器在执行任何的加载前,执行所有已经在失效队列(Invalidte Queues)中的失效(I)指令。即:所有load barrier之前的store指令对之后(本核心和其他核心)的指令都是可见的。
写栅栏:也就是写屏障(Store Memory Barrier),等同于前文的StoreStore Barriers 将store buffer都写入主存。
告诉处理器在执行这之后的指令之前,执行所有已经在存储缓存(store buffer)中的修改(M)指令。即:所有store barrier之前的修改(M)指令都是对之后的指令可见。
巧的是,Intel 的 X86 和 X64 处理器总是在读时使用读栅栏,写时使用写栅栏,无论是否使用
volatile
关键字。所以在使用这些处理器的情况下,这个关键字对硬件来说是无效的。然而,volatile
关键字对编译器和 CLR 进行的优化是有作用的,以及在 64 位 AMD 和 Itanium 处理器上也是有作用的。这意味着不能因为你的客户端运行在特定类型的 CPU 上而放松警惕。
注意:使用volatile
不能阻止写-读被交换
第一条指令 | 第二条指令 | 是否会被交换 |
---|---|---|
读 | 读 | 不会 |
读 | 写 | 不会 |
写 | 写 | 不会(CLR 确保写-写操作永远不会被交换,就算是没有volatile 关键字) |
写 | 读 | 会! |
在下面案例中仍然有可能会打印00的情况(对a的读取可能发生在写入前--重排序)
int a = 0, b = 0;
int x = 0, y = 0;
var task1 = Task.Run(() =>
{
Thread.VolatileWrite(ref a, 1);
x = Thread.VolatileRead(ref b);
});
var task2 = Task.Run(() =>
{
Thread.VolatileWrite(ref b, 2);
y = Thread.VolatileRead(ref a);
});
Task.WaitAll(task1, task2);
Console.WriteLine("x:" + x + " y:" + y);
volatile
关键字不能应用于数组元素,不能用在捕获的局部变量:这些情况下你必须使用VolatileRead
和VolatileWrite
方法
从上面的例子我们可以看出,写-读操作可能被重新排序,官方的解释是:
在多处理器系统上,易失性读取操作不保证获取由任何处理器写入该内存位置的最新值。 同样,易失性写入操作不保证写入的值会立即对其他处理器可见。
(我的理解是:
volatile
关键字只能解决重排序问题,解决不了多处理器的缓存一致性问题)
注意double
和 long
无法标记为 volatile
,因为对这些类型的字段的读取和写入不能保证是原子的。 若要保护对这些类型字段的多线程访问,请使用 Interlocked 类成员或使用 lock
语句保护访问权限。
Interlocked
位于System.Threading
,为多个线程共享的变量提供原子操作,这也是DOTNET为数不多的线程安全类型之一。
Interlocked
通过将原子性的需求传达给操作系统和CLR来进行实现其功能,此类的成员不会引发异常。
可以防止 1.线程上下文切换,2.线程更新可由其他线程访问的变量时,或者当两个线程同时在不同的处理器上执行时 可能会出现的错误。
场景:
int i = 0;
i ++;
在大多数计算机上,自增并不是原子操作,需要以下步骤:
- 将变量
i
的值加载到寄存器中。 - 计算
i + 1
。 - 将上面的计算结果存储在变量
i
中。
假设A线程执行完1-2时被抢占,B线程执行1-2-3,当A线程恢复时继续执行3,此时B线程的值就被覆盖掉了。
使用Increment
即可解决,123会被打包成一个操作,以原子的方式实现自增
CAS
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置的值即可。”
Interlocked.CompareExchange,实现了CAS:比较两个值是否相等,如果相等,则替换第一个值,否则什么都不做,最终返回这个位置的原始值。
Interlocked.CompareExchange(ref _num, 1000, 500);
CAS在保证原子性读写的同时,没有加锁,保障了程序并发度,但也存在缺陷:
- ABA问题
- 只能保证一个地址的读写原子性
- 自旋CAS时间过长,容易给CPU带来大开销
二、延迟初始化
面试时候经常问:单例模式中的懒汉模式线程安全问题
场景:某个字段构造开销非常大,使得在初始化A
时需要承担初始化Expensive
的开销,即使Expensive字段不会被用到。
public class A
{
public readonly Expensive Expensive = new Expensive();
// ..
}
public class Expensive
{
// 构造开销非常昂贵
}
自然会想到懒汉模式:按需加载
public class B
{
private Expensive _expensive;
public Expensive GetExpensiveInstance()
{
if (_expensive == null) _expensive = new Expensive();
return _expensive;
}
}
新的问题产生:GetExpensiveInstance
是线程安全的吗?我们可以通过加锁解决
public class C
{
private readonly object _locker = new object();
private Expensive _expensive;
public Expensive GetExpensiveInstance()
{
lock (_locker)
{
if (_expensive == null) _expensive = new Expensive();
return _expensive;
}
}
}
现在面试官继续问:还有性能更好的版本吗?..
Lazy
net standard1.0 提供System.Lazy<T>
来帮助你以线程安全且高效的方式(DCL)解决延迟初始化问题,只需
public class D
{
private Lazy<Expensive> _expensive = new Lazy<Expensive>(() => new Expensive(), true);
public Expensive GetExpensiveInstance() => _expensive.Value;
}
第一个参数是一个委托,告知如何构建,第二个参数是boolean类型,传false
实现的就是上面提到的plain B
非线程安全迟初始化
双检锁 double checked locking会进行一次额外的易失读(volatile read),在对象已经完成初始化时,能够避免获取锁产生的开销。
public class E
{
private readonly object _locker = new object();
private volatile Expensive _expensive;
public Expensive GetExpensiveInstance()
{
// 额外的易失读(volatile read)
if (_expensive == null)
{
lock (_locker)
{
if (_expensive == null) _expensive = new Expensive();
}
}
return _expensive;
}
}
LazyInitializer
LazyInitializer
是一个静态类,提供EnsureInitialized
方法,第一个参数是需要构造的变量地址,第二个参数是一个委托,告知如何构造
public class F
{
private Expensive _expensive;
public Expensive GetExpensiveInstance()
{
LazyInitializer.EnsureInitialized(ref _expensive,
() => new Expensive());
return _expensive;
}
}
它使用竞争初始化模式的实现,比双检锁更快(在多核心情况下),因为它的实现完全不使用锁。这是一个很少需要用到的极端优化,并且会带来以下代价:
- 当参与初始化的线程数大于核心数时,它会更慢。
- 可能会因为进行了多余的初始化而浪费 CPU 资源。
- 初始化逻辑必须是线程安全的(例如,
Expensive
的构造器对静态字段进行写,就不是线程安全的)。 - 如果初始化的对象是需要进行销毁的,多余的对象需要额外的逻辑才能被销毁。
竞争初始化(race-to-initialize)模式,通过易失性和CAS,实现无锁构造
public class G
{
private volatile Expensive _expensive;
public Expensive Expensive
{
get
{
if (_expensive == null)
{
var instance = new Expensive();
Interlocked.CompareExchange (ref _expensive, instance, null);
}
return _expensive;
}
}
}
三、线程局部存储
我们花费了大量篇幅来讲并发访问公共数据问题,前文提到的锁构造,信号构造,无锁构造本质上都是使用同步构造,使得多线程在访问公共数据时能安全的进行,然而有时我们会希望数据在线程间是隔离的,局部变量就能实现这个目的,但他们的生命周期总是那么短暂(随代码块而释放),我们期待更大作用域的隔离数据,线程局部变量(thread-local storage,TLS)就可以实现这个目的。
ThreadStatic
被ThreadStatic标记的static字段不会在线程间共享,每个执行线程都有一个单独的字段实例
Note:
- 被标记的必须是static字段,不能在实例字段上使用(添加了也无效)
- 请不要给被标记的字段指定初始值,因为这种初始化只会在类被构造时执行一次,影响一个线程,因此他依赖零值
如果你需要使用实例字段,或者非零值,请使用ThreadLocal<T>
public class ThreadStatic测试
{
private readonly ITestOutputHelper _testOutputHelper;
[ThreadStatic] private static int _num;
public ThreadStatic测试(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
void Work()
{
for (int i = 0; i < 100000; i++)
{
_num++;
_testOutputHelper.WriteLine(_num.ToString());
}
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(_num.ToString());
}
}
输出:
100000
100000
0
LocalDataStoreSlot
封装内存槽以存储本地数据。 此类不能被继承。.NET Framework 1.1加入,但在standard2.0+才有。
public sealed class LocalDataStoreSlot
.NET Framework 提供了两种机制,用于使用线程本地存储 (TLS) :LocalDataStoreSlot
和ThreadStaticAttribute
LocalDataStoreSlot
比ThreadStaticAttribute
更慢,更尴尬。此外,数据存储为类型 Object
,因此必须先将其强制转换为正确的类型,然后再使用它。
有关使用 TLS 的详细信息,请参阅 线程本地存储。
同样,.NET Framework 提供了两种使用上下文本地存储的机制:LocalDataStoreSlot
和ContextStaticAttribute
。 上下文相对静态字段是用属性标记的 ContextStaticAttribute 静态字段。 请参考注解
// 同一个 LocalDataStoreSlot 对象可以跨线程使用。
LocalDataStoreSlot _slot = Thread.AllocateNamedDataSlot("mySlot");
void Work()
{
for (int i = 0; i < 100000; i++)
{
int num = (int)(Thread.GetData(_slot)??0);
Thread.SetData(_slot, num + 1);
}
_testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());
输出效果和ThreadStaticAttribute
一样:
100000
100000
0
使用Thread.FreeNamedDataSlot("mySlot");
可以释放所有线程上的指定槽,但是只有在所有对该槽的引用都出了其作用域,并且被垃圾回收后才会真正释放。这确保了只要保持对槽的引用,就能继续使用槽。
你也可以通过Thread.AllocateDataSlot()
来创建一个无名槽位,与命名槽的区别是无名槽需要自行控制作用域
当然我们也可以对上面复杂的᠍᠍᠍᠍᠍Thread.GetData
,Thread.SetData
进行封装
LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel");
int Num
{
get
{
object data = Thread.GetData(_secSlot);
return data == null ? 0 : (int) data; // null 相当于未初始化。
}
set { Thread.SetData (_secSlot, value); }
}
ThreadLocal
ThreadLocal<T>
是 Framework 4.0 加入的,涵盖在netstandard1.0。它提供了可用于静态字段和实例字段的线程局部存储,并且允许设置默认值。
public class ThreadLocal测试
{
ThreadLocal<int> _num = new ThreadLocal<int> (() => 3);
private readonly ITestOutputHelper _testOutputHelper;
public ThreadLocal测试(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
void Work()
{
for (int i = 0; i < 100000; i++)
{
_num.Value++;
}
_testOutputHelper.WriteLine(_num.ToString());
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(_num.ToString());
}
}
输出
100003
100003
3
下面这个测试非常有意思
[Fact]
void Show()
{
var threadName = new ThreadLocal<string>(() => "Thread" + Thread.CurrentThread.ManagedThreadId);
Parallel.For(0, 13, x =>
{
bool repeat = threadName.IsValueCreated;
_testOutputHelper.WriteLine($"ThreadName = {threadName.Value} {(repeat ? "(repeat)" : "")}");
});
threadName.Dispose(); // 释放资源
}
你会发现当Parallel.For第二个参数超过你的逻辑内核后,repeat出现了!
ThreadName = Thread5
ThreadName = Thread8
ThreadName = Thread31
ThreadName = Thread29
ThreadName = Thread31 (repeat)
ThreadName = Thread30
ThreadName = Thread18
ThreadName = Thread12
ThreadName = Thread32
ThreadName = Thread28
ThreadName = Thread33
ThreadName = Thread35
ThreadName = Thread34
Random
类不是线程安全的,所以我们要不然在使用Random
时加锁(这样限制了并发),如今我们有了ThreadLocal:
var localRandom = new ThreadLocal<Random>(() => new Random());
很轻易的就解决了线程安全问题,但是上面的版本使用的Random
的无参构造方法,会依赖系统时间作为生成随机数的种子,在大概 10ms 时间内创建的两个Random
对象可能会使用相同的种子,下边是解决这个问题的一个办法:
var localRandom = new ThreadLocal<Random>(() => new Random (Guid.NewGuid().GetHashCode()) );
特别注意,不要以为GUID全局唯一,GUID的HashCode也全局唯一,上面的随机数仍然不是真随机