一、创建线程的三种方式:
创建线程的方式有三种,一是创建Thread实例,二是实现Runnable接口,三是实现Callable接口,Runnable接口和Callable接口的区别是一个无返回值,一个有返回值;不管是Runnable还是Callable接口,都需要借助Thread去运行;
(一)继承Thread类:
- 定义Thread类的子类,并重写该类的run方法,该run方法的方法体就代表了线程要完成的任务。因此把run()方法称为执行体。
- 创建Thread子类的实例,即创建了线程对象。
- 调用线程对象的start()方法来启动该线程。
(二)实现 Runnable接口:
- 定义runnable接口的实现类,并重写该接口的run()方法,该run()方法的方法体同样是该线程的线程执行体。
- 创建 Runnable实现类的实例,并依此实例作为Thread的target来创建Thread对象,该Thread对象才是真正的线程对象。
- 调用线程对象的start()方法来启动该线程。
(三)实现Callable接口:
- 创建Callable接口的实现类,并实现call()方法,该call()方法将作为线程执行体,并且有返回值。
- 创建Callable实现类的实例,使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。
- 使用FutureTask对象作为Thread对象的target创建并启动新线程。
- 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。
使用Demo如下:
public class CallableThreadTest implements Callable<Integer> { public static void main(String[] args) { CallableThreadTest ctt = new CallableThreadTest(); FutureTask<Integer> ft = new FutureTask<>(ctt); new Thread(ft,"有返回值的线程").start(); System.out.println("子线程的返回值:"+ft.get()); } @Override public Integer call() throws Exception { ....... return i; } }
二、FutureTask:
Future就是在启动Callable接口的线程执行后,可以继续去做其他事情,当流程进行到想要获取刚才的线程的执行结果时,可以直接从Future接口去获取,如果还未执行完成,则会阻塞等待执行完成并返回结果,可以使用isdone判断是否执行完成;
FutureTask就是融合Future接口和Callable功能的类,执行时传入Callable任务,然后启动执行,然后在需要返回值的地方使用FutureTask.get()获取返回值;
Future接口提供的方法有:cancel, isCancelled, isDone, get;
FutureTask类的实现框架如图:
三、Thread类:
1. Priority:
线程的优先级,优先级高的线程能得到更多的cpu资源;
Java中线程优先级的取值范围是1~10,创建线程时初始默认的线程优先级是5;
Thread.getPriority获取指定线程的优先级,Thread.setPriority(value)设置指定线程的优先级;
2. Sleep:
自指定的毫秒数内,让当前正在执行的线程休眠(暂停执行);这是一个静态方法,休眠当前正在执行的线程,通常是执行这段代码的线程(但并不一定总是)
如:Thread.sleep(1000);
3. yield:
使当前线程从执行态变为可执行态,也就是就绪态吧。cpu会从众多的可执行态里选择,也就是说,当前也就是刚刚的那个线程还是有可能会被再次执行到的,并不是说一定会执行其他线程而该线程在下一次不会执行到了。
这也是一个静态方法,针对的是当前线程(Thread.CurrentThread)
4. join:
使所属的线程对象x正常执行run方法,而当前线程z无限期的阻塞,直到线程x执行完成并销毁后再执行;
Join具有使线程排队运行的作用;
Join的实现其实是调用了wait使得当前线程阻塞,等待指定的线程执行完成后会调用线程自身的notifyAll方法,使得当前线程从join处继续执行;
Join与Synchronized的区别是join在内部使用wait进行等待,Synchronized使用的是对象监视器原理作为同步;
Join与sleep的区别是:调用join后,当前线程的锁被释放,其他线程可以调用此线程中的同步方法了,而sleep方法却不释放锁;
5. interrupt/isInterrupted:
Interrupt用来停止线程,但并不是真正停止了线程,而是在指定线程中打了一个停止的标记,需要结合interrupted或者isInterrupted来判断线程是否停止并退出运行或者抛出异常;interrupted判断当前线程(是指运行this. Interrupted的线程)是否已经中断,线程的中断状态由该方法清除,也就是说如果联系调用两次,由于第一次调用时清除了标记,因此第二次永远返回false,isInterrupted测试线程是否已经中断,但不清除标记;
Interrupted是静态方法,从代码可以看出其作用于当前线程对象,与调用时指定的线程对象无关,且最终实现是调用的isInterrupted本地方式,由操作系统实现;
6. stop/suspend:
这两个方法都是建议废除的;
Stop可以强制结束线程,有可能使一些请理性的工作得不到完成;
Suspend是暂停线程的执行,暂停后可以通过resume恢复线程的执行;
7. resume:
这个方法也是建议废除的;
作用是恢复由suspend暂停的线程继续运行;
8. wait:
这个方法不属于Thread类的方法,而是Object类的方法;
Wait方法使当前线程进入睡眠状态,也即阻塞当前线程,调用wait方法后,锁被自动释放;
9. notify/notifyAll:
Notify的功能是唤醒在此对象监视器上等待的单个线程(随机唤醒等待队列里的一个线程),notifyAll的作用是唤醒在此对象监视器上等待的所有线程;这两个方法一般是和wait方法配合起来使用(执行完wait后自动释放锁,而执行完notify/notifyAll后却不自动释放锁);
四、同步:
(一)Synchronized:
这是一个java内置的关键字;
关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED状态。
可以看到,任意线程对Object(Object由synchronized保护)的访问,首先要获得Object的监视器。如果获取失败,线程进入同步队列,线程状态变为BLOCKED。当访问Object的前驱(获得了锁的线程)释放了锁,则该释放操作唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取。
(二)Volatile:
这是一个java内置的关键字;
Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。
关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。
五、锁:
(一)Lock:
Lock与synchronized对比:
- Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问。
- synchronized不需要手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。
结构图:
ReadLock/WriteLock/ReentrantReadWriteLock为读写锁,实现了读写锁分离,读锁是共享的,写锁是独占的,当写锁占用着资源时,所有的读锁和写锁都不能再申请到共享资源,必须等写锁释放资源后才能申请到,而读锁占用着资源时,其他线程的读锁依然可以申请到共享资源,而写锁则不能;
ReentrantLock被称为可重入锁,字面意思就是支持重进入的锁,表示该锁能支持一个线程对资源的重复加锁;Synchronized关键字隐式的支持重进入;
Lock接口提供的方法有:
- Lock与unlock:
Lock用于获取锁,但它不会主动释放锁所以需要与unlock()配合使用。一般在使用Lock时必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。
- 获取锁等待时间tryLock(long time, TimeUnit unit):
通常我们是无法判断一个线程为什么会无法获得锁,但我们可以给该线程一个获取锁的时间限制,如果到时间还没有获取到锁,则放弃获取锁。同样tryLock也要配合unlock使用,但是在使用unLock之前可以使用isHeldByCurrentThread来判断下判断锁是否被当前线程获取了,如:
try { if (lock.tryLock(1, TimeUnit.SECONDS)) { ………. } } finally { if(lock.isHeldByCurrentThread()){ lock.unlock(); } }
(二)ReentrantLock:
重入锁,执行线程在获取锁之后仍能连续多次的获得该锁;像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。
重入锁在获取锁的算法上分为公平性和非公平性;在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之就是不公平的;实际上公平性的锁往往没有非公平性的锁的效率高,这主要是因为公平性的锁发生的切换的次数在实际场景中往往比非公平性的锁切换更多更频繁;
从上图看出,ReentrantLock的实现依赖于NonfairSync和FairSync两个同步器,NonfairSync是非公平性同步器,FairSync是公平性同步器,在创建ReentrantLock对象时传入不同的参数可以决定ReentrantLock在竞争锁资源时是采用公平性同步器算法还是非公平性同步器算法,这两个类继承了Sync类,而Sync类又继承了AQS类(AbstractQueuedSynchronizer),在Sync类中实现了公平性和非公平性同步器共同的接口函数;
(三)ReadWriteLock:
读写锁,在jdk中提供的实现类是ReentrantReadWriteLock;读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大的提升;
六、原子操作:
CAS的全程即Compare And Swap,翻译成中文为比较并交换;
CAS操作依赖于CPU指令CMPXCHG来实现比较并交换操作的原子性;
实现了圆子操作的类有AtomicInteger、AtomicBoolean、AtomicLong、AtomicReference、 AtomicLongArray、AtomicIntegrArray、AtomicStampedReference等等,其核心实现原理都是基于CAS操作完成的;
AtomicInteger封装了对int类型的原子操作,通过类的接口操作int数据都是原子性的,不会造成在多线程环境中脏数据的读写;实现int的原子操作是通过unsafe接口函数操作完成的(比如putOrderedInt、compareAndSwapInt等);
由于AtomicInteger采用了无限循环的方式来操作,因此称作自旋CAS方式;CAS方式存在三大问题:
- ABA问题:因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。
- 从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
- 循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
- 只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
七、线程池:
(一)线程池对象的继承关系:
ThreadPoolExecutor作为线程池中最核心的类,实现了接口ExecutorService,可以通过new ThreadPoolExecutor的方法创建线程池对象;
(二)ThreadPoolExecutor构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); }
函数的参数含义如下:
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
线程池执行的过程:
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
(三)扩展线程池:
在默认的ThreadPoolExecutor实现中,提供了空的beforeExecutor和afterExecutor的实现,在实际应用中可以对其进行扩展来实现对线程池运行状态的追踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。
如下:
public class MyThreadPool extends ThreadPoolExecutor { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("...............beforeExecute"); } // 线程执行之后运行 @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("...............afterExecute"); } }
(四)四种线程池:
这四种线程池的创建都是基于核心类ThreadPoolExecutor来实现的:
1. newCachedThreadPool:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse 如果没有,就建一个新的线程加入池中
- 缓存型池子通常用于执行一些生存期很短的异步型任务 因此在一些面向连接的 daemon 型 SERVER 中用得不多。但对于生存期短的异步任务,它是 Executor 的首选。
- 能 reuse 的线程,必须是 timeout IDLE 内的池中线程,缺省 timeout 是 60s,超过这个 IDLE 时长,线程实例将被终止及移出池。
2. newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- newFixedThreadPool 与 cacheThreadPool 差不多,也是能 reuse 就用,但不能随时建新的线程。
- 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。
- 和 cacheThreadPool 不同,FixedThreadPool 没有 IDLE 机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的 TCP 或 UDP IDLE 机制之类的),所以 FixedThreadPool 多数针对一些很稳定很固定的正规并发线程,多用于服务器。
- 从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:
- fixed 池线程数固定,并且是0秒IDLE(无IDLE)。
- cache 池线程数支持 0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60 秒 IDLE 。
3. newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- 单例线程,任意时间池中只能有一个线程
- 用的是和 cache 池和 fixed 池相同的底层池,但线程数目是 1-1,0 秒 IDLE(无 IDLE)
4. newScheduledThreadPool:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
- 调度型线程池
- 这个池子里的线程可以按 schedule 依次 delay 执行,或周期执行