并发基础知识
一、线程的基本概念
线程表示一条单独的执行流,它有自己的程序计数器,有自己的栈。
1.创建线程
1)继承Thread
Java中java.lang.Thread这个类表示线程,一个类可以继承Thread并重写run方法来实现一个线程:
public class MyThread extends Thread{
@Override
public void run() {
System.out.println("thread name: " + Thread.currentThread().getName() +
" thread id: " + Thread.currentThread().getId());
System.out.println("Running my thread!");
} public static void main(String[] args) {
MyThread thread = new MyThread();
//启动线程
thread.start();
/*thread name: Thread-0 thread id: 11
Running my thread!*/
}
}
2)实现Runnable接口
public class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("thread name:" + Thread.currentThread().getName());
} public static void main(String[] args) {
System.out.println("Main thread name : "
+ Thread.currentThread().getName()); //Main thread name : main
Thread thread = new Thread(new MyRunnable()); //thread name:Thread-0
thread.start();
}
}
2.线程的基本属性和方法
1)id和name
id: 一个递增整数,每创建一个线程就加一
name:默认值是Thread-后跟一个编号,name可以在Thread的构造方法中指定,可以通过setName方法进行设置。
2)优先级
优先级从1到10,默认为5,相关方法:
public final void setPriority(int newPriority)
public final int getPriority()
在编程中不要过分依赖优先级。
3)状态
线程有一个状态概念,Thread获取状态方法:
public State getState()
返回值类型为Thread.State,它是一个枚举值:
public enum State {
NEW, //线程还没调用start
//调用start后线程在执行run方法且没有阻塞时状态为RUNNABLE,
//不过,这并不代表CPU一定在执行该线程的代码,可能正在执行也可能在
//等待操作系统分配时间片,只是它在等待其他条件。
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED; //线程已经结束运行
}
Thread还有一个方法,返回线程是否alive:
//线程被启动后,run方法运行结束前,返回值都是true
public final native boolean isAlive()
4.是否是daemon线程
一般情况下整个程序只有在所有的线程都结束后,线程才会退出。
而daemon线程不一样,它是守护线程,当它辅助的主线程结束时,它也会结束。
public final void setDaemon(boolean on)
public final boolean isDaemon()
5.sleep方法
Thread有一个静态方法,调用该方法会让当前线程睡眠指定时间,单位是毫秒。
public static native void sleep(long millis) throws InterruptedException;
在睡眠期间,该线程会让出CPU,但睡眠时间不是确切的给定毫秒数,可能有一定偏差。
睡眠期间,线程可以被中断,如果被中断,sleep会抛异常。
6.yield方法
public static native void yield();
调用该方法意思是:我现在不急着占用CPU,你可以先让其他线程运行。
不过这对系统调度器也仅仅是建议,调度器如何处理不一定,它可能忽略该调用。
7.join方法
在前面的MyThread例子中,MyThread可能没执行完,main线程就可能执行完了。
Thread有一个join方法,可以让调用join的线程等待该线程的结束。
public final void join() throws InterruptedException
//限定等待的最长时间
public final synchronized void join(long millis) throws InterruptedException
MyThread thread = new MyThread();
thread.start();
//让main线程在子线程调用结束后再退出,相当于阻塞了main线程
thread.join();
3.共享内存及可能出现的问题
虽然每个线程表示一条单独的执行流,有自己的程序计数器和栈,
但线程之间可以共享内存,它们可以访问和操作相同的对象。
public class ShareMemoryDemo {
private static int shared = 0;
private static void incrShared() {
shared ++;
}
static class ChildThread extends Thread {
List<String> list;
public ChildThread(List<String> list) {
this.list = list;
}
@Override
public void run() {
incrShared();
list.add(Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
ArrayList<String> list = new ArrayList<>();
ChildThread t1 = new ChildThread(list);
ChildThread t2 = new ChildThread(list);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(shared);
System.out.println(list);
/*2
[Thread-0, Thread-1]*/
}
}
ChildThread的run方法访问了共享变量shared和list。
执行流、内存和程序代码之间的关系:
1)不同的执行流可以访问和操作相同的变量
2)不同的执行流可以执行相同的程序代码
所以,在分析代码执行过程时,理解代码在被哪个线程执行是很重要的
3)当多条执行流执行相同的程序代码时,每条执行流都有自己的栈,方法中
的参数和局部变量都有自己的一份。
当多条执行流可以操作相同的变量时,可能会出现意料之外的结果,包括竞态条件和内存可见性问题。
1.竞态条件
所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终结果和执行时序
有关,可能正确也可能不正确。
public class CounterThread extends Thread{
private static int counter = 0;
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
counter++;
}
}
public static void main(String[] args) throws InterruptedException {
int num = 1000;
Thread[] threads = new Thread[num];
for (int i = 0; i< num; i++) {
threads[i] = new CounterThread();
threads[i].start();
}
for (int i = 0; i < 1000; i++) {
threads[i].join();
}
System.out.println(counter); //
}
}
期望结果应该是10000000,实际结果为998400,为什么呢?
因为counter++这个操作不是原子操作,它分为了3个步骤:
1)去counter的当前值
2)在当前值上加1
3)将新值重新赋值给counter
两个线程可能同时执行第一步,取到了相同的counter值。
2.内存的可见性
多个线程可以共享和访问和操作相同的变量,但一个线程对一个共享变量的修改,
另一个线程不一定能马上见到,甚至永远见不到。
public class VisibilityDemo {
private static boolean shutdown = false;
static class MyThread extends Thread {
@Override
public void run() {
while (!shutdown) {
//do nothing
}
System.out.println("exit myThread");
}
}
public static void main(String[] args) throws InterruptedException {
new MyThread().start();
Thread.sleep(1000);
shutdown = true;
System.out.println("exit main!");
}
}
期望的结果是两个线程都退出,但实际执行时,很可能会发现myThread永远不退出,
也就是说在myThread执行流看来,shutdown永远为false,即使main线程已经更改为了ture。
这就是内存可见性问题,在计算机系统中,除了内存,数据还会被缓存在CPU的寄存器以及
各级缓存中,当访问一个变量时,可能从CPU寄存器和各级缓存取,而不是从内存,当
修改一个变量时,也可能是先修改到缓存中,稍后再同步更新到内存中。在单线程程序中,这
一般不是问题,但在多线程程序中,尤其是有多个CPU的情况下,这就是严重问题。一个线程对
内存的修改,另一个线程看不到,一是修改没有及时同步到到内存,二是另一个线程根本就没有从内存中读取。
二、理解synchronized(同步)
1.用法和基本原理
synchronized可以用于:
1)实例方法:
public class Counter {
private int count;
public synchronized void incre() {
count ++;
}
public synchronized int getCount() {
return count;
}
}
public class CounterThread extends Thread {
private Counter counter;
public CounterThread(Counter counter) {
this.counter = counter;
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
counter.incre();
}
}
public static void main(String[] args) throws InterruptedException {
int num = 1000;
Counter counter = new Counter();
Thread[] threads = new Thread[num];
for (int i = 0; i < num; i++) {
threads[i] = new CounterThread(counter);
threads[i].start();
threads[i].join();
}
System.out.println(counter.getCount());
}
}
看上去,synchronized使得同时只能有一个线程执行实例方法,
但这个理解是不确切的。多个线程是可以同时执行一个synchronized方法的,
只要它们访问的对象不同即可。所以,synchronized实例方法保护的是同一个
对象方法的调用,确保同时只能有一个线程执行。大致过程如下:
1)尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒;
2)执行实例方法代码;
3)释放锁,如果等待队列上有等待线程,从中抽取一个并唤醒,如果有多个等待线程,唤醒
哪一个是不一定的,不保证公平性。
当线程不能获得锁的时候,他会加入等待队列,状态会变为BLOCKED.
注意:一般在保护变量时,需要在所有访问该变量的方法上加上synchronized。
再次强调:synchronized保护的是对象而非代码,只要访问的是同一个对象的synchronized方法,
即使是不同的代码,也会被同步顺序访问。比如,Counter中的两个实例方法,incre和getCount,
对于同一个Counter对象,一个线程想去执行incre,另一个线程想执行getCount,它们是不能同时执行的,
会被synchronized同步顺序执行。
2)静态方法
synchronized保护的是类对象,而非实例对象(实际上,每个对象都有一个锁和等待队列,类对象也不例外)。
public class StaticCounter {
private static int count = 0;
public static void incr() {
synchronized (StaticCounter.class) {
count ++;
}
}
public static int getCount() {
synchronized (StaticCounter.class) {
return count;
}
}
}
3)代码块
public class Counter {
private int count;
public void incre() {
//synchronized括号里面的就是保护对象{}里就是同步执行代码
synchronized (this) {
count ++;
}
}
public synchronized int getCount() {
synchronized (this) {
return count;
}
}
}
public class Counter {
private int count;
private Object obj = new Object();
public void incre() {
//synchronized同步的对象可以是任意对象,任意对象都有一个锁和等待队列
synchronized (obj) {
count ++;
}
}
public synchronized int getCount() {
synchronized (obj) {
return count;
}
}
}
2.进一步理解synchronized
1)可重入性
对同一个执行线程,在它获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。
比如,在一个synchronized实例方法内,可以调用同一个实例中的synchronized实例方法。
2)内存可见性
synchronized除了保证原子操作外,它还有一个重要作用,就是保证内存可见性,
在释放锁时,所有写入都会写入内存,而获得锁后,都会从内存中读取最新数据。
不过,如果只是为了保证内存可见性,synchronized成本有点高,可以给变量加修饰符volatile代替。
加了volatile后,Java会在操作对应变量时加入特殊指令,保证读写到内存最新值,而非缓存值。
3)死锁
死锁举例:有a、b两个线程,a线程持有锁A,在等待锁B,b线程持有锁B,在等待锁A,
a、b陷入了互相等待,最后谁都执行不下去。
public class DeadLockDemo {
private static Object lockA = new Object();
private static Object lockB = new Object();
public static void startThreadA() {
Thread t = new Thread(() -> {
synchronized (lockA) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {}
}
});
t.start();
}
public static void startThreadB() {
Thread t = new Thread(() -> {
synchronized (lockB) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockA) {}
}
});
t.start();
}
public static void main(String[] args) {
startThreadA();
startThreadA();
}
}
如何解决死锁问题:首先,应该尽量避免在持有一个锁的同时去申请另一个锁,
如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。
3.同步容器
Collections类中有一些方法,可以返回线程安全的同步容器:
public static <T> Collection<T> synchronizedCollection(Collection<T> c)
public static <T> List<T> synchronizedList(List<T> list)
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
它们是给所有容器方法加上synchronized来实现安全的,例如:
static class SynchronizedCollection<E> implements Collection<E> {
final Collection<E> c; //Backing Collection
final Object mutex; //Object on which to synchronize
SynchronizedCollection(Collection<E> c) {
if(c==null)
throw new NullPointerException();
this.c = c;
mutex = this;
} public int size() {
synchronized (mutex) {return c.size();}
}
public boolean add(E e) {
synchronized (mutex) {return c.add(e);}
}
public boolean remove(Object o) {
synchronized (mutex) {return c.remove(o);}
}
//…
}
这里的线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,
不需要额外的同步操作,也不会出现错误结果。这样是不是就绝对安全了呢?不是
的我们还需要注意以下情况:
1)复合操作
public class EnhancedMap <K, V> {
Map<K, V> map;
public EnhancedMap(Map<K,V> map){
this.map = Collections.synchronizedMap(map);
}
public V putIfAbsent(K key, V value){
V old = map.get(key);
if(old!=null){
return old;
}
return map.put(key, value);
}
public V put(K key, V value){
return map.put(key, value);
}
//…
}
其中的putAbsent方法语义是只有在原方法没有对应键的情况下才添加,
这是一个检查然后在更新的操作,在多线程的情况下,可能有多个线程
执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,这就破坏了该方法的语义。
2.伪同步
那么给该方法加上synchronized就线程安全了吗?
public synchronized V putIfAbsent(K key, V value){
V old = map.get(key);
if(old!=null){
return old;
}
return map.put(key, value);
}
答案是否定的,原因是同步对象错了。putAbsent同步使用的是EnhancedMap对象,
而其他方法使用的是Collections当作的map对象。要解决这个问题,所有方法必须
使用相同的锁。所以可以改为:
public V putIfAbsent(K key, V value){
synchronized(map){
V old = map.get(key);
if(old!=null){
return old;
}
return map.put(key, value);
}
}
3.迭代
对于同步容器对象,虽然单个操作是安全的,但迭代并不是:
public class DeadLockDemo { private static void startModifyThread(final List<String> list) {
Thread modifyThread = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 100; i++) {
list.add("item " + i);
try {
Thread.sleep((int) (Math.random() * 10));
} catch (InterruptedException e) {
}
}
}
});
modifyThread.start();
} private static void startIteratorThread(final List<String> list) {
Thread iteratorThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
for(String str : list) {
}
}
}
});
iteratorThread.start();
} public static void main(String[] args) {
final List<String> list = Collections
.synchronizedList(new ArrayList<String>());
startIteratorThread(list);
startModifyThread(list);
}
/*Exception in thread "Thread-0" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at com.cdert.roadpms.test.utils.DeadLockDemo$2.run(DeadLockDemo.java:34)
at java.lang.Thread.run(Thread.java:748)*/
}
如果在遍历的同时容器发生了结构性变化就会抛出该异常。
可以改为(在遍历的时候给整个容器对象加锁):
private static void startIteratorThread2(final List<String> list) {
Thread iteratorThread = new Thread(new Runnable() {
@Override
public void run() {
while(true) {
synchronized(list){
for(String str : list) {
}
}
}
}
});
iteratorThread.start();
}
二、线程的基本协作机制
一)协作的场景
1)生产者/消费者模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到
队列上,而消费者从队列取数据或者任务,如果队列长度有限,在队列满的时候,生产者需要等待,
在队列为空的时候,消费者需要等待。
2)同时开始:在一些程序,尤其是仿真程序中,要求多个线程同时开始。
3)等待结束:主线程将任务分解成若干子任务,为每个子任务创建一个线程,主线程在继续执行其他
任务之前需要等待每个子任务执行完毕。
4)异步结果:在主从协作模式中,主线程手工创建子线程往往笔记麻烦,一种常见的模式是把子线程封装
为异步调用,异步调用马上返回,但返回的不是最终结果,而是一个称为Future的对象,通过它可以在随后获得最终结果。
5)集合点:在一些程序中,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程
到齐后,交还数据和计算结果,再进行下一次迭代。
二、wait/notify
在Java根父类中定义了一些线程协作的基本方法,这些方法
分为两类:wait和notify。wait主要有两个方法:
public final void wait() throws InterruptedException
//单位为毫秒表示最长等待时间,wait(0)表示无限期等待,无参wait等于调用wait(0)
public final native void wait(long timeout) throws InterruptedException;
每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,
会尝试获取锁,如果获取不到就会把当前线程加入等待队列,其实,除了
用于锁的等待队列,每个对象还有另一个等待队列,叫做条件队列,该队列
用于线程间的协作。调用wait就会把当前线程(调用该方法使用的线程)放到条件
队列上并阻塞,表示当前线程无法执行,它需要等待一个条件,这个条件不能由
它自己发起,需要其他线程发起。当其他线程改变条件后,应该调用notify方法。
//notify将会从条件队列中选取一个线程,将其从队列中移除并唤醒
public final native void notify();
//notifyAll与notify的区别是:它会移除条件队列中的所有线程,并全部唤醒
public final native void notifyAll();
简单的协作示例:
public class WaitThread extends Thread{
private volatile boolean fire = false;
@Override
public void run() {
try {
synchronized(this) {
while (!fire) {
wait();
}
}
System.out.println("fired!");
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized void fire() {
this.fire = true;
notify();
}
public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}
两个线程都要访问变量fire,容易出现竞态条件,所有相关代码都要被
synchronized保护。实际上,wait/notify方法只能在synchronized代码
块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常。
看看另一种情况:
public class WaitThread extends Thread{
private volatile boolean fire = false;
private volatile int count = 0;
@Override
public void run() {
try {
synchronized(this) {
while (!fire) {
sleep(1000);
System.out.println(++count);
}
}
System.out.println("fired");
} catch (Exception e) {
e.printStackTrace();
}
}
//实例对象的锁不会被释放,该方法永远不会被执行
public synchronized void fire() {
System.out.println("fire it");
this.fire = true;
}
public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}
思考:如果wait必须被synchronized保护,那一个线程在wait时,另一个线程
怎么可能调用到同样被synchronized保护的notify方法(注意两个被保护的synchronized方法是一个实例的)?
它不需要等待锁么?我们需要进一步理解wait的内部过程才能解开谜团!!虽然是在synchronized内,但调用
wait时,线程会释放对象锁。wait的具体过程是:
1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。
2)等待时间到或者被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:
如果能够重新获得锁,线程状态变为RUNNABLE,并从wait调用中返回。
否则,该线程加入对象锁等待队列,线程变为BLOCKED,只有在获得锁后才会从wait调用中返回。
线程从wait调用中返回后,不代表其等待条件就一定成立了,它需要重新检查其等待条件,一般
调用模式是:
synchronized (obj) {
while(条件不成立)
obj.wait();
…//执行条件满足后的操作
}
比如上例中的代码:
synchronized (this) {
while(!fire) {
wait();
}
}
调用notify会把条件队列中的线程唤醒并从队列中移除,但它并不会
释放调用它的代码块获得的对象锁,也就是说,只有在包含notify的
synchronized代码块执行完毕后,等待线程才会从wait调用中返回。
三、生产者\消费者模式
public class MyQueue<E> {
private Queue<E> queue = null;
private int limit;
public MyQueue(int limit) {
this.limit = limit;
queue = new ArrayDeque<E>(limit);
}
//给生产者用的方法,往队列上放数据,满了就wait
public synchronized void put(E e) throws InterruptedException {
while (queue.size() == limit) wait();
queue.add(e);
//由于条件不同但又使用相同的的等待队列,所以
//要用notifyAll而不能调用notify
notifyAll();
}
//take给消费者用的方法空了就wait
public synchronized E take() throws InterruptedException {
while (queue.isEmpty()) wait();
E e = queue.poll();
notifyAll();
return e;
}
}
只能有一个条件等待队列,这是 wait/notify机制的缺陷,这使得对于条件的分析变得复杂。
public class Producer extends Thread{
MyQueue<String> queue;
public Producer(MyQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
int num = 0;
try {
while (true) {
String task = String.valueOf(num);
queue.put(task);
System.out.println("produce: " + task);
num ++;
Thread.sleep(3000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Consumer extends Thread{
MyQueue<String> queue;
public Consumer(MyQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String task = queue.take();
System.out.println("consumer: " + task);
Thread.sleep(3000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
三、同时开始
public class FireFlag {
private volatile boolean fired = false;
public synchronized void waitForFire() throws InterruptedException {
while (!fired) {
wait();
}
}
public synchronized void fire() {
this.fired = true;
notifyAll();
}
}
public class Racer extends Thread{
private FireFlag flag;
public Racer(FireFlag fireFlag) {
this.flag = fireFlag;
}
@Override
public void run() {
try {
flag.waitForFire();
System.out.println("start run " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
} public static void main(String[] args) throws InterruptedException {
int num = 10;
FireFlag flag = new FireFlag();
Thread[] threads = new Thread[num];
for (int i = 0; i < num; i++) {
threads[i] = new Racer(flag);
threads[i].start();
}
Thread.sleep(1000);
flag.fire();
}
}
四、等待结束
join方法实际上就是调用了wait方法:
while (isAlive()) {
wait(0);
}
只要线程是活的,isAlive()返回true,join就一直等待。谁来通知它呢?
当线程运行结束时,Java系统调用notifyAll来通知。使用join比较麻烦,需要
主线程逐一等待每个子线程。这里我们展示一种新写法:
public class MyLatch {
//未完成的线程个数,初始值为子线程总个数
private int threadsCount;
public MyLatch(int threadsCount) {
this.threadsCount = threadsCount;
}
public synchronized void await() throws InterruptedException {
while (threadsCount > 0) wait();
}
public synchronized void countDown() {
threadsCount --;
if (threadsCount <= 0) this.notifyAll();
}
}
public class Worker extends Thread{
MyLatch myLatch;
public Worker(MyLatch myLatch) {
this.myLatch = myLatch;
}
@Override
public void run() {
try {
Thread.sleep(5000);
myLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
} public static void main(String[] args) throws InterruptedException {
int num = 10;
MyLatch myLatch = new MyLatch(num);
Thread[] threads = new Thread[num];
for (int i = 0; i < num; i++) {
threads[i] = new Worker(myLatch);
threads[i].start();
}
myLatch.await();
System.out.println("ready to end !");
}
}
五、异步结果
在Java中表示子任务的接口是Callable,声明为:
public interface Callable<V> {
V call() throws Exception;
}
为表示异步调用的结果,定义一个MyFuture接口:
public interface MyFuture <V> {
//get方法返回真正的结果,如果结果没有计算完成,
//get方法会阻塞直到计算完成
V get() throws Exception ;
}
为方便主线程调用子任务,定义一个MyExcecutor类,其中定义一个execute
方法表示执行子任务并返回调用结果。
//利用该方法,对于主线程,就不需要创建并管理子线程了,
//并且可以方便地获取异步调用的结果
public <V> MyFuture<V> execute(final Callable<V> task)
实例:
//表示子任务得接口
interface MyCallable<V> {
V call() throws Exception;
}
//表示异步调用的结果
interface MyFuture<V> {
//返回真正的结果
V get() throws Exception;
}
interface MyExecutor {
<V> MyFuture<V> execute(final MyCallable<V> task);
}
//执行任务的子线程
public class ExecuteThread<V> extends Thread{ private V result = null;
private Exception exception = null;
private volatile boolean done = false;
private MyCallable<V> task;
private Object lock; public ExecuteThread(MyCallable<V> task, Object lock) {
this.task = task;
this.lock = lock;
} @Override
public void run() {
try {
result = task.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
done = true;
lock.notifyAll();
}
}
} public V getResult() {
return result;
}
public boolean isDone() {
return done;
} public Exception getException() {
return exception;
}
}
public class MyExecutorImpl implements MyExecutor{
@Override
public <V> MyFuture<V> execute(MyCallable<V> task) {
final Object lock = new Object();
final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
MyFuture<V> myFuture = new MyFuture<V>(){
@Override
public V get() throws Exception{
synchronized (lock) {
System.out.println("Try to get result");
while (!thread.isDone()) {
try {
//阻塞直到任务执行完毕
lock.wait();
} catch (Exception e) {}
}
if (thread.getException() != null) {
throw thread.getException();
}
}
System.out.println("Got it!!!");
return thread.getResult();
}
};
thread.start();
return myFuture;
}
}
public class MainClass {
public static void main(String[] args) {
MyExecutor executor = new MyExecutorImpl();
MyCallable<Integer> task = new MyCallable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Start do the call.");
int result = (int) (Math.random() * 1000);
Thread.sleep(5000);
System.out.println("Do the call done.");
return result;
}
};
//异步调用,返回一个MyFuture对象
MyFuture<Integer> future = executor.execute(task);
//执行其他操作
try {
System.out.println("Start do other things.");
Thread.sleep(3000);
System.out.println("Other things done. ");
System.out.println("Ready to get.");
//获取异步调用结果
Integer result = future.get();
System.out.println("The result is " + result);
} catch (Exception e) {
e.printStackTrace();
}
/*Start do other things.
Do the call done.
Other things done.
The result is 802*/
}
}
六、集合点
public class AssemblePoint {
//未到达集合点的线程数量,初始为所有线程数
private int threadsCount;
public AssemblePoint(int threadsCount) {
this.threadsCount = threadsCount;
}
public synchronized void await() throws InterruptedException {
System.out.println("Now the count is " + threadsCount);
if (threadsCount > 0) {
threadsCount --;
if (threadsCount == 0 ) {
notifyAll();
System.out.println("All the threads done.");
}
else {
while (threadsCount !=0) {
this.wait();
}
}
}
}
}
public class AssemblePointDemo {
static class Tourist extends Thread {
AssemblePoint point;
public Tourist(AssemblePoint point) {
this.point = point;
}
@Override
public void run() {
try {
//模拟各自独自运行
Thread.sleep((long) (Math.random() * 1000));
//该线程运行完毕,集合
point.await();
} catch (Exception e) {
e.printStackTrace();
}
}
} public static void main(String[] args) {
int num = 10;
Tourist[] threads = new Tourist[num];
AssemblePoint point = new AssemblePoint(num);
for (int i = 0; i < num; i++) {
threads[i] = new Tourist(point);
threads[i].start();
}
}
}
三、线程的中断
一)取消/关闭机制
在Java中停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,
是给线程传递一个取消信号,但是由线程线程决定如何以及何时退出。每个线程都有一个标志位,表示该线程是否被中断了。
//返回线程的中断标志位
public boolean isInterrupted()
//中断对应的线程
public void interrupt()
//该方法是静态方法,实际会调用Thread.currentThread()操作当前线程,返回标志位,并清空
public static boolean interrupted()
二)线程中断的反应
interrupt()对线程的影响与线程的状态和正在进行的IO操作有关,
我们主要讨论线程状态:
1)RUNNABLE:线程正在运行或者具备运行的条件只是在等待操作系统调度;
2)WAITING/TIME_WAITING:线程在等待某个条件或者超时;
3)BLOCKED:线程在等待锁,试图进入同步块
4)NEW/TERMINATED:线程还未启动或者已经结束。
1.RUNNABLE
如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标致位,
没有任何其他作用。线程应该在运行过程中合适的位置检查中断标志位,例如:
public class InterruptRunnableDemo extends Thread {
@Override
public void run() {
while(!Thread.currentThread().isInterrupted()) {
}
System.out.println("done ");
}
}
2.WAITING/TIME_WAITING
线程调用join/wait/sleep方法会进入WAITING/TIME_WAITING状态,
在这些状态时,对线程对象调用interrupt会使线程抛出InterruptedException。
抛出异常后,中断标志位会被清空,而不是设置。比如:
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(Thread.currentThread().isInterrupted()); //false
}
});
thread.start();
try {
Thread.sleep(100);
} catch (Exception e) {}
thread.interrupt();
}
捕获InterruptedException,通常表示希望线程结束,线程大致有两种处理方式:
1)向上传递该异常,这使得该方法也变成一个可中断方法,需要调用者进行处理;
2)有些情况不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能
抛出受检异常,这时,应该捕获异常,进行合理的清理操作,清理后,一般应该调
用Thread的interrupt方法设置中断标志位,使得其他代码有方法知道它发生中断。
3.BLOCKED
如果一个线程在等待锁,对线程调用interrupt只是会设置中断标志位,线程
仍然会处于BLOCKED状态。interrupt并不能使一个正在等待锁的线程正真“中断”。
public class InterruptSynchronizedDemo {
private static Object lock = new Object();
private static class MyThread extends Thread {
@Override
public void run() {
synchronized (lock) {
System.out.println("Coming in "); //never
while (!Thread.currentThread().isInterrupted()) {
}
}
System.out.println("exit");//never
}
}
public static void test() throws InterruptedException {
synchronized (lock) {
MyThread thread = new MyThread();
thread.start();
Thread.sleep(1000);
thread.interrupt();
thread.join();
}
}
public static void main(String[] args) {
//test方法在持有锁的情况下启动线程thread,而线程thread
//也尝试获得锁,所以会进入等待队列。
try {
test();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。
4.NEW/TERMINATE
调用interrupt无任何效果。
三)如何正确地取消/关闭线程
interrupt方法不一定会真正中断线程,它只是一种协作机制。
对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供
单独的取消/关闭方法给调用者。