《Java高并发程序设计》学习 --3.1多线程的团队协作:同步控制

时间:2021-04-15 23:49:21
1)synchronized的功能扩展:重入锁 重入锁可以完全替代synchronized关键字。在JDK5.0的早期版本,重入锁的性能远远好于synchronized,但从JDK6.0开始,JDK在synchronized上做了大量的优化,使得两者的性能差距并不大。 重入锁使用java.util.concurrent.locks.ReentrantLock类来实现,如下代码:
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for (int j = 0; j < 1000000; j++) {
lock.lock();
try {
i++;
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock tl = new ReenterLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
与synchronized相比,重入锁有着显示的操作过程。开发人员必须手动指定何时加锁,何时释放锁。重入锁对逻辑控制的灵活性要远远好于synchronized。但值得注意的是,在退出临界区时,必须记得释放锁,否则,其他线程就没有机会访问临界区了。这种锁是可以反复进入的。 中断响应 重入锁可以提供中断处理的能力。对于synchronized来说,如果一个线程在等待锁,那么结果只有两种情况,那么它获得这把锁继续执行,要么它就保持等待。而使用重入锁,则提供另外一种可能,那就是线程可以被中断。也就是在等待的过程中,程序可以根据需要取消对锁的请求。下面的代码产生了一个死锁,但得益于锁中断,可以轻易地解决这个死锁。
import java.util.concurrent.locks.ReentrantLock;
public class IntLock implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public IntLock(int lock) {
this.lock = lock;
}
@Override
public void run() {
try {
if(lock == 1) {
lock1.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
lock2.lockInterruptibly();
}
} else {
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
lock1.lockInterruptibly();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock1.isHeldByCurrentThread())
lock1.unlock();
if(lock2.isHeldByCurrentThread())
lock2.unlock();
System.out.println(Thread.currentThread().getId()+":线程退出");
}
}
public static void main(String[] args) throws InterruptedException {
IntLock r1 = new IntLock(1);
IntLock r2 = new IntLock(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
Thread.sleep(1000);
t2.interrupt();
}
}
线程t1和t2启动后,t1先占用lock1,再请求lock2;t2先占用lock2,再请求lock1。因此,很容易形成t1和t2之间的相互等待。这里,对锁的请求,统一用lockInterruptibly()方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,可以响应中断。Thread.sleep(1000)表示主线程main处于休眠,此时,两个线程处于死锁状态;t2.interrupt()表示t2线程被中断,故t2会放弃对lock1的申请,同时释放lock2。这个操作导致t1线程可以顺利得到lock2而继续执行下去。 锁申请等待限时。 除了等待外部通知之外,要避免死锁还有另外一种方法,那就是限时等待。
public class TimeLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
try {
if(lock.tryLock(5,TimeUnit.SECONDS)) {
Thread.sleep(6000);
} else {
System.out.println("get lock failed");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock.isHeldByCurrentThread())
lock.unlock();
}
}
public static void main(String[] args) {
TimeLock tl = new TimeLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();
t2.start();
}
}
在这里,tryLock()方法接收两个参数,一个表示等待时长,另外一个表示计时单位。这里的单位设置为秒,时长为5,表示线程在这个锁请求中,最多等待5秒。如果超过5秒还没有得到锁,就会返回false。如果成功获得锁,则返回true。 在本例中,由于占用锁的线程会持有锁长达6秒,故另一个线程无法在5秒的等待时间内获得锁,因此,请求会失败。 ReentrantLock.tryLock()方法也可以不带参数直接运行。在这种情况下,当前线程会尝试获得锁,如果锁并未被其他线程占用,则申请锁会成功,并立即返回true。如果锁被其他线程占用,则当前线程不会进行等待,而是立即返回false。这种模式不会引起线程等待,因此也不会产生死锁。下面演示了这种使用方式:
public class TryLock implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public TryLock(int lock) {
this.lock = lock;
}
@Override
public void run() {
if(lock == 1) {
while(true) {
if(lock1.tryLock()) {
try {
try {
Thread.sleep(500);
} catch (InterruptedException e) {

}
if(lock2.tryLock()) {
try {
System.out.println(Thread.currentThread().getId() + ":My Job done");
return;
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
}
} else {
while(true) {
if(lock2.tryLock()) {
try {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
if(lock1.tryLock()) {
try {
System.out.println(Thread.currentThread().getId() + ":My Job done");
return;
} finally {
lock1.unlock();
}
}
} finally {
lock2.unlock();
}
}
}
}
}
public static void main(String[] args) {
TryLock r1 = new TryLock(1);
TryLock r2 = new TryLock(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
}
}


上述代码中,采用了非常容易死锁的加锁顺序。但是使用tryLock()后,这种情况大大改善了。由于线程不会傻傻地等待,而是不停地尝试,因此,只要执行足够长的时间,线程总是会得到所有需要的资源,从而正常执行。 公平锁 如果使用synchronized关键字进行锁控制,那么产生的锁就是非公平的。而重用锁允许我们对其公平性进行设置。它有一个如下的构造函数: public Reentrantlock(bool fair) 当参数fair为true时,表示锁是公平的。 下面的代码可以很好地突出公平锁的特点:
public class FairLock implements Runnable {
public static ReentrantLock fairLock = new ReentrantLock(true);
@Override
public void run() {
while(true) {
try {
fairLock.lock();
System.out.println(Thread.currentThread().getName() + "获得锁");
} finally {
fairLock.unlock();
}
}
}
public static void main(String[] args) {
FairLock r1 = new FairLock();
Thread t1 = new Thread(r1,"Thread_t1");
Thread t2 = new Thread(r1,"Thread_t2");
t1.start();
t2.start();
}
}
对于ReentrantLock的几个重要方法整理如下:lock():获得锁,如果锁已经被占用,则等待。lockInterruptibly():获得锁,但优先响应中断。tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回。tryLock(long time,TimeUnit unit):在给定时间内尝试获得锁。unlock():释放锁。在重入锁的实现中,主要包含三个要素:第一,是原子状态。原子状态使用CAS操作来存储当前锁的状态,判断锁是否已经被别的线程持有。第二,是等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。第三,是阻塞原语park()和unpark(),用来挂起和恢复线程。没有得到锁的线程将会被挂起。
2)重入锁的好搭档:Condition条件它和wait()和notify()方法的作用是大致相同的。但是wait()和notify()方法是和synchronized关键字合作使用的,而Condition是与重入锁相关联的。通过Lock()接口的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。Condition接口提供的基本方法如下:
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
以上方法的含义如下:await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()或者signalAll ()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。awaitUninterruptibly()方法与await()方法基本相同,但是它并不会在等待过程中响应中断。signal()方法用于唤醒一个在等待中的线程。相对于signalAll()方法会唤醒所有在等待中的线程。这和Object.notify()方法很类似。下面代码简单地演示了Condition的功能:
public class ReenterLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run() {
try {
lock.lock();
condition.await();
System.out.println("Thread is going on");
} catch (InterruptedException e) {
// TODO: handle exception
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLockCondition tl = new ReenterLockCondition();
Thread t1 = new Thread(tl);
t1.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();
}
}
当线程使用Condition.await()时,要求线程持有相关的重入锁,在Condition.await()调用后,这个线程会释放这把锁。同理,在Condition.siginal()方法调用时,也要求线程先获得相关的锁。在signal()方法调用后,系统会从当前Condition对象的等待队列中,唤醒一个线程。一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行。
3)允许多个线程同时访问:信号量(Semaphore)信号量为多线程协作提供了更为强大的控制方法。信号量可以指定多个线程,同时访问某一资源。信号量主要提供了以下构造函数:public Semaphore(int permits)public Semaphore(int permits, boolean fair)在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于制定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑方法有:public void acquire()public void acquireUninterruptibly()tryAcquire()public boolean tryAcquire(long timeout, TimeUnit unit)public void release()acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可后者当前线程被中断。acquireUninterruptibly()方法和acquire()方法类似,但是不响应中断。tryAcquire()尝试获得一个许可,如果成功返回true,失败则返回false,它不会进行等待,立即返回。release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
4)ReadWriteLock 读写锁ReadWriteLock是JDK5中提供的读写分离锁。读写分离锁可以有效地帮助减少锁竞争,以提升系统性能。读写锁的访问约束如下表所示。
 
非阻塞 阻塞
阻塞 阻塞
代码示例如下:
public class ReadWriteLockDemo {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private int value;
public Object handleRead(Lock lock) throws InterruptedException {
try {
//模拟读操作
lock.lock();
Thread.sleep(1000);
return value;
} finally {
lock.unlock();
}
}
public void handleWrite(Lock lock,int index) throws InterruptedException {
try {
//模拟写操作
lock.lock();
Thread.sleep(1000);
value = index;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleRead(readLock);
demo.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable writeRunnable = new Runnable() {
public void run() {
try {
demo.handleWrite(writeLock, new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for(int i=0; i<18; i++) {
new Thread(readRunnable).start();
}
for(int i=18; i<20; i++) {
new Thread(writeRunnable).start();
}
}
}
5)倒计时器:CountDownLatch这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。CountDownLatch的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。下面的例子演示了CountDownLatch的使用。
public class CountDownLatchDemo implements Runnable {
static final CountDownLatch end = new CountDownLatch(10);
static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println("check complete");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
exec.submit(demo);
}
end.await();
System.out.println("Fire!");
exec.shutdown();
}
}
6)循环栅栏:CyclicBarrier
  1.  CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。 
  2.  CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。 
  3.  CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
在所有参与者都已经在此 barrier 上调用 await方法之前,将一直等待。如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:
  • 最后一个线程到达;或者
  • 其他某个线程中断当前线程;或者
  • 其他某个线程中断另一个等待线程;或者
  • 其他某个线程在等待 barrier 时超时;或者
  • 其他某个线程在此 barrier 上调用 reset()。
下面的示例使用CyclicBarrier演示了司令命令士兵完成任务的场景:
public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclic;
Soldier(CyclicBarrier cyclic, String soldierName) {
this.cyclic = cyclic;
this.soldier = soldierName;
}
@Override
public void run() {
try {
cyclic.await();
doWork();
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + ":任务完成");
}
}
public static class BarrierRun implements Runnable {
boolean flag;
int N;
public BarrierRun(boolean flag, int N) {
this.flag = flag;
this.N = N;
}
@Override
public void run() {
if(flag) {
System.out.println("司令:[士兵" + N + "个,任务完成!]");
} else {
System.out.println("司令:[士兵" + N + "个,集合完毕!]");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));

System.out.println("集合队伍!");
for (int i = 0; i < N; i++) {
System.out.println("士兵" + i + "报道!");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
allSoldier[i].start();
}
}
}
如果当前线程:
  • 在进入此方法时已经设置了该线程的中断状态;或者
  • 在等待时被中断
则抛出 InterruptedException,并且清除当前线程的已中断状态。如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出BrokenBarrierException 异常。如果任何线程在等待时被中断,则其他所有等待线程都将抛出 BrokenBarrierException 异常,并将 barrier 置于损坏状态。
7)线程阻塞工具类:LockSupportLockSupport是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。和Object.wait()相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法。他们实现了一个限时的等待。LockSupport类使用类似信号量的机制。它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可(也就是将许可变为不可用),如果许可不可用,就会阻塞。示例代码如下:
public class LockSupportDemo {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");

public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
@Override
public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
}
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
除了有定时阻塞的功能外,LockSupport.park()还能支持中断影响。但是和其他接收中断的函数很不一样,LockSupport.park()不会抛出InterruptedException异常。它只是会默默的返回,但是我们可以从Thread.interrupted()等方法中断标记。
public class LockSupportIntDemo {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
@Override
public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
if(Thread.interrupted()) {
System.out.println(getName() + "被中断了");
}
}
System.out.println(getName()+"执行结束");
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
t1.interrupt();
LockSupport.unpark(t2);
}
}