第10章:信号量与管程
标签(空格分隔): 操作系统
- 信号量
- 信号量使用
- 互斥访问
- 条件同步
- 管程
- 经典同步问题
- 哲学家就餐问题
- 读者-写者问题
10.1 信号量(Semaphore)
回顾
- 并发问题
- 多线程并发导致资源竞争
- 同步概念
- 协调多线程对共享数据的访问
- 任何时刻只能有一个线程执行临界区代码
- 确保同步正确的方法
- 底层硬件支持
- 高层次的编程抽象
信号量
- 信号量是操作系统提供的一种协调共享资源访问的方法
- 软件同步是平等线程间的一种同步协商机制
- 操作系统是管理者,地位高于进程
- 用信号量表示系统资源的数量
- 由Dijkstra在20世纪60年代提出
- 早起的操作系统的主要同步机制
- 现在很少用(但在计算机科学研究还是非常重要)
- 信号量是一种抽象的数据类型
- 由一个整型(sem)变量和两个原子操作组成
- P()(Prolaag(荷兰语尝试减少))
- sem减1
- 如sem<0,进入等待,否则继续
- V()(Verhoog(荷兰语增加))
- sem加1
- 如sem<=0,唤醒一个等待进程
信号量的特性
- 信号量是被保护的整数变量
- 初始化完成后,只能通过P()和V()操作修改
- 由操作系统保证,PV操作是原子操作
- P()可能阻塞,V()不会阻塞
- 通常假定信号量是“公平的”
- 线程不会被无限期阻塞在P()操作
- 假定信号量等待按先进先出排队
- 自旋锁是做不到先进先出的,因为自旋锁需要占用CPU随时地去查,有可能临界区使用者退出的时候它刚查完,下一个进入者是谁去查它就能进去
信号量的实现
ClassSemaphore {
int sem;
WaitQueue q;
}
Semaphore :: P() {
sem --;
if (sem<0) {
Add this thread t to q;
block(P);
}
}
Semaphore :: V() {
sem ++;
if (sem<=0) {
Remove a thread t from q;
wakeup(t);
}
}
10.2 信号量的使用
信号量分类
- 可分为两种信号量
- 二进制信号量:资源数目为0或1
- 资源信号量:资源数目为任何非负值
- 两者等价
- 基于一个可以实现另一个
信号量使用
- 互斥访问
- 临界区的互斥访问控制
- 条件同步
- 线程间的事件等待
用信号量实现临界区的互斥访问
- 每类资源设置一个信号量,其初值为1
mutex = new Semaphore(1); //初始化
mutex -> P(); //第一个线程进来之后,由1变成0,能进去;若第二个线程也想进入临界区,由0变成-1,在P操作进入等待状态
critical section;
mutex -> V(); //第一次线程执行结束,进行V操作,使信号量计数由-1变成0, 唤醒第二个进程,则第二个进程就会在第一个进程结束时进入临界区
- 必须成对使用P()和V()操作
- P()操作保证互斥访问临界资源
- V()操作在使用后释放临界资源
- PV操作不能次序错误、重复或遗漏
用信号量实现条件同步
- 条件同步设置一个信号量,其初值为0
condition = new Semaphore(0);
//线程A 线程B
...M...
condition -> P();
...N... //使用数据 ...X... //准备数据
condition -> v();
...Y...
- 条件等待:线程A要等待线程B执行完X模块才能执行N模块
- 假设B先执行完X,此时信号量变成1,则N能直接往下执行
- 假设A先执行完M,此时信号量变成-1,阻塞,进入等待状态,然后进程B进行V操作,释放,此时信号量为0。若信号量小于或等于0,则说明有线程在等待,此时唤醒等待中的线程A,执行N模块
生产者-消费者问题
生产者 -> 缓冲区 -> 消费者
- 有界缓冲区的生产者-消费者问题描述
- 一个或多个生产者在生成数据后放在一个缓冲区里
- 单个消费者从缓冲区取出数据处理
- 任何时刻只能有一个生产者或消费者可访问缓冲区
- 问题分析
- 任何时刻只能有一个线程操作缓冲区(互斥访问)
- 缓冲区空时,消费者必须等待生产者(条件同步)
- 缓冲区满时,生产者必须等待消费者(条件同步)
- 用信号量描述每个约束
- 二进制信号量 mutex
- 资源信号量 fullBuffers
- 资源信号量 emptyBuffers
- 信号量实现
CLass BoundedBuffer {
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
}
BoundedBuffer :: Deposit(c) {
emptyBuffer -> P();
mutex -> P();
Add c to the buffer;
mutex -> V();
fullBuffers -> V();
}
BoundedBuffer :: Remove(c) {
fullBuffer -> P();
mutex -> P();
Remove c from buffer;
mutex -> V();
emptyBuffers -> V();
}
使用信号量的困难
- 读/开发代码比较困难
- 程序员需要能运用信号量机制
- 容易出错
- 使用的信号量已经被另一个线程占用
- 忘记释放信号量
- 不能够处理死锁问题
10.3 管程(Monitor)
- 改进信号量在处理临界区的时候的一些麻烦
- defs:管程是一种用于多线程互斥访问共享资源的程序结构
- 采用面向对象方法,简化了线程的同步机制
- 任一时刻最多只有一个线程执行管理代码
- 正在管程中的线程可临时放弃管理的互斥访问,等待事件出现时恢复
注
- 一个线程在临界区执行,必须执行到它退出临界区,它才能放弃临界区的互斥访问,而管程允许在执行的过程当中,临时放弃。放弃之后其他线程就可进入管程区域
管程的使用
- 在对象/模块中,收集相关共享数据
管程组成
- 一个锁
- 控制管程代码的互斥访问
- 0个或多个条件变量
- 0个等同于临界区
- 管理共享数据的并发访问
条件变量(Condition Variable)
- 条件变量是管程内的等待机制
- 进入管程的线程因资源被占用而进入等待状态
- 每个条件变量表示一种等待原因,对应一个等待队列
- Wait()操作
- 将自己阻塞在等待队列中
- 唤醒一个等待着或释放管程的互斥访问
- Signal()操作
- 将等待队列中的一个线程唤醒
- 如果等待队列为空,则等同空操作
条件变量实现
Class Condition {
int numWaiting = 0;
WaitQueue = q;
}
Condition :: wait(lock) {
numWaiting ++;
Add this thread t to q;
release(lock);
schedule();//need mutex
require(lock);
}
Condition :: Signal() {
if (numWaiting>0) {
Remove a thread from q;
wakeup(t);//need mutex
numWaiting --;
}
}
用管程解决生产者-消费者问题
ClassBoundedBuffer {
...
Lock lock;
int count = 0;
Condition notFull,notEmpty;
}
BoundedBuffer :: Deposit(c) {
lock -> Acquire();
while (count == n)
notFull.Wait(&lock);
Add c to the buffer;
count ++;
notEmpty.Signal();
lock -> Release();
}
BoundedBuffer :: Remove(c) {
lock -> Acquire(c);
while (count == 0)
notEmpty.Wait(&lock);
Remove c from buffer;
count --;
notFull.Signal();
lock -> Release();
}
管理条件变量的释放处理方式
- Hansen 管程
l.acquire()
...
x.wait() //T1进入等待
//T2进入管程 l.acquire()
...
x.Signal()
...
//T1退出管程 l.release()
... //T1恢复管程执行
l.release()
- Hoare 管程
l.acquire()
...
x.wait() //T1进入等待
//T2进入管程 l.acquire()
...
//T2进入等待 x.Signal()
... //T1恢复管程执行
l.release() //T1结束
... //T2恢复管程执行
l.release()
Hansen 管程和Hoare 管程
- Hansen 管程
- 主要用于真实操作系统和Java中
- 当前正在执行的线程优先级更高
- 效率更高,少了一次切换
-
Hoare 管程
- 主要见于教材
- 内部的线程优先执行
- 优先角度考虑更合理,更容易证明正确性
Hansen 管程和Hoare 管程区别
Hansen-style:Deposit() {
lock -> acquire();
while (count == n) {
notFull.Wait(&lock);
}
Add thing;
count ++;
notEmpty.Signal();
lock -> Release();
}
Hoare-style:Deposit() {
lock -> acquire();
if (count == n) {
notFull.Wait(&lock);
}
Add thing;
count ++;
notEmpty.Signal();
lock -> Release();
}
10.4 经典同步问题
(一)哲学家就餐问题
问题描述
-
5个哲学家围绕着一张圆桌而坐
- 桌上放着5支叉子
- 每两个哲学家之间放1支
-
哲学家的动作包括思考和进餐
- 进餐时需要同时拿到左右两边的叉子
- 思考时将两支叉子放回原处
-
如何保证哲学家们的动作有序进行?
- 如:不出现永远都难不到叉子的情况
方案1
#define N 5 //哲学家个数
semaphore fork[5]; //信号量初值为1
void philosopher(int i) //哲学家编号:0-4
while (TRUE)
{
think(); //哲学家在思考
P(fork[i]); //去拿左边的叉子
P(fork[(i+1)%N]); //去拿右边的叉子
eat(); //吃面条中
V(fork[i]); //放下左边的叉子
V(fork[(i+1)%N]); //放下右边的叉子
}
- 不正确,可能导致死锁
- 5个人同时拿到左边刀叉时,就会进入死锁状态
方案2
- 一旦无法充分利用系统资源,就把所有资源打成一个整包,只有一个进程可占用这一整包的资源,不管它是不是都需要,这种情况系统也能正常运行,只是效率低一些。
#define N 5 //哲学家个数
semaphore fork[5]; //信号量初值为1
semaphore mutex; //互斥信号量,初值为1
void philosopher(int i) //哲学家编号:0-4
while (TRUE)
{
think(); //哲学家在思考
P(mutex); //进入临界区,任一时刻只能有一个进程申请到二进制信号量
P(fork[i]); //去拿左边的叉子
P(fork[(i+1)%N]); //去拿右边的叉子
eat(); //吃面条中
V(fork[i]); //放下左边的叉子
V(fork[(i+1)%N]); //放下右边的叉子
V(mutex); //退出临界区
}
- 互斥访问正确,但每次只允许一个人进餐,性能不好
方案3
#define N 5 //哲学家个数
semaphore fork[5]; //信号量初值为1
void philosopher(int i) //哲学家编号:0-4
while (TRUE)
{
think(); //哲学家在思考
if (i%2 == 0) {
P(fork[i]); //去拿左边的叉子
P(fork[(i+1)%N]); //去拿右边的叉子
} else {
P(fork[(i+1)%N]); //去拿右边的叉子
P(fork[i]); //去拿左边的叉子
}
eat(); //吃面条中
V(fork[i]); //放下左边的叉子
V(fork[(i+1)%N]); //放下右边的叉子
}
- 没有死锁,可有多人同时访问
(二)读者-写者问题
问题描述
-
共享数据的两类使用者
- 读者:只读取数据,不修改
- 写者:读取和修改数据
-
读者-写者问题描述:对共享数据的读写
-
“读-读”允许
- 同一时刻,允许有多个读者同时读
-
“读-写”互斥
- 没有写者时读者才能读
- 没有读者时写者才能写
-
“写-写”互斥
- 没有其他写者时写者才能写
-
用信号量解决读者-写者问题
-
用信号量描述每个约束
-
信号量WriteMutex
- 控制读写操作互斥
- 初始化为1
-
读者计数Rcount
- 正在进行读操作的读者数目
- 初始化为0
-
信号量CountMutex
- 控制对读者计数的互斥修改
- 初始化为1
-
//Writer
P(WriteMutex);
write;
V(WriteMutex);
//Reader
P(CountMutex);
if(Rcount == 0) //第一个读者才需要申请读写互斥信号量,后来读者只需读者计数器加1
P(WriteMutex);
++ Rcount;
V(CountMutex);
read;
P(CountMutex);
-- Rcount;
if(Rcount == 0) //只有最后一个读者才需要释放读写互斥信号量、
V(WriteMutex);
V(CountMutex);
- 特征:读者优先
- 即后来的读者一定会先于等待的写者进行写操作
读者-写者问题:优先策略
- 读者优先策略
- 只要有读者正在读状态,后来的读者都能直接进入
- 如读者持续不断进入,则写者就处于饥饿
- 写者优先策略
- 只要有写者就绪,写者应尽快执行写操作
- 如写者持续不断就绪,则读者就处于饥饿
用管程解决读者-写者问题
- 两个基本方法
Database :: Read() {
Wait until no writers;
read database;
check out - wake up waiting writers;
}
Database :: Write() {
Wait until no readers/writers;
write database;
check out - wake up waiting readers/writers;
}
- 管程的状态变量
AR=0; //# of active readers
AW=0; //# of active writers
// 只有一个>0
WR=0; //# of waiting readers
WW=0; //# of waiting writers
//可能都>0
Lock lock;
Condition okToRead;
Condition okToWrite;
- 解决方案详情:读者
AR=0; //# of active readers
AW=0; //# of active writers
WR=0; //# of waiting readers
WW=0; //# of waiting writers
Lock lock;
Condition okToRead;
Condition okToWrite;
Public Database :: Read() { //Wait until no writers;
StartRead();
read database;
//check out - wake up waiting writers;
DoneRead();
}
Private Database :: StartRead() {
lock.Acquire();
while ((AW+WW)>0) { //条件决定了优先策略
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Private Database :: DoneRead() {
lock.Acquire();
AR--;
if ((AR==0 && WW)>0) { //条件决定了优先策略
okToWrite.signal();
}
lock.Release();
}
- 解决方案详情:写者
AR=0; //# of active readers
AW=0; //# of active writers
WR=0; //# of waiting readers
WW=0; //# of waiting writers
Lock lock;
Condition okToRead;
Condition okToWrite;
Public Database :: Write() { //Wait until no readers/writers;
StartWrite();
write database;
//check out - wake up waiting readers/writers;
DoneWrite();
}
Private Database :: StartWrite() {
lock.Acquire();
while ((AW+AR)>0) {
WW++;
okToWrite.wait(&lock);
WW--;
}
AW++;
lock.Release();
}
Private Database :: DoneWrite() {
lock.Acquire();
AW--;
if (WW>0) {
okToWrite.signal();
} else if (WR>0) {
okToRead.broadcast();
}
lock.Release();
}