[置顶] 高并发程序设计入门

时间:2022-06-09 23:49:06

说在前面

本文绝大部分参考《JAVA高并发程序设计》,类似读书笔记和扩展。

走入并行世界

概念

同步(synchronous)与异步(asynchronous)

同步和异步通常来形容一次方法调用。同步方法调用一旦开始,调用者必须等到方法调用返回后,才能继续执行任务。
异步方法更像一个消息传递,一旦开始,方法调用就会立即返回,调用者就可以继续后续的工作。异步方法通常会在另外的线程中“真实”的执行。整个过程不会阻碍调用者的工作。

并发(concurrency)和并行(parallelism)

链接:并发Concurrent与并行Parallel的区别

临界区

临界区表示一种公共资源或者说是共享资源,可以被多个线程使用。但是每一次只能有一个线程使用它,一旦临界区资源被占用,其他线程要想得到这个资源就必须等待。
在并行程序中。临界区资源是保护对象。就比如大家公用的一台打印机,必然是一个人打完另一个人的才能打印,否则就会出乱子。

阻塞(blocking)与非阻塞(non-blocking)

阻塞和非阻塞通常来形容多线程间的相互影响。比如一个线程占用了临界区资源,那么其他所有需要这个资源的线程都需要在临界区中等待。等待会导致线程挂起,这种情况就是阻塞。此时如果占用这个资源的线程一直不愿释放资源,那么其他所有阻塞在这个临界区上的线程都不能工作。
反之就是非阻塞,它强调没有一个线程可以妨碍其他线程执行。所有线程都会尝试不断前向执行。

死锁(deadlock)、饥饿(starvation)和活锁(livelock)

这三种情况都属于线程活跃性问题。如果发现上述情况,那么相关线程可能就不再活跃,也就是说它可能很难再继续执行任务了。
1 死锁
应该是最糟糕的情况之一。它们彼此相互都占用着其他线程的资源,都不愿释放,那么这种状态将永远维持下去。
死锁是一个很严重的问题,应该避免和小心。就如4辆小汽车,互相都占用对方的车道,无法正常行驶。
[置顶]        高并发程序设计入门

2 饥饿
是指一个或多个线程因为种种原因一直无法得到所需要的资源,导致一直无法执行,比如它的线程优先级太低,高优先级的线程一直抢占它所需要的资源。另一种可能是某一个线程一直占用着关键资源不放,导致其他需要这个资源的线程一直无法得到这个资源,无法正常执行。与死锁相比,饥饿还是可能在一段时间内解决的,比如高优先级的线程执行完任务后,不在抢占资源,资源得到释放。

3 活锁
是非常有趣的情况,也是最难解决的情况。这就比如,大家在一个两人宽的桥上走路,双方都很有礼貌。都在第一时间礼让对方,一个往左一个往右,导致两人都无法正常通行。放到线程中,就体现为,两个线程都拿到资源后都主动释放给他人使用,那么就会出现资源不断的在两个线程中跳动,而没有一个线程可以拿到资源后正常执行,这个就是活锁。

并发级别

由于临界区的存在,多线程之间的并发必须受到控制。根据控制并发的策略,我们可以把并发的级别进行分类,大致上可以分为阻塞、无饥饿、无障碍,无锁和无等待几种。

阻塞(blocking)

一个线程是阻塞的,那么在其他线程释放资源之前,当前线程无法继续执行。当我们使用synchronized关键字,或者重入锁时,我们得到的就是阻塞的线程。
无论是synchronized还是重入锁,都会在视图执行后续代码前得到临界区的锁,如果得不到,线程就会被挂起等待,直到占有了所需要的资源为止。

无饥饿

如果线程间是有优先级的,那么线程调用总是会倾向于满足高优先级的线程。也就是说对同一个资源的分配是不公平的。对于非公平的锁来说,系统允许高优先级的线程插队,这样有可能导致低优先级的线程产生饥饿。但如果锁是公平的,满足先来后到,那么饥饿就不会产生,不管新来的线程优先级多高,要想获得资源就必须排队。那么所有的线程都有机会执行。
[置顶]        高并发程序设计入门

无障碍(obstruction-Free)

无障碍是一种最弱的非阻塞调度。两个线程如果是无障碍的执行,那么他们不会因为临界区的问题导致一方被挂起。大家都可以大摇大摆进入临界区工作。那么如果大家都修改了共享数据怎么办呢?对于无障碍的线程来说,一旦出现这种情况,当前线程就会立即对修改的数据进行回滚,确保数据安全。但如果没有数据竞争发生,那么线程就可以顺利完成自己的工作,走出临界区。
如果阻塞控制的方式比喻成悲观策略。也就是说系统认为两个线程之间很有可能发生不幸的冲突,因此,保护共享数据为第一优先级。相对来说,非阻塞的调度就是一种乐观策略,他认为多线程之间很有可能不会发生冲突,或者说这种概率不大,但是一旦检测到冲突,就应该回滚。
从这个策略来看,无障碍的多线程程序不一定能顺利执行。因为当临界区的字眼存在严重的冲突时,所有线程可能都进行回滚操作,导致没有一个线程可以走出临界区。所以我们希望在这一堆线程中,至少可以有一个线程可以在有限时间内完成自己的操作,至少这可以保证系统不会再临界区进行无线等待。
一种可行的无障碍实现可以依赖一个“一致性标记”来实现。线程在操作之前,先读取并保持这个标记,在操作完后,再次读取,检查这个标记是否被修改过,如果前后一致,则说明资源访问没有冲突。如果不一致,则说明资源可能在操作过程中与其他写线程冲突,需要重试操作。任何对保护资源修改之前,都必须更新这个一致性标记,表示数据不安全。

无锁(lock-free)

无锁的并行都是无障碍的。在无锁的情况下,所有的线程都能尝试对临界区的资源进行访问,但不同的是,无锁的并发保证必然有一个线程能够在有限步内完成操作离开临界区。
在无锁的调度中,一个典型的特点是可能会包含一个无穷循环。在这个循环中线性不断尝试修改共享数据。如果没有冲突,修改成功,那么线程退出,否则尝试重新修改。但无论如何,无锁的并行总能保证有一个线程可以胜出,不至于全军覆没。至于临界区中竞争失败的线程,则不断重试。如果运气不好,总是不成功,则会出现类似饥饿的现象,线程会停止不前。

无等待(wait-free)

无锁是要求至少有一个线程在有限步内完成操作,而无等待则是在无锁的基础之上进一步扩展。他要求所有线程都必须在有限步内完成操作。这样就不会引起饥饿问题。如果限制这个步骤上限,还可以分为有界无等待和线程无关的无等待几种,它们之间的区别只是对循环次数的限制不同。
一种典型的无等待结构是RCU(read-copy-update)。它的基本思想是,对数据的读可以不加控制,因此所有读线程都是无等待的,它们既不会被锁定等待也不会引起任何冲突。但在写数据时,先取得原始数据的副本,接着只修改副本数据,修改完后,在合适的时机回写数据。

有关并行的两个重要定律

Amdahl定律

加速比定义:加速比= 优化前系统耗时/优化后系统耗时
根据Amdahl定律,使用多核CPU对系统进行优化,优化的效果取决于CPU的数量以及系统中串行程序的比重。CPU数量越多,串行化比重越低,则优化效果越好。仅提高CPU核数不降低系统串行程序比重,也无法提高系统性能。

Gustafson定律

根据Gustafson定律,我们更容易发现,如果串行化比例很小,并行化比例很大,那么加速比就是处理器的个数。只要不断增加CPU核数,就可以提高系统性能。

JAVA内存模型(JMM)

由于并发程序要比串行程序复杂的多,其中一个重要的原因是并发程序下数据访问的一致性和安全性将受到严重的挑战。因此我们需要在深入了解并行机制之前,再定义一种规则,保证多线程程序可以有效的,正确的协同工作。而JMM也就为此而生。JMM的关键技术点都是围绕多线程的原子性、可见性和有序性来建立的。

原子性(atomicity)

指一个操作是不可中断的。即使多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
比如对一个静态变量int i赋值,A线程赋值1,B线程赋值-1,那么这个变量i的结果可能是1或者-1,没有其他情况。这就是原子性。
但如果是给一个long型赋值的话就没那么幸运了。在32位系统下,long型数据的读写不是原子性的(因为long有64位)。
32位的java虚拟机上运行如下例子,就会出现非原子性的问题了。


public class E1 {

public static long t=0;

public static class ChangT implements Runnable{

private long to;

public ChangT(long to) {
this.to = to;
}
@Override
public void run() {
while (true){
E1.t = to;
Thread.yield();
}
}
}

public static class ReadT implements Runnable{
@Override
public void run() {

while (true){
long tmp = E1.t;
if (tmp != 111L && tmp != -999L && tmp != 333L && tmp != -444L)
System.out.println(tmp);
Thread.yield();
}
}
}

public static void main(String[] a){

new Thread(new ChangT(111L)).start();
new Thread(new ChangT(-999L)).start();
new Thread(new ChangT(333L)).start();
new Thread(new ChangT(-444L)).start();
new Thread(new ReadT()).start();

}
}

理想的结果可能是什么都不输出,但是,一旦运行,就会有大量的输出一下信息

...
-4294966963
4294966852
-4294966963
...

我们可以看到读取线程居然读取到不可能存在的数据。因为32为系统中的long型数据的读和写不是原子性的,多线程之间互相干扰了。
如果我们给出结果中几个数值的2进制,大家就会更清晰的认识了。


-999 = 1111111111111111111111111111111111111111111111111111110000011001
-444 = 1111111111111111111111111111111111111111111111111111111001000100

111 = 0000000000000000000000000000000000000000000000000000000001101111
333 = 0000000000000000000000000000000000000000000000000000000101001101

4294966852 = 0000000000000000000000000000000011111111111111111111111001000100
-4294967185 = 1111111111111111111111111111111100000000000000000000000001101111

上面这几个数值的补码形式,也是在计算机内真实存储的内容。不难发现4294966852其实是111或333的前32为夹杂着-444的后32位的数据。而-4294967185其实是-999或-444夹杂111后32位的数据。换句话说,由于并行的关系数字被写乱了。或者读的时候读串位了。
通过这个例子,大家应该对原子性应该有基本的认识。

可见性(visibility)

可见性是指当一个线程修改了一个共享变量。其他线程是否可以立即知道这个修改。对于串行程序来说这个问题是不存在的。但这个问题在并行程序中就很有可能出现。如果一个线程修改了某一个全局变量。其他线程未必可以马上知道这个修改。如果CPU1和CPU2上各运行了一个线程,它们共享变量t。由于编译器优化或者硬件优化缘故。在CPU1上的线程将变量t进行了优化,将其缓存在cache中或者寄存器里。这种情况下如果CPU2上的某个线程修改了t的实际值,那么CPU1上的线程可能就无法意识到这个改动,依旧会读取cache或者寄存器中的旧值。因此就产生了可见性的问题。可见性问题在并行程序中也是需要重点关注的问题之一。
[置顶]        高并发程序设计入门
可见性问题是一个综合性问题,处理上述提到的缓存优化和硬件优化会导致可见性问题外,指令重排以及编译器的优化,都有可能导致这个问题。
附两个例子便于理解可见性问题。
[置顶]        高并发程序设计入门

有序性(ordering)

有序性是三个问题中最难理解的,对于一个线程的执行代码而言,我们总是习惯性的认为代码的执行是从先往后的,依次执行的。这么理解也不能完全说是错误的。在一个线程的情况下确实是从先往后。但是在并发时,程序的执行就可能出现乱序,写在前面的代码可能会后执行。
有序性的问题的原因是因为程序在执行的时候,可能发生指令重排,重排后的指令和原指令的顺序未必一致。
指令重排有一个基本的前提是,保证串行语义的一致性。指令重排不会使串行的语义逻辑发生问题。因此在串行代码中不必担心这个问题。而在多线程间就无法保证了。
so,问题来了。为什么会指令重排呢?
这完全是基于性能考虑。
我们知道一条指令的执行是可以分很多步骤的。简单的说可以分如下几步:
- 取指 IF
- 译码和取寄存器操作数 ID
- 执行或者有效地址计算 EX
- 存储器访问 MEM
- 回写 WB
我们的汇编指令也不是一步就执行完了。在CPU的实际工作中,还是要分几步去执行的。当然,每个步骤涉及的硬件也可能不同。比如,取指会用到PC寄存器和存储器,译码会用到指令寄存器组,执行会使用ALU(算术逻辑单元(arithmetic and logic unit) 是能实现多组算术运算和逻辑运算的组合逻辑电路,简称ALU。主要功能是二进制算数运算),写回时需要寄存器组。
由于一个步骤可能使用不同的硬件完成,因此,就发明了流水线技术来执行指令。
- 指令1 IF ID EX MEM WB
- 指令2 IF ID EX MEM WB
可以看到,到两条指令执行时,第一条指令其实还未执行完,这样的好处是,假设每一步需要1毫秒,那么第2条指令需要等待5毫秒才能执行。而通过流水线技术,指令2就只需等待1毫秒。这样有了流水线就可以让CPU高效的执行。但是,流水线总是害怕被中断。流水线满载的时候性能确实相当不错,但是一旦中断,所有硬件设备都会进入停顿期,再次满载又需要几个周期,因此性能损失会比较大,所以我们就需要想办法来不让流水线中断。
之所以需要指令重排就是避免流水线中断,或尽量少的中断流水线。当然指令重排只是减少中断的一种技术,实际上CPU设计中,我们还有更多的软硬件技术来防止中断。具体大家就自己探究吧。
通过例子我们加深下理解。
示例 1 :
A = B + C执行过程。
左边是汇编指令,LW表示load,其中LW R1,B表示把B的值加载到R1寄存器中。ADD就是加法,把R1,R2的值想加放到R3中。SW表示store,就是将R3寄存器的值保存到变量A中。

//A = B + C 执行过程

LW R1,B IF ID EX MEM WB
LW R2,C IF ID EX MEM WB
ADD R3,R1,R2 IF ID X EX MEM WB
SW A,R3 IF X ID EX MEM WB

左边是指令由上到下执行,右边是流水线情况。在ADD上的大叉表示一个中断。因为R2中的数据还没准备好,所以ADD操作必须进行一次等待。由于ADD的延迟,后面的指令都要慢一拍。
示例 2 :

a = b + c ;
d = e - f ;

执行过程如下
[置顶]        高并发程序设计入门
其实就是将中断的时间去做别的事情,如load数据。这样时间就可以规划衔接好。有点儿像项目管理中优化关键路径。由此可见,指令重排对于提高CPU处理性能是十分必要的,虽然确实带来了乱序的问题,但这点儿牺牲完全值得的。

JMM 参考资料
深入理解JVM—JVM内存模型
Java内存模型
深入理解Java内存模型之系列篇
程晓明-深入理解Java内存模型

哪些指令不能重排:

虽然java虚拟机和执行系统会对指令进行一定的重排,但是指令重排是有原则的。
- 原则基本包括以下:
1 程序顺序原则:一个线程内保证语义的串行性

Eg:
a=1;
b=a+1;
第二条语句依赖于第一条执行结果。所以不允许指令重排。

2 volatile规则:volatile变量的写,先发生与读,这保证了volatile变量的可见性。
3 锁规则:解锁(unlock)必然发生在随后的加锁(lock)前

Eg:
锁规则强调,unlock操作必然发生在后续的对同一个锁的lock之前,也就是说,
如果对一个锁解锁后,在加锁,那么加锁的动作绝对不能重排到解锁动作之前。
很显然,如果这么做,加锁行为是无法获得这把锁的。

4 传递性:A先于B,B先于C,那么A必然先于C
5 线程的start()方法先于它的每一个动作
6 线程的所有操作先于线程的终结(Thread.join())
7 线程的中断(interrupt())先于被中断线程的代码
8 对象的构造函数执行、结束先于finalize()方法

基础

线程生命周期

[置顶]        高并发程序设计入门
线程所有的状态都在Thread.State枚举类中定义

public enum State {
/**
* 表示刚刚创建的线程,这种线程还没开始执行。
**/

NEW,
/**
* 调用start()方法后,线程开始执行,处于RUNNABLE状态,
* 表示线程所需要的一切资源以及准备好。
**/

RUNNABLE,
/**
* 当线程遇到synchronized同步块,就进入了BLOCKED阻塞状态。
* 这时线程会暂停执行,直到获得请求的锁。
**/

BLOCKED,
/**
* WAITING和TIMED_WAITING都表示等待状态,他们是区别是WAITING表示进入一个无时间限制的等待
* TIMED_WAITING会进入一个有时间限制的等待。
* WAITING的状态正是在等待特殊的事件,如notify()方法。而通过join()方法等待的线程,则是等待目标线程的终止。
* 一旦等到期望的时间,线程就会继续执行,进入RUNNABLE状态。
* 当线程执行完后进入TERMINATED状态,表示线程执行结束。
**/

WAITING,
TIMED_WAITING,
TERMINATED;
}

线程的基本操作

启动初始化及基本方法

参考 多线程基础

终止线程

一个线程执行完后会结束,无须手动关闭,但是如一些系统性服务的线程基本都是一个大的循环,一般情况不会终止。
如何才能正常关闭线程呢?JDK提供了一个Thread.stop方法就可以立即关闭一个线程。但是这个方法太暴力,基本不会使用。并且stop()方法也是标记要废弃的方法。stop()强行的将执行中的线程关闭,可能会造成数据不一致问题。
看图说话:
[置顶]        高并发程序设计入门
举个栗子:

public class ThreadStopExample {

public static User u = new User();

public static void main(String[] a){
/**
* 开启读取线程
*/

new Thread(new readObj(),"读--线程").start();

while (true){
Thread t = new Thread(new changeObj(),"写--线程");
t.start();
try {
/**
* 主线程sleep 150毫秒,处理业务
*/

Thread.sleep(150);

} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 将写线程停止
*/

t.stop();
}
/**
* 执行结果:
* 观察这些值,name属性永远比id小,是因为它永远是上一次的值,就是因为stop(),无法完整的完成id和name赋值.
*
* 为什么会不一致呢?
* 因为 User 通过 changeObj()方法不断改变,当changeObj方法设置id后,需要处理其他花费100毫秒的业务.完成后设置name的值.
* 在这100毫秒中,调用changeObj()的主线程恰好执行了stop()方法,
* 虽然已经设置了User的id属性值,但User的name属性依然是上次循环的值.没来得及赋值就stop()了.
* 所以这就是为什么stop()会产生不一致问题.
*
* User{id=1455613327, name='1455613326'}
* User{id=1455613329, name='1455613328'}
* User{id=1455613331, name='1455613330'}
* User{id=1455613331, name='1455613330'}
* User{id=1455613331, name='1455613330'}
* .......
*/

}

/**
* 修改操作
*/

public static class changeObj implements Runnable{
@Override
public void run() {
while (true){

synchronized(u){
int v = (int) (System.currentTimeMillis()/1000);
u.setId(v);
try {
/**
* sleep 100毫秒,处理业务
*/

Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

u.setName(String.valueOf(v));
}
Thread.yield();
}
}
}

/**
* 读取操作
*/

public static class readObj implements Runnable{
@Override
public void run() {
while (true) {
synchronized (u) {
/**
* 当ID 不等于 name时,打印.
*
*/

if (u.getId() != Integer.parseInt(u.getName())){
System.out.println(u);
}
}
Thread.yield();
}
}
}

public static class User{

private int id ;

private String name ;

//getter setter

public User() {
this.id = 0;
this.name = "0";
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}


}

如何正确的stop,如何不写坏对象,请看修改后的代码如下,我们采用自己的方式去达到线程stop,当然还有其他更好的方案。

public static class changeObj implements Runnable{
//定义一个stop标识来实现我们自己的关闭方法
volatile static boolean stopMe = false;
@Override
public void run() {
while (true){
//增加if块
if (stopMe){
System.out.println("exit by stopMe...");
break;
}
synchronized(u){
...
}
...
}
}
}

public static void main(String[] a){

while (true){
...
//t.stop();
changeObj.stopMe = true;

}
}

线程中断

线程中断是重要的线程协作机制,中断就是让线程停止执行,但这个停止执行非stop()的暴力方式。JDK提供了更安全的支持,就是线程中断。
线程中断并不会使线程立即停止,而是给线程发送一个通知,告诉目标线程有人希望你退出。至于目标线程接到通知后什么时候停止,完全由目标线程自行决定。这点很重要,如果线程接到通知后立即退出,我们就又会遇到类似stop()方法的老问题。
与线程有关的三个方法,
1、中断线程
public void Thread.interrupt()
说明:Thread.interrupt() 是一个实例方法,他通知目标线程中断,也就是设置中断标志位。中断标志位表示当前线程已经被中断了。
2、判断是否被中断
public boolean Thread.isInterrupted()
说明:Thread.isInterrupted() 也是实例方法,他判断当前线程是否被中断(通过检查中断标志位)
3、判断是否被中断,并清除当前中断状态
public static boolean Thread.interrupted()
说明:Thread.interrupted() 是静态方法,判断当前线程的中断状态,但同时会清除当前线程的中断标志位状态。
实例1
看起来和stopMe的手法一样,但是中断功能更为强劲,比如遇到sleep()或wait()这样的操作时,就只能用中断标识了。

public class InterruptExample {

public static void main(String [] a){

Thread t1 = new Thread("线程小哥 - 1 "){
@Override
public void run() {
while (true){
/**
* 必须得判断是否接受到中断通知,如果不写退出方法,也无法将当前线程退出.
*/

if (Thread.currentThread().isInterrupted()){
System.out.println(Thread.currentThread().getName() + " Interrupted ... ");
break;
}
Thread.yield();
}
}
};

t1.start();
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 给目标线程发送中断通知
* 目标线程中必须有处理中断通知的代码
* 否则,就算发送了通知,目标线程也无法停止.
*/

t1.interrupt();
}
}

实例2

public class InterruptExample {

public static void main(String [] a){

Thread t1 = new Thread("线程小哥 - 1 "){
@Override
public void run() {
while (true){
/**
* 必须得判断是否接受到中断通知,如果不写退出方法,也无法将当前线程退出.
*/

if (Thread.currentThread().isInterrupted()){
System.out.println(Thread.currentThread().getName() + " Interrupted ... ");
break;
}

try {
/**
* 处理业务逻辑花费10秒.
* 而在这时,主线程发送了中断通知,当线程在sleep的时候如果收到中断
* 则会抛出InterruptedException,如果在异常中不处理,则线程不会中断.
*
*/

Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("我错了....");
/**
* 在sleep过程中,收到中断通知,抛出异常.可以直接退出线程.
* 但如果还需要处理其他业务,则需要重新中断自己.设置中断标记位.
* 这样在下次循环的时候 线程发现中断通知,才能正确的退出.
*/

Thread.currentThread().interrupt();
}

Thread.yield();
}
}
};

t1.start();
try {
/**
* 处理业务500毫秒
* 然后发送中断通知,此时t1线程还在sleep中.
*/

Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 给目标线程发送中断通知
* 目标线程中必须有处理中断通知的代码
* 否则,就算发送了通知,目标线程也无法停止.
*/

t1.interrupt();
}
}

等待(wait)和通知(notify)

为了支持多线程之间的协作,JDK提供了两个非常重要的等待方法wait()和nofity()方法。这两个方法并不是Thread类中的,而是Object类,这意味着任何对象都可以调用这两个方法。
比如线程A调用了obj.wait()方法,那么线程A就会停止执行而转为等待状态,进入obj对象的等待队列。这个等待队列可能有多个线程,因为系统运行多个线程同时等待同一个对象。其他线程调用obj.notify()方法时,它就会从等待队列中随机选择一个线程并将其唤醒。注意着个选择是不公平的,是随机的。
obj.wait()方法并不是可以随便调用。他必须包含在对应的synchronized语句中。无论是wait还是notify都必须首先获得目标对象的一个监视器。而正确执行wait方法后,会释放这个监视器,这样其他等待obj上的线程才能获得这个监视器,不至于全部无法执行。
在调用obj.notify()前,同样也必须获得obj的监视器,索性wait方法已经释放了监视器。唤醒某个线程后(假设唤醒了A),A线程要做的第一件事并不是执行后续的代码,而是要尝试重新获得obj监视器。而这个监视器也正是A执行wait方法前所只有的那个obj监视器。如果暂时无法获得。A还必须要等待这个监视器。当A获得监视器后,才能真正意义上的继续执行。
[置顶]        高并发程序设计入门
注意:wait方法和sleep方法都可以让线程等待若干时间,处理wait方法可以唤醒之外,另外一个主要区别是wait方法会释放目标对象的锁,而sleep方法不会释放。
例子:

public class WaitNotifyExample {

public static void main (String [] a){
Thread a1 = new A();
Thread b1 = new B();
a1.start();
b1.start();

/**
* 执行结果:
* A start ...
* A wait for obj ...
* B start ... notify one Thread...
* B end
* 这里间隔2秒
* A end
* */

}

final static Object obj = new Object();

public static class A extends Thread{
@Override
public void run() {
synchronized (obj){
System.out.println("A start ... ");

try {
System.out.println("A wait for obj ... ");
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A end");
}
}
}

public static class B extends Thread{
@Override
public void run() {
synchronized (obj){
System.out.println("B start ... notify one Thread...");

obj.notify();

System.out.println("B end");

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

}

挂起(suspend)和继续执行(resume)线程

这两个方法虽然已经不推荐使用了。但是这里再提一下,不推荐使用suspend挂起线程是因为suspend挂起线程后不释放锁资源,导致其他线程想要访问这个锁资源时都会被等待。无法正常运行。而suspend挂起的线程居然还是RUNNABLE状态,这也严重影响了我们队系统当前状态的判断。
[置顶]        高并发程序设计入门
示例

public class SuspendExample {

public static Object u = new Object();

static ChangeObj c1 = new ChangeObj("T1");
static ChangeObj c2 = new ChangeObj("T2");

public static class ChangeObj extends Thread{

public ChangeObj(String name) {
super(name);
}

@Override
public void run() {
synchronized (u) {
System.out.println("Thread in : " + getName());
/* //注释1
try {
System.out.println("sleep 500ms : " + getName());
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}*/

//进入的线程挂起,且不释放资源
Thread.currentThread().suspend();
System.out.println("resume by : " + getName());
}
}
}

public static void main(String[] a) throws InterruptedException {
//启动c1线程
c1.start();
/**
* 主线程工作100毫秒,(非常关键)
* 这里的意思是
* 第一:为了演示,保证c1能抢占到资源,让主线程sleep后再启动c2
* 第二:保证c1能在执行resume的时候执行完成.这样才能保证c1本身可以有效释放资源.
* 假设c1中执行业务耗时500毫秒后 才执行suspend.(将注释[1]放开).而主线程仅仅sleep100毫秒后执行了c1.resume().
* 这样就导致c1无法释放锁,结果打印的是
* Thread in : T1
* sleep 500ms : T1
* 无法再继续走下去.
*/

Thread.sleep(100);

//启动c2线程,但在c1不释放资源的情况下,c2只能等待.
c2.start();

//c1 释放锁,此时c1应该已经执行了suspend挂起状态,resume继续执行
c1.resume();

/**
* 解决c2挂起无法继续的方法:
* 1 将主线程sleep1000毫秒,保证c1在1000毫秒内执行完成,
* 但是这不是最好的方法,因为c1有可能在1000毫秒内执行不完
* Thread.sleep(1000);
* 2 将c2.resume() 放到c1.join后面.
*/

//c2 继续执行,其实这里提前执行了resume.导致c2在挂起后无法resume.
//因为c1.join导致c2必须在c1执行完后才能执行.
c2.resume();
//c1 用join将主线程挂起,自己先执行完再执行主线程.也就是保证自己必须先执行完成
//System.out.println("c1 将要执行 join");
c1.join();
System.out.println(Thread.currentThread().getName() + " 结束工作...after c1");
//c2 执行完
c2.join();
System.out.println(Thread.currentThread().getName() + " 结束工作...after c2");

}
/**
* 错误的 结果是:
* Thread in : T1
* resume by : T1
* Thread in : T2
* main 结束工作...after c1
* 并且程序一直挂起,无法结束.
* 打印线程信息可以发现
* "T2@431" prio=5 tid=0xd nid=NA runnable
* java.lang.Thread.State: RUNNABLE
* at java.lang.Thread.suspend0(Thread.java:-1)
* at java.lang.Thread.suspend(Thread.java:1029)
* at com.iboray.javacore.Thread.T2.SuspendExample$ChangeObj.run(SuspendExample.java:31)
* - locked <0x1b3> (a java.lang.Object)
*
* "main@1" prio=5 tid=0x1 nid=NA waiting
* java.lang.Thread.State: WAITING
* at java.lang.Object.wait(Object.java:-1)
* at java.lang.Thread.join(Thread.java:1245)
* at java.lang.Thread.join(Thread.java:1319)
* at com.iboray.javacore.Thread.T2.SuspendExample.main(SuspendExample.java:75)
*
* 正确的 结果是:
* Thread in : T1
* resume by : T1
* Thread in : T2
* main 结束工作...after c1
* resume by : T2
* main 结束工作...after c2
*/

}

示例2
通过wait和notify方式实现suspend和resume效果。这种方式类似于我们自己实现stop那样

public class Suspend1Example {

public static Object u = new Object();

public static void main(String[] a) throws InterruptedException {
ChangeObj c = new ChangeObj();
ReadObj r = new ReadObj();
c.start();
r.start();
Thread.sleep(1000);

c.suspendMe();
System.out.println(" suspend ChangeObj 3s... ");
Thread.sleep(3000);
c.resumeMe();
/**
* 执行结果
* 刚开始ChangeObj与ReadObj交叉执行
in ChangeObj...
in ChangeObj...
in ReadObj...
in ChangeObj...
in ReadObj...
suspend ChangeObj 3s... 主线程执行c.suspendMe()
in ChangeObj... ...
in ChangeObj...suspend ChangeObj进入WAIT状态
in ReadObj... ReadObj独自执行
in ReadObj... ...
in ReadObj... ...
in ReadObj... ...
in ReadObj... ...
in ReadObj... ...
in ChangeObj...resume ChangeObj进入RUNNABLE状态
in ChangeObj... 随后ChangeObj与ReadObj又开始交叉执行
in ChangeObj...
in ReadObj...
in ChangeObj...
in ReadObj...
*/

}

public static class ChangeObj extends Thread{
//自定义挂起标识
volatile boolean suspend = false;

//设置挂起标识
public void suspendMe(){
this.suspend = true;
}

//模拟继续执行
public void resumeMe(){
//设置挂起标识为 false 不挂起.
this.suspend = false;
//拿到当前对象锁
synchronized (this){
//唤醒this对象等待队列中的某一个线程.这里单指当前这个
this.notify();
}
}

@Override
public void run() {
while (true){
//拿到当前对象的锁,为什么这里同步的锁一个是this一个是u呢?
//因为this锁的作用是当前类
synchronized (this){
//如果设置了挂起为true
if (suspend){
try {
//让当前对象加入this对象的等待队列.同时可以释放当前对象的锁.
System.out.println(" in ChangeObj...suspend ");
this.wait();
System.out.println(" in ChangeObj...resume ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//拿到指定实例对象的锁
synchronized (u){
//执行业务
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" in ChangeObj... ");
}
Thread.yield();
}
}
}

public static class ReadObj extends Thread{
@Override
public void run() {
while (true){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
//拿到指定实例对象的锁
synchronized (u){
//执行业务
System.out.println(" in ReadObj... ");
}
Thread.yield();
}

}
}
}

等待线程结束(join)和谦让(yield)

一个线程的输入可能依赖于另一或者多个线程的输出,此时,这个线程就需要等待依赖线程执行完毕,才能继续执行,JDK提供了join操作来实现这个功能。方法签名:

//无限等待,它会一直阻塞当前线程,知道目标线程执行完毕
public final void join() throws InterruptedException
//包含最大等待时机,如果超过给定时间目标线程还未执行完成,当前线程会跳出阻塞 继续执行
public final synchronized void join(long mills) throws InterruptedException

join的本质是让调用线程wait()在当前线程对象实例上。当执行完成后,被等待的线程会在退出前调用notifyAll()通知所有的等待线程继续执行。因此,需要注意,不要在应用程序中,在Thread上使用类似wait()或者notify()等方法,因为这很有可能影响系统API的工作,或者被系统API锁影响
yield是一个静态方法,一旦执行,它会使当前线程让出CPU,然后继续加入争抢CPU的线程中。

volatile与JMM

当我们使用volatile来修饰变量,就等于告诉虚拟机这个变量极有可能被某些程序或者线程修改。为了确保这个变量修改后,应用程序范围内的所有线程都能够看到。虚拟机就必须采用一些特殊的手段保证这个变量的可见性。这样就可以解决之前咱们在32位虚拟机上用多线程修改long 的值了。
volatile并不代表锁,他无法保证一些符合操作的原子性。他只能保证一个线程修改了数据后,其他线程能够看到这个改动,但当多个线程同时修改某一个数据时,却依然会产生冲突。他只能保证单个变量的完整性和可见性。保证原子性还的靠类似synchronized方式去解决。

线程组

如果线程数量很多,而且功能分配比较明确,就可以将相同的线程放置在一个线程组里面。

public class ThreadGroupName implements Runnable{

public static void main(String[] a){
ThreadGroup threadGroupName = new ThreadGroup("printGroup");
Thread t1 = new Thread(threadGroupName,new ThreadGroupName(),"T1");
Thread t2 = new Thread(threadGroupName,new ThreadGroupName(),"T2");
t1.start();
t2.start();
//由于线程是动态的,activeCount()是一个估计值
System.out.println("printGroup线程组 活动线程数 : " + threadGroupName.activeCount());
//list()可以打印这个线程组中所有线程的信息
threadGroupName.list();

//threadGroupName.stop(); 不推荐使用,和单个线程stop暴露的问题是一样的。

/**
* printGroup线程组 活动线程数 : 2
* This is printGroup : T1
* java.lang.ThreadGroup[name=printGroup,maxpri=10]
* Thread[T1,5,printGroup]
* Thread[T2,5,printGroup]
* */

}

@Override
public void run() {
String groupName =
Thread.currentThread().getThreadGroup().getName() + " : "
+ Thread.currentThread().getName();
while (true){
System.out.println(" This is " + groupName);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

Thread.yield();
}
}
}

守护线程(daemon)

守护线程是一个特殊的线程,他在后台完成系统性的服务,比如垃圾回收等等。用户线程可以理解为系统工作线程,他们会完成业务操作。当用户线程全部结束后,系统就无事可做了。守护线程守护的对象也不存在了。因此当一个程序中就只有守护线程时,java虚拟机就会自然退出。

public class DaemonExample extends Thread{

@Override
public void run() {
while (true){
System.out.println("I am a Daemon Thread .. ");
try {

Thread.sleep(500);
System.out.println("I am a Daemon Thread ..after sleep ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] a) throws InterruptedException {
Thread t = new DaemonExample();
t.setDaemon(true);
t.start();
//main线程相当于用户线程,当用户线程休眠2秒后,整个程序也随之结束
//如果t不设置为守护线程,t 会一直输出.
//结果是主线程休眠2秒后 守护线程也退出循环
Thread.sleep(2000);
}
}

线程优先级

优先级高的线程在竞争资源的时候回更加有优势,更可能抢占到资源,当然这只是一个概率问题,高优先级的线程可能也会抢占失败。他和线程优先级调度以及底层操作系统有密切关系。在各平台上表现不一。

线程安全与synchronized

非线程安全写入例子
[置顶]        高并发程序设计入门
就算是volatile修饰的变量,也无法保证正确写入,要从根本上解决这个问题,我们就必须保证多个线程之间是完全同步的。也就是Thread1在写入时,Thread2既不能读也不能写。这时我们就得通过synchronized关键字来解决了。它的工作是对同步代码加锁,使得每一次,只能有一个线程进入同步块。从而保证线程间的安全性,说白了就是让并行程序串行执行。

synchronized用法

1、指定加锁对象:对给定对象加锁,进入同步代码块要获得给定对象的锁。

public class SynchronizedExample implements Runnable{
static final Object o = new Object();
static int a = 0;
@Override
public void run() {
for (int i = 0;i < 10000 ; i ++){
//将锁加到指定static修饰的对象上
synchronized (o){
a++;
}
}
}
public static void main(String[] ac ) throws InterruptedException {
Thread t1 = new Thread(new SynchronizedExample());
Thread t2 = new Thread(new SynchronizedExample());
Thread t3 = new Thread(new SynchronizedExample());
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
System.out.println("add result : " + a);
}
}

2、直接作用于实例方法:相当于对当前实例加锁,进入同步代码块要获得当前实例的锁。

public class SynchronizedExample implements Runnable{
static int a = 0;
@Override
public void run() {
for (int i = 0;i < 10000 ; i ++){
//System.out.println(Thread.currentThread().getName() + " coming..");
synchronized (this){
a++;
}
}
}

public static void main(String[] ac ) throws InterruptedException {
/* 错误的Thread方法,因为这里synchronized是对实例上锁,而new创建了多个实例,所以锁无意义。
Thread t1 = new Thread(new SynchronizedExample());
Thread t2 = new Thread(new SynchronizedExample());
Thread t3 = new Thread(new SynchronizedExample());
*/

//正确的创建Thread方法如下。所有的Thread都是通过同样的实例创建。所以同步是有用的。
SynchronizedExample aca = new SynchronizedExample();
Thread t1 = new Thread(aca);
Thread t2 = new Thread(aca);
Thread t3 = new Thread(aca);

t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
System.out.println("add result : " + a);
}
}
/**
* 结果是 :
* add result : 30000
*/

3 、 直接作用于静态方法:相当于对当前类加锁,进入同步块要获得当前类的锁。
说明: 这个锁的影响范围更广,只要是调用这个类的方法,都必须拿到这个类的锁。
3.1 、 加到静态方法上

public class SynchronizedExample implements Runnable{
static int a = 0;
@Override
public void run() {
for (int i = 0;i < 10000 ; i ++){
add();
}
}
public static synchronized void add(){
a++;
}

public static void main(String[] ac ) throws InterruptedException {
//将锁加到静态方法上,就相当于把锁加到类上,就可以将这个类所有的实例创建的线程进行同步。
Thread t1 = new Thread(new SynchronizedExample());
Thread t2 = new Thread(new SynchronizedExample());
Thread t3 = new Thread(new SynchronizedExample());
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
System.out.println("add result : " + a);
}

}

3.2 、 加到类上

public class SynchronizedExample implements Runnable{
static int a = 0;
@Override
public void run() {
for (int i = 0;i < 10000 ; i ++){
//System.out.println(Thread.currentThread().getName() + " coming..");
synchronized (SynchronizedExample.class){
a++;
}
}
}
public static void main(String[] ac ) throws InterruptedException {

....
}

}

实例1

public class SynchronizedExample implements Runnable{

static SynchronizedExample obj = new SynchronizedExample();

static int a = 0;
/* //方法1
@Override
public void run() {
for (int i = 0;i < 10000 ; i ++){
//作用在给定的对象上.因此每次当线程进入synchronized时,
//线程都会请求obj实例的锁.
synchronized (obj){
a ++ ;
}
}
}*/


//方法2
@Override
public void run() {
for (int i = 0;i < 10000 ; i ++){
add();
}
}
public synchronized void add(){
a ++ ;
}

public static void main(String[] ac ) throws InterruptedException {
/**
* 注意Thread的创建方式,这里使用Runnable创建两个线程,并且这两个线程
* 都指向同一个Runnable接口实例(obj对象),这样才能保证两个线程在工作时
* 能够关注到同一个对象的锁上去.从而保证线程安全
*
* 而以下同步方法是错误的,因为创建的线程不是关注在同一个对象锁上.解决方法看示例2
* Thread t1 = new Thread(new SynchronizedExample());
* Thread t1 = new Thread(new SynchronizedExample());
*/

Thread t1 = new Thread(obj);
Thread t2 = new Thread(obj);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("add result : " + a);
}

}

示例2

public class SynchronizedExample implements Runnable{

static SynchronizedExample obj = new SynchronizedExample();

static int a = 0;
@Override
public void run() {
for (int i = 0;i < 10000 ; i ++){
add();
}
}
public static synchronized void add(){
a ++ ;
}

public static void main(String[] ac ) throws InterruptedException {
/**
* 将synchronized作用到静态方法,即使两个线程指向不同的Runnable对象,
* 但由于方法块需要请求的是当前类的锁,而非当前实例,因此线程间还是可以同步的.
*/

Thread t1 = new Thread(new SynchronizedExample());
Thread t2 = new Thread(new SynchronizedExample());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("add result : " + a);
}

}

除了线程同步,确保线程安全外,synchronized还可以保证线程见的可见性和有序性。

并发下的ArrayList

直接看示例

public class ArrayListExample implements Runnable{

static ArrayList<Integer> arrayList = new ArrayList<>();

@Override
public void run() {
for (int i=0;i<100000;i++){
arrayList.add(i);
}
}

public static void main(String[] a) throws InterruptedException {
Thread t1 = new Thread(new ArrayListExample());
Thread t2 = new Thread(new ArrayListExample());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("ArrayList size : " + arrayList.size());

/**
* 执行结果:
*
* 结果一:
* ArrayList size : 100004
* Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException: 49
* at java.util.ArrayList.add(ArrayList.java:459)
* at com.iboray.javacore.Thread.T2.ArrayListExample.run(ArrayListExample.java:15)
* at java.lang.Thread.run(Thread.java:745)
* 这是因为ArrayList在扩容过程中,内部一致性被破坏,但由于没有锁的保护,
* 另外一个线程访问到了不一致的内部状态,导致越界错误.

* 结果二:
* ArrayList size : 199368
* ArrayList大小小于200000,这是因为两个线程同时对ArrayList中的同一个位置进行赋值导致的.
* 而Vector却不会有这个问题
*/

}
}

错误的加锁

将锁加在int类型上。

public void run() {
for (int i = 0;i < 10000 ; i ++){
synchronized (i){
a ++ ;
}
}
}*/

似乎加锁的逻辑没问题,但是Integer在java中属于不变对象,也就是对象一旦创建就不可修改了。和String一样。所以这里的i每次都是一个新的integer对象,锁都加到了不同的对象上。
for循环的i每次实际上是使用了Integer.valueOf()方法创建一个新的integer对象,并将它赋值为变量i。也就是i = Integer.valueOf(i.intValue() + 1);

JDK并发包

同步控制

重入锁ReentrantLock

重入锁可以完全替代synchronized关键字,并且性能也好于synchronized。但从JDK6.0开始,synchronized的性能有所提升,两者在性能上差不多。
需要注意的是使用重入锁时,我们必须指定何时上锁,何时释放。正因为这也,重入锁对逻辑控制的灵活性要远远好于synchronized。在退出临界区时,务必释放锁。否则 后果你懂得。
之所以叫重入锁,是因为一个线程可以两次获得同一把锁,在释放的时候也必须释放相同次数的锁。
示例

public class ReentrantLockExample implements Runnable{

public static ReentrantLock lock = new ReentrantLock();

public static int i = 0;

@Override
public void run() {
for (int j=0;j<10000;j++){
//手动上锁,可以上N把,这里是为了演示
lock.lock();
lock.lock();
lock.lock();
try {
i ++;
} finally {
//无论如何必须释放锁,上几把 释放几把
lock.unlock();
lock.unlock();
lock.unlock();
}
}
}

public static void main(String[] a) throws InterruptedException {
ReentrantLockExample re = new ReentrantLockExample();
Thread t1 = new Thread(re);
Thread t2 = new Thread(re);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.print(i);
}
}

中断响应ReentrantLock.lockInterruptibly()

对synchronized来说,如果一个线程在等待锁,那么结果只有两种,要么它得到了这把锁,要么它保持等待。而重入锁则提供了另外一种可能,那就是线程可以在等待的过程中中断,我们可以根据需要取消对锁的请求。也就是说,如果一个线程正在等待锁,那么它依然可以收到一个通知,被告知无须等待,可以停止了。
lockInterruptibly()方法是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,中断响应。
示例

public class ReentrantLockExample1 implements Runnable{

public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();

int lock;

/**
* 控制加锁顺序,制造死锁
* @param lock
*/
public ReentrantLockExample1(int lock) {
this.lock = lock;
}

@Override
public void run() {
try {
/**
* 1号线程,先占用 1号锁,再申请 2号锁
* 2号线程,先占用 2号锁,再申请 1号锁
* 这样就很容易造成两个线程相互等待.
*/
if (lock == 1){
//加入优先响应中断的锁
lock1.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + " 进入...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 这时候,1号线程 想要持有 2号锁 ,但是2号线程已经先占用了2号锁,所以1 号线程等待.
* 2号线程也一样,占用着2号锁 不释放,还想申请1号锁,而1号锁 被1号线程占用且不释放.
*/
lock2.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + " 完成...");

}else {
lock2.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + " 进入...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + " 完成...");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 被中断,报异常...");
e.printStackTrace();
} finally {
if (lock1.isHeldByCurrentThread()) {
System.out.println(Thread.currentThread().getName() + " 释放...");
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()) {
System.out.println(Thread.currentThread().getName() + " 释放...");
lock2.unlock();
}
System.out.println(Thread.currentThread().getName() + " 线程退出...");
}
}

public static void main(String[] a) throws InterruptedException {
ReentrantLockExample1 re1 = new ReentrantLockExample1(1);
ReentrantLockExample1 re2 = new ReentrantLockExample1(2);
Thread t1 = new Thread(re1," 1 号线程 ");
Thread t2 = new Thread(re2," 2 号线程 ");
t1.start();
t2.start();
//主线程sleep 2秒,让两个线程相互竞争资源.造成死锁
Thread.sleep(2000);
//中断2号线程
t2.interrupt();

/* 执行结果:

1 号线程 进入...
2 号线程 进入...
2 号线程 被中断,报异常... // 执行 t2.interrupt();
java.lang.InterruptedException
2 号线程 释放...
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
2 号线程 线程退出...
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
1 号线程 完成... // 只有1号线程能执行完成
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
1 号线程 释放...
at com.iboray.javacore.Thread.T3.ReentrantLockExample1.run(ReentrantLockExample1.java:55)
1 号线程 释放...
at java.lang.Thread.run(Thread.java:745)
1 号线程 线程退出...
*/


}
}

锁申请等待限时ReentrantLock.tryLock

除了等待外部通之外,避免死锁还有另外一种方法,就是限时等待,给定一个等待时间让线程自动放弃。
tryLock(时长,计时单位),若超过设定时长还没得到锁就返回false,若成功获得锁就返回true。
tryLock(),若没有参数,当前线程会尝试获得锁,如果申请锁成功,则返回true,否则立即返回false。这种模式不会引起线程等待,因此不会产生死锁。
示例

public class ReentrantLockExample2 implements Runnable{

public static ReentrantLock lock = new ReentrantLock();

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 申请资源...");
try {
//申请3秒,如果获取不到,返回false,退出.
if (lock.tryLock(3, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " 获得资源,开始执行...");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " 执行完成...");
}else {
System.out.println(Thread.currentThread().getName() + " 申请不到资源,先走了...");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 中断...");
e.printStackTrace();
}finally {
if (lock.isHeldByCurrentThread()) {
System.out.println(Thread.currentThread().getName() + " 释放锁...");
lock.unlock();
}
}

}

public static void main(String[] a) throws InterruptedException {
ReentrantLockExample2 re = new ReentrantLockExample2();
Thread t1 = new Thread(re," 1 号线程 ");
Thread t2 = new Thread(re," 2 号线程 ");
t1.start();
t2.start();

/*
执行结果:

1 号线程 申请资源...
2 号线程 申请资源...
1 号线程 获得资源,开始执行...
2 号线程 申请不到资源,先走了... //等待了3秒后,依然申请不到锁,就返回false
1 号线程 执行完成...
1 号线程 释放锁...
*/
}
}

公平锁ReentrantLock(true)

公平锁会按照实际的先后顺序,保证先到先得,它不会产生饥饿,只要排队,最终都可以等到资源。在创建重入锁时,通过有参构造函数,传入boolean类型的参数,true表示是公平锁。实现公平所必然要维护一个有序队列,所以公平锁的实现成本高,性能相对也非常低,默认情况下,锁是非公平的。
示例

public class ReentrantLockExample3 implements Runnable{

//创建公平锁
public static ReentrantLock lock = new ReentrantLock(true);

static int i = 0;

@Override
public void run() {

for (int j = 0;j<5;j++){
lock.lock();
try {
i++;
System.out.println(Thread.currentThread().getName() + " 获得锁 " + i);
} finally {
lock.unlock();
}
}

}

public static void main(String[] a) throws InterruptedException {
ReentrantLockExample3 re = new ReentrantLockExample3();
Thread t1 = new Thread(re," 1 号线程 ");
Thread t2 = new Thread(re," 2 号线程 ");
Thread t3 = new Thread(re," 3 号线程 ");
Thread t4 = new Thread(re," 4 号线程 ");
t1.start();
t2.start();
t3.start();
t4.start();

/*
执行结果:

1 号线程 获得锁 1
2 号线程 获得锁 2
3 号线程 获得锁 3
4 号线程 获得锁 4
1 号线程 获得锁 5
2 号线程 获得锁 6
3 号线程 获得锁 7
4 号线程 获得锁 8
.....
4 号线程 获得锁 16
1 号线程 获得锁 17
2 号线程 获得锁 18
3 号线程 获得锁 19
4 号线程 获得锁 20
*/


}
}

ReentrantLock的以上几个重要的方法

lock() 获取锁,如果锁被占用,则等待
lockInterruptibly() 获取锁,但优先响应中断
tryLock() 尝试获取锁,如果成功返回true,否则返回false。该方法不等待,立即返回。
tryLock(long time,TimeUnit unit) 在给定时间内获取锁。
unlock() 释放锁。

就重入锁实现来看,他主要集中在java 层面。在重入锁实现中,主要包含三个要素:
1 原子状态。原子状态使用CAS操作来存储当前锁的状态,判断锁是否已经被别的线程持有。
2 等待队列。所有没有请求成功的线程都进入等待队列进行等待。当有线程释放锁后,系统就从当前等待队列中唤醒一个线程继续工作。
3 阻塞原语park()和unpack(),用来挂起和恢复线程。没有得到锁的线程将会被挂起。

Condition条件

Condition是与重入锁ReentrantLock相关联的,通过Lock接口的Condition newCondition()方法可以生产一个和当前重入锁绑定的Condition实例,利用Condition对象,我们就可以让线程在合适的时间等待,或者在特定的时刻得到通知继续执行。
Condition接口提供了如下基本方法
await() 会使当前线程等待,同时释放当前锁,当其他线程中使用singal()或者singalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和wait()类似。
awaitUninterruptibly()和await()方法类似,但它不会再等待过程中响应中断。
singal() 用于唤醒一个等待队列中的线程。singalAll()是唤醒所有等待线程。
示例

public class ConditionExample implements Runnable{
public static ReentrantLock rel = new ReentrantLock();
public static Condition condition = rel.newCondition();


@Override
public void run() {

try {
rel.lock();
System.out.println(Thread.currentThread().getName() + " 获取到锁...");
//等待
condition.await();
System.out.println(Thread.currentThread().getName() + " 执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放锁
rel.unlock();
System.out.println(Thread.currentThread().getName() + " 释放锁");
}


}

public static void main(String[] a) throws InterruptedException {
ConditionExample re = new ConditionExample();
Thread t1 = new Thread(re,"1 号线程 ");
t1.start();
//主线程sleep,1号线程会一直等待.直到获取到1号线程的锁资源,并将其唤醒.
Thread.sleep(2000);
//获得锁
rel.lock();
//唤醒前必须获得当前资源对象的锁
condition.signal();
//释放锁
rel.unlock();

}
}

信号量 Semaphore

Semaphore可以指定多个线程同时访问某一个资源,在构造Semaphore对象时,必须指定信号量的准入数,即同时能申请多少个许可,当每个线程每次只能申请一个许可时,就相当于有多少线程可以同时访问某个资源。
示例

public class SemaphoreExample implements Runnable {

//指定信号量,同时可以有3个线程访问资源
public static final Semaphore s = new Semaphore(5);

@Override
public void run() {

try {
//申请信号量,也可以直接使用 s.acquire();
if (s.tryAcquire(1500, TimeUnit.SECONDS)) {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 完成了任务..");
//释放信号量
s.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] a) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemaphoreExample re = new SemaphoreExample();
for (int i=0;i<20;i++){
exec.submit(re);
}
exec.shutdown();
}
}

ReadWriteLock 读写锁

读写分离锁可以有效的减少所竞争,以提升系统性能。但需要注意是的线程间 读读、读写、写写中后两者依然需要互斥。

非阻塞 阻塞
阻塞 阻塞

系统中,读的次数远远大于写的操作,读写锁就可以发挥最大的功效
示例

public class ReadWriteLockExample {

//创建普通重入锁
private static Lock lock = new ReentrantLock();

//创建读写分离锁
private static ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();

//创建读锁
private static Lock readLock = rwlock.readLock();

//创建写锁
private static Lock writeLock = rwlock.writeLock();

private int value;

public Object HandleRead(Lock lock) throws InterruptedException {
try {
//上锁
lock.lock();
//处理业务
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " Read...");
return value;
} finally {
//释放锁
lock.unlock();
}
}

public void HandleWrite(Lock lock,int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
value = index;
System.out.println(Thread.currentThread().getName() + " Write...");
}finally {
lock.unlock();
}
}

public static void main(String[] a ) throws InterruptedException {
final ReadWriteLockExample rwle = new ReadWriteLockExample();

//创建读方法
Runnable readR = new Runnable() {
@Override
public void run() {
try {
//rwle.HandleRead(lock); //普通锁
rwle.HandleRead(readLock);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

//创建写方法
Runnable writeR = new Runnable() {
@Override
public void run() {
try {
//rwle.HandleWrite(lock,new Random().nextInt()); //普通锁
rwle.HandleWrite(writeLock,new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};

//18次读
for (int i=0;i<18;i++){
Thread s = new Thread(readR);
s.start();
}
//2次写
for (int i=18;i<20;i++){
Thread s = new Thread(writeR);
s.start();
}

/**
* 结论:
*
* 用普通锁运行,大约执行20秒左右
*
* 用读写分离锁,大约执行3秒左右
*
*/


}

}

倒计时 CountDownLatch

CountDownLatch主要用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
[置顶]        高并发程序设计入门
示例

public class CountDownLatchExample implements Runnable{

static final CountDownLatch cdl = new CountDownLatch(10);
static final CountDownLatchExample cdle = new CountDownLatchExample();

@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println(Thread.currentThread().getName() + " 部件检查完毕...");
//一个线程完成工作,倒计时器减1
cdl.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] a) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i=0;i<10;i++){
exec.submit(cdle);
}
//等待所有线程完成,主线程才继续执行
cdl.await();
System.out.println(Thread.currentThread().getName() + " 所有检查完成,上跑道起飞...");
//关闭线程池
exec.shutdown();
}
}

循环栅栏 CyclicBarrier

CyclicBarrier是另一种多线程并发控制工具,和CountDownLatch类似,但是它可以在计数器完成一次计数后,执行某个动作。
[置顶]        高并发程序设计入门
示例

public class CyclicBarrierExample {

public static class Soldier implements Runnable{

private String name;
private CyclicBarrier cyclicBarrier;

public Soldier(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
try {
System.out.println(name + " 来报道..");
//等待所有士兵到齐
cyclicBarrier.await();
doWork();
//等待所有士兵完成任务
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

void doWork(){
try {
Thread.sleep(new Random().nextInt(10) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " 任务已完成..");
}
}

public static class doOrder implements Runnable{

boolean flag;
int n;

public doOrder(boolean flag, int n) {
this.flag = flag;
this.n = n;
}

@Override
public void run() {
if (flag){
System.out.println("司令 : 士兵 " + n +"个 任务完成");
}else {
System.out.println("司令 : 士兵 " + n +"个 集合完毕");
//执行完后 改变完成标记.当下一次调用doOrder时,可以进入if
flag = true;
}
}
}

public static void main(String[] a){
final int n = 10;
//是否完成了任务
boolean flag = false;

//创建10个士兵线程
Thread[] allSoldier = new Thread[n];
//创建CyclicBarrier实例
//这里的意思是,等待10个线程都执行完,就执行doOrder()方法
CyclicBarrier c = new CyclicBarrier(n, new doOrder(flag,n));
for (int i=0;i<n;i++){
//System.out.println("士兵" + i + " 报道");
//装配士兵线程
allSoldier[i] = new Thread(new Soldier("士兵" + i,c));
/**
* 开启士兵线程,但是执行到第一个cyclicBarrier.await()栅栏时,
* 要等待,等到10个士兵线程都到这里等着,等到执行完doOrder()方法后,完成第一次计数.
*
* 这样才能继续执行下一个方法doWork(),而doWork()完成后,又需要第二次等待,
* 等待全部士兵线程都到等待队列后,再次调用doOrder()方法.完成第二次计数.
* 而这个方法中,每个线程的flag都已经改变,利用flag,完成任务.
*
*/

allSoldier[i].start();

/*
执行结果:

士兵0 来报道..
士兵1 来报道..
......
士兵8 来报道..
士兵9 来报道..
司令 : 士兵 10个 集合完毕
士兵2 任务已完成..
士兵8 任务已完成..
......
士兵9 任务已完成..
士兵4 任务已完成..
司令 : 士兵 10个 任务完成
*/


}
}

}

线程阻塞 LockSupport

LockSupport可以在线程任意位置让线程阻塞,它弥补了由resume()在前发生导致线程无法正常执行的问题。和wait()相比,它不需要获得锁也不会抛出InterruptedException异常。
LockSupport的静态方法park()可以阻塞当前线程。还有parkNanos()、parkUntil()等方法,它们实现了一个限时的等待。
unpack()继续执行。即使unapt()操作发生在park()之前它也可以使下一次的park()操作立即返回。
示例

public class LockSupportExample {
public static final Object o = new Object();
public static class ChangeObj extends Thread{

public ChangeObj(String name) {
super(name);
}
@Override
public void run() {
synchronized (o){
System.out.println(Thread.currentThread().getName() + " coming....");
LockSupport.park();
// suspend();
}
}
}

public static void main(String[] a) throws InterruptedException {
Thread t1 = new ChangeObj(" T1 ");
Thread t2 = new ChangeObj(" T2 ");
t1.start();
//这个意思是保证t1可以执行到挂起suspend();方法
Thread.sleep(1000);
/**
* t2开始后直接就停止,都没执行到挂起方法.随后t1.resume()方法可以正确执行,
* 但t2都没有挂起却执行了resume.导致t2永远是挂起.
* 还有一个麻烦的问题是t2的线程信息中显示居然还是RUNNABLE状态.后患无穷.
* 所以,我们采用LockSupport来替代suspend和resume
*/

t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
// t1.resume();
// t2.resume();
t1.join();
t2.join();
}

}

线程复用:线程池

多线程的软件设计方法确实可以最大限度地发挥现代处理器多核处理的计算能力,提高系统的吞吐量和性能,但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响。
大量的线程会抢占宝贵的内存资源,如果处理不当可能会导致Out ofMemory异常。大量的线程回收也会给GC带来很大的压力。
在实际生产环境中,线程的数量必须得到控制,盲目的大量创建线程对系统性能是有伤害的。
为了避免频繁的创建和销毁线程,可以让创建的线程进行复用。当完成工作时,并不着急关闭,而是将这个线程退回到线程池,方便其他人使用。
[置顶]        高并发程序设计入门

JDK对线程池的支持

ThreadPoolExecutor表示一个线程池。Executors类则扮演线程池工厂角色,通过Executors可以取得一个具有特定功能的线程池。从UML图中亦可知,ThreadPoolExecutor实现了Executor接口,因此通过这个接口,任何Runnable对象都可以被ThreadPoolExecutor线程池调度。
[置顶]        高并发程序设计入门
Executor框架提供了各种类型的线程池,主要有以下工厂方法。

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

以上方法返回了具有不同工作特性的线程池,具体说明如下
1. newFixedThreadPool,返回一个固定数量的线程池。当一个新任务提交时,如果有空闲线程,则执行。否则新任务暂存在一个任务队列中,待有空闲时,便处理在任务队列中的任务。
2. newSingleThreadExecutor,返回一个线程的线程池。当多余一个新任务提交时,会暂存在一个任务队列中,待有空闲时,按先入先出的顺序处理在任务队列中的任务。
3. newCachedThreadPool,返回一个可根据实际情况调整线程数量的线程池,线程数量不确定,若有空闲,则会有限复用线程。否则创建新线程处理任务。所有线程在当前任务执行完后,将返回线程池待复用。
4. newSingleThreadScheduledExecutor,返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService在Executor接口之上扩展了在给定时间执行某任务的功能。如果在某个固定的延时之后执行,或周期性执行某个任务。可以用这个工厂。
newScheduledThreadPool,返回一个ScheduledExecutorService对象,但该线程可以指定线程数量。

固定线程池

示例:

public class ExecutorExample {
public static class MyTask implements Runnable{

@Override
public void run() {
System.out.println(Thread.currentThread().getId() + " to do sth...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] a ){
MyTask myTask = new MyTask();

//创建固定大小线程池
ExecutorService es = Executors.newFixedThreadPool(3);
for (int i =0;i<12;i++){
es.submit(myTask);
}
es.shutdown();

/*
结果:
隔1秒输出3个且ID都是10,11,12

11 to do sth...
12 to do sth...
10 to do sth...

10 to do sth...
12 to do sth...
11 to do sth...

......

10 to do sth...
11 to do sth...
12 to do sth...

*/


/*
//创建动态线程池
ExecutorService escache = Executors.newCachedThreadPool();
for (int i =0;i<12;i++){
escache.submit(myTask);
}
escache.shutdown();*/


}
}

计划任务

newScheduledThreadPool返回一个ScheduledExecutorService对象,可以根据实际对线程进行调度。

//在给定的实际,对任务进行一次调度
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//对任务进行周期性调度,任务调度的频率一定的,它是以上一个任务开始执行时间为起点,之后的period时间后调度下一次任务。
//如果任务的执行时间大于调度时间,那么任务就会在上一个任务结束后,立即被调用。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//对任务进行周期性调度,在上一个任务结束后,再经过delay长的时间进行任务调度。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

ScheduledExecutorService不会立即安排执行任务,他类似Linux中的crontab工具。如果任务遇到异常,则后续的所有子任务都会停止执行。因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。
示例

public class ScheduledExecutorExample {

public static void main(String[] a){

ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
long s = System.currentTimeMillis();
try {
System.out.println(Thread.currentThread().getId() + " start doSth...");

/**
* 在这种情况下,输出结果1
* 意思就是,执行时间大于调度时间后,线程会在上一个任务结束后,立即被调用。
*/

Thread.sleep(new Random().nextInt(10) * 1000);


/* 在这种情况下,输出结果2
* 每隔3秒进行一次调度.
*/

//Thread.sleep(1000);

long e = System.currentTimeMillis();

System.out.println(Thread.currentThread().getId() + " finish..." +( (e -s)/1000) +"s");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},1,3, TimeUnit.SECONDS);

/*
* 输出结果1
* 10 start doSth...
10 finish...8s
10 start doSth...
10 finish...4s
12 start doSth...
12 finish...8s
10 start doSth...
10 finish...4s
13 start doSth...
13 finish...3s
12 start doSth...
12 finish...7s
14 start doSth...
* */


/*
* 输出结果2
*
* 10 start doSth...
10 finish...1s
隔3秒
10 start doSth...
10 finish...1s
隔3秒
12 start doSth...
12 finish...1s
隔3秒
10 start doSth...
10 finish...1s
.....
* */



/*
//这个是执行完任务后,等待2秒才进行下一次调度
//如果这个任务执行1秒,那么第一次任务的开始到第二次任务开始的时间是3秒.
ses.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
long s = System.currentTimeMillis();
try {
System.out.println(Thread.currentThread().getId() + " start doSth...");
//Thread.sleep(new Random().nextInt(10) * 1000);
Thread.sleep(1000);
long e = System.currentTimeMillis();

System.out.println(Thread.currentThread().getId() + " finish..." +( (e -s)/1000) +"s");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);*/



}

}

核心线程池的内部实现

对于核心的几个线程池,无论是newFixedThreadPool()、newSingleThreadExecutor()还是newCacheThreadPool方法,虽然看起来创建的线程具有完全不同的功能特点,但其内部均使用了ThreadPoolExecutor实现。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

由以上线程池的实现可以看到,它们都只是ThreadPoolExecutor类的封装。我们看下ThreadPoolExecutor最重要的构造函数:

public ThreadPoolExecutor(
//指定了线程池中的线程数量
int corePoolSize,
//指定了线程池中的最大线程数量
int maximumPoolSize,
//当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多次时间内会被销毁。
long keepAliveTime,
//keepAliveTime的单位
TimeUnit unit,
//任务队列,被提交但尚未被执行的任务。
BlockingQueue<Runnable> workQueue,
//线程工厂,用于创建线程,一般用默认的即可
ThreadFactory threadFactory,
//拒绝策略,当任务太多来不及处理,如何拒绝任务。
RejectedExecutionHandler handler)

workQueue
只提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用以下几种BlockingQueue。
1. 直接提交的队列:该功能由synchronousQueue对象提供,synchronousQueue对象是一个特殊的BlockingQueue。synchronousQueue没有容量,每一个插入操作都要等待一个响应的删除操作,反之每一个删除操作都要等待对应的插入操作。如果使用synchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建线程,如果线程数量已经达到了最大值,则执行拒绝策略,因此,使用synchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。
2. 有界的任务队列:有界任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue构造函数必须带有一个容量参数,表示队列的最大容量。public ArrayBlockingQueue(int capacity)。当使用有界任务队列时,若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程。若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见有界队列仅当在任务队列装满后,才可能将线程数量提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。
3. *的任务队列:*队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,*队列的任务队列不存在任务入队失败的情况。若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程执行。但当系统的线程数量达到corePoolSize后就不再创建了,这里和有界任务队列是有明显区别的。若后续还有新任务加入,而又没有空闲线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,*队列会保持快速增长,知道耗尽系统内存。
4. 优先任务队列:带有优先级别的队列,它通过PriorityBlokingQueue实现,可以控制任务执行的优先顺序。它是一个特殊的*队列。无论是ArrayBlockingQueue还是LinkedBlockingQueue实现的队列,都是按照先进先出的算法处理任务,而PriorityBlokingQueue根据任务自身优先级顺序先后执行,在确保系统性能同时,也能很好的质量保证(总是确保高优先级的任务优先执行)。

回顾

ThreadPoolExecutor的任务调度逻辑
[置顶]        高并发程序设计入门
newFixedThreadPool()方法的实现,它返回一个corePoolSize和maximumPoolSize一样的,并使用了LinkedBlockingQueue任务队列(*队列)的线程池。当任务提交非常频繁时,该队列可能迅速膨胀,从而系统资源耗尽。
newSingleThreadExecutor()返回单线程线程池,是newFixedThreadPool()方法的退化,只是简单的将线程池数量设置为1.
newCachedThreadPool()方法返回corePoolSize为0而maximumPoolSize无穷大的线程池,这意味着没有任务的时候线程池内没有现场,而当任务提交时,该线程池使用空闲线程执行任务,若无空闲则将任务加入SynchronousQueue队列,而SynchronousQueue队列是直接提交队列,它总是破事线程池增加新的线程来执行任务。当任务执行完后由于corePoolSize为0,因此空闲线程在指定时间内(60s)被回收。对于newCachedThreadPool(),如果有大量任务提交,而任务又不那么快执行时,那么系统变回开启等量的线程处理,这样做法可能会很快耗尽系统的资源,因为它会增加无穷大数量的线程。
使用自定义线程池时,要根据具体应用的情况,选择合适的并发队列作为任务的缓冲。当线程资源紧张时,不同的并发队列对系统行为和性能的影响均不同。
ThreadPoolExecutor核心调度代码

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* workerCountOf(c)获取当前线程池线程总数
* 当前线程数 小于 corePoolSize核心线程数时,会将任务通过addWorker(command, true)方法直接调度执行。
* 否则进入下个if,将任务加入等待队列
**/

if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* workQueue.offer(command) 将任务加入等待队列。
* 如果加入失败(比如有界队列达到上限或者使用了synchronousQueue)则会执行else。
*
**/

if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* addWorker(command, false)直接交给线程池,
* 如果当前线程已达到maximumPoolSize,则提交失败执行reject()拒绝策略。
**/

else if (!addWorker(command, false))
reject(command);
}

拒绝策略

线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK内置的拒绝策略如下:
1. AbortPolicy : 直接抛出异常,阻止系统正常运行。
2. CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
3. DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
4. DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下。

public interface RejectedExecutionHandler {
/**
* @param r 请求执行的任务
* @param executor 当前线程池
*/

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

实例1:

/**
* 这个案例中,MyTask处理任务需要100毫秒,必然会在执行机构任务后,执行拒绝策略,导致大量的任务直接丢弃.
* 在实际应用中,我们可以将更详细的信息记录在日志中,来分析系统的负载和任务丢失的情况.
*
*
*/
public class RejectedPolicyExample {

public static class MyTask implements Runnable{


@Override
public void run() {
System.out.println(Thread.currentThread().getId() + " coming...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

public static void main(String[] a) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(
5,
5,
0L,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)
/**
* 注释块 2
* 自定义拒绝策略.
* 我们不抛出异常,因为万一在任务提交端没有进行异常处理
* 则有可能使得整个系统崩溃,这不是我们希望遇到的.这比只内置的discardPolicy高级一点点
* 查看 结果2
*/
/*,new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "discard...");
}
}*/);
MyTask myTask = new MyTask();
for (int i =0;i<100;i++){
/**
* 注释块 3
* 如果没有拒绝策略,则很有可能抛出异常,在这里进行捕获,保证系统一直执行完成.
* 查看 结果3
*/
// try {
es.submit(myTask);
// } catch (Exception e) {
// System.out.println(" i am discard...");
// }
Thread.sleep(10);
}
es.shutdown();
}

/*
* 结果1:
*
* 注释块2 和注释块3 都注释.
*
* 运行一段时间后抛出异常,
* 继续执行等待队列中保存的任务,
* 执行完后无法关闭线程池,主线程一直保持.
*
* 11 coming...
12 coming...
13 coming...
14 coming...
Exception in thread "main" java.util.concurrent.RejectedExecutionException:
Task java.util.concurrent.FutureTask@63961c42 rejected from
java.util.concurrent.ThreadPoolExecutor@65b54208
[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 10]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution
(ThreadPoolExecutor.java:2047)
.......
报错后,执行10个任务,线程都处于WAITING状态,无法继续执行.
10 coming...
11 coming...
12 coming...
13 coming...
14 coming...
10 coming...
11 coming...
12 coming...
13 coming...
14 coming...



最后打印出线程信息如下:
........
"pool-1-thread-5" #14 prio=5 os_prio=31 tid=0x00007fa6bc066000 nid=0x5703 waiting on condition [0x0000700001658000]
java.lang.Thread.State: WAITING (parking)
...

"pool-1-thread-4" #13 prio=5 os_prio=31 tid=0x00007fa6bb8a6000 nid=0x5503 waiting on condition [0x0000700001555000]
java.lang.Thread.State: WAITING (parking)
...

"pool-1-thread-3" #12 prio=5 os_prio=31 tid=0x00007fa6bd06d000 nid=0x5303 waiting on condition [0x0000700001452000]
java.lang.Thread.State: WAITING (parking)
...

"pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007fa6bd06c000 nid=0x5103 waiting on condition [0x000070000134f000]
java.lang.Thread.State: WAITING (parking)
...

"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa6bb8a5000 nid=0x4f03 waiting on condition [0x000070000124c000]
java.lang.Thread.State: WAITING (parking)
...
* */


/*
* 结果2:
* 直到运行完成,并且线程池关闭.
*
* 0 coming...
11 coming...
12 coming...
13 coming...
14 coming...
java.util.concurrent.FutureTask@77459877discard....
....
java.util.concurrent.FutureTask@33c7353adiscard...
10 coming...
11 coming...
...

* */

/*
* 结果3:
* 直到运行完成,并且线程池关闭.
* 执行一段时间后,可以看到线程是55个执行的.
* 因为设置了核心线程数和最大线程数都是是5个.
*
* 14 coming...
10 coming...
11 coming...
12 coming...
13 coming...
14 coming...
i am discard...
i am discard...
....
10 coming...
11 coming...
....
12 coming...
* */


}

自定义线程创建ThreadFactory

自定义线程池
线程池的作用就是为了线程复用,也就是避免线程频繁的创建
但是,最开始的线程从何而来,就是ThreadFactory.

ThreadFactory是一个接口,它有一个方法是创建线程
Thread newThread(Runnable r);

自定义线程可以跟踪线程何时创建,自定义线程名称/组/优先级信息.
甚至可以设置为守护线程.总之自定义线程池可以让我们更加*的设置线程池中的所有线程状态.
实例1

public class ThreadFactoryExample {

public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " coming...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

public static void main(String[] a ) throws InterruptedException {
MyTask myTask = new MyTask();
ExecutorService es = new ThreadPoolExecutor(
5
, 5
, 0L
, TimeUnit.SECONDS
, new ArrayBlockingQueue<Runnable>(10)
, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("T " + t.getId() + "_" +System.currentTimeMillis());
t.setDaemon(true);
System.out.println("Create a Thread Name is : "+t.getName());
return t;
}
});
for (int i=0;i<10;i++){
es.submit(myTask);
}
Thread.sleep(2000);
// es.shutdown();

}
}

扩展线程池

我们想监控每个人物的执行开始时间 结束时间等细节,我们可以通过扩展ThreadPoolExecutor扩展线程池.他提供了beforExecute(),afterExecute(),和terminated()三个接口对线程池进行控制.
实例1

public class ThreadPoolExecutorExample {

public static class MyTask implements Runnable{

private String name;

public MyTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
System.out.println(name + " do sth..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] a) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(
5
,5
,0L
, TimeUnit.SECONDS
,new LinkedBlockingDeque<Runnable>()) {
/**
* 创建ThreadPoolExecutor的匿名内部类的子类
* @param t the thread that will run task 将要运行任务的线程
* @param r the task that will be executed 将要执行的任务
*/


@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("start execute .." + ((MyTask)r).name);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("after execute .." + ((MyTask)r).name);
}

@Override
protected void terminated() {
System.out.println("exit execute ..");
}
};


for (int i=0;i<10 ;i++){
MyTask myTask = new MyTask("T_" + i);
es.execute(myTask);//execute 和 submit 的区别在future模式中再说
Thread.sleep(100);
}

/**
* 他不会暴力的关闭,而会等待所有线程执行完后关闭线程
* 可以简单的理解为shutdown只是发送一个关闭信号,
* 但在shutdown之后,线程就不能再接受其他任务了.
*/

es.shutdown();



}

/*
* 结果:
* start execute ..T_0
T_0 do sth..
start execute ..T_1
T_1 do sth..
....
after execute ..T_3
start execute ..T_8
T_8 do sth..
after execute ..T_4
start execute ..T_9
T_9 do sth..
after execute ..T_5
...
after execute ..T_9
exit execute ..
* */


}

优化线程池线程数量

线程池的大小对系统性能有一定的影响,过大或过小的线程数量都无法发挥最优的系统性能,因此要避免极大和极小两种情况。
在《java Concurrency in Practice》中给出了一个估算线程池大小的经验公式:
Ncpu = CPU数量
Ucpu = 目标CPU的使用率(0 ≤ Ucpu ≤ 1 )
W/C = 等待时间与计算时间的比率
最优的池大小等于
Nthreads = Ncpu * Ucpu * (1+W/C)
在java中可以通过Runtime.getRuntime().availableProcessors()取得可用CPU数量。

JDK并发容器

JDK除了提供主语同步控制,线程池等基本工具外,为了提高大家的效率,还未大家准备了一批好用的容器类
包括链表,队列,HashMap等.它们都是线程安全的.
ConcurrentHashMap : 一个高效的线程安全的HashMap
CopyOnWriteArrayList : 在读多写少的场景中,性能非常好,远远高于vector.
ConcurrentLinkedQueue : 高效并发队列,使用链表实现,可以看成线程安全的LinkedList.
BlockingQueue : 一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常适合用作数据共享通道.
ConcurrentSkipListMap : 跳表的实现,这是一个Map,使用跳表数据结构进行快速查找.
另外Collections工具类可以帮助我们将任意集合包装成线程安全的集合.

线程安全的HashMap

大家都制定HashMap在多线程环境中是线程不安全的,会产生相互引用的错误.
通过Collections.synchronizedMap(new HashMap<>());来包装一个线程安全的HashMap.
它使用委托,将自己所有的Map相关的功能交给HashMap实现,而自己负责包装线程安全.
它其实是通过指定对象mutex实现对这个m的互斥操作.

private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable {
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
......
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}

public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
......
}

虽然可以实现线程安全,但无论写入还是读取都需要获取mutex这把锁,所以性能不是太好,
我们倾向于使用ConcurrentHashMap来实现高并发下的安全的HashMap.

高效读写队列 ConcurrentLinkedQueue

队列Queue是常用的数据结构之一,JDK提供了一个ConcurrentLinkedQueue类用来实现高并发的队列

高效读取 CopyOnWriteArrayList

很多场景中都是读远远高于写操作,那么每次对读取进行加锁其实是一种资源浪费.根据读写锁的思想,读锁和读锁之间不冲突.
但是读操作会受到写操作的阻碍,在写操作发生时,读就必须等待,否则可能读到不一致的数据.同理读操作正在进行的时候,
程序也不能进行写入.
JDK提供了CopyOnWriteArrayList类,读取是完全不加锁的,并且写入也不会阻塞读取操作,这样一来性能大大提升了.
其实,就是在写操作时,进行一次自我复制,当List需要修改时,并不修改原有内容(这对于保证当前读线程的数据一致性非常重要)
而对原内容进行一次复制,将修改内容写入副本.写完后,再将修改完的副本替换原来的操作.这就保证了写操作不会影响读了.

public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {


final transient ReentrantLock lock = new ReentrantLock();

private transient volatile Object[] array;

final void setArray(Object[] a) {
array = a;
}

//关于读取的实现(不上锁):
final Object[] getArray() {
return array;
}
private E get(Object[] a, int index) {
return (E) a[index];
}

public E get(int index) {
return get(getArray(), index);
}

//其他代码
.......

//写入操作:
public boolean add(E e) {
//上重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取原内容
Object[] elements = getArray();
int len = elements.length;
//复制原内容 并且数组长度增加1
Object[] newElements = Arrays.copyOf(elements, len + 1);
//放入新内容
newElements[len] = e;
//替换老的数组,并且读线程可以立即"察觉"到,因为array变量是volatile类型
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

数据共享通道 BlockingQueue

解决多线程数据共享问题,可以使用 BlockingQueue 接口来实现.
public interface BlockingQueue<E> extends Queue<E>
具体实现类如下:
1. ArrayBlockingQueue
2. DelayedWorkQueue
3. DelayQueue
4. LinkedBlockingQueue
5. SynchronousQueue
6. BlockingDeque
7. PriorityBlockingQueue

ArrayBlockingQueue 基于数组实现,更适合做有界队列,因为可容纳的最大元素需要在创建时指定.毕竟数组动态扩展不太方便.
LinkedBlockingQueue 基于链表实现,适合做*队列,或者边界值非常大的队列,它不会因为初始容量大,而一口气吃掉内存.

BlockingQueue之所以适合作为数据共享通道,关键还在blocking上.blocking阻塞的意思,
当服务线程(指不断获取队列中消息进行处理的线程)处理完成队列中的消息后,它如何知道吓一跳消息何时到来.
一种简单的办法是不断间隔循环和监控这个队列,但会造成不必要的资源浪费.循环周期也难以确定.而blockingQueue会
让服务线程在队列为空的时候等待,当有新消息进入队列后自动将线程唤醒.

ArrayBlockingQueue 内部元素都放置在一个对象数组中.final Object[] items;
向队列中压入元素可以使用offer()方法和put()方法,对于offer(),如果队列已经满了,它会立即返回false.这不是我们需要的.
put()方法是将元素压入队列末尾.但如果队列满了,它会一直等待,知道队列中有空闲的位置.
从队列中弹出元素可以使用pull()和take()方法,它们都是从头部获取一个元素,不同的是如果队列为空pull()直接返回null,
而take()方法会等待,知道队列内有可用元素.
因此put()方法和take()方法提现了blocking的关键.
为了做好等待和通知两件事在ArrayBlockingQueue内部定义了一些字段,当执行take()操作时,如果队列为空,则让当前线程
等待在notEmpty上,新元素入队时,则进行一次notEmpty通知.

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

final ReentrantLock lock;
//如果队列为空take时等待,put时通知,因为有新元素入队,通知notEmpty上等待的任务可以取出元素啦.
private final Condition notEmpty;
//如果队列满时put等待,take时通知,因为有元素拿走了,通知notFull上等待的任务可以增加元素啦.
private final Condition notFull;
//其他代码
.....

//提取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空
while (count == 0)
//等待增加操作通知
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

//压入元素
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列已经满了?
while (count == items.length)
//等待提取操作通知
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

//增加操作
//Inserts element at current put position, advances, and signals.
//Call only when holding lock.
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知 可以开始取元素啦
notEmpty.signal();
}


//提取操作
//Extracts element at current take position, advances, and signals.
//Call only when holding lock.
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知 可以插入元素啦.
notFull.signal();
return x;
}


}

随机数据结构 跳表 SkipList

跳表是一种用来快速查找的数据结构,有点类似平衡树,对于平衡树的插入和删除往往可能导致平衡树进行一次全局的调整.
而对跳表的插入和删除只需要对局部进行操作即可.这样带来的好处是,在高并发情况下,如果是平衡树,那么会需要一个全局锁
来保证线程安全.而跳表只需要部分锁即可.,所以在并发数据结构中,JDK使用跳表来实现一个Map
跳表的另一个特点是随机算法,跳表本身维护了多个链表,并且链表是分层的.没上面一层链表都是下面一层的子集,一个元素插入到哪些层完全是随机的.
跳表内所有链表的元素都是排序的.查找时可以从顶链表开始,一旦发现被查找的元素大于当前链表中的值,就会转入下一层链表继续查找.也就是说搜索是跳跃式的.
跳表是一种空间换时间的算法.
使用跳表实现Map和哈希算法实现Map的另一个不同是,哈希不会保存元素的顺序,而跳表内的所有元素都是排序的.因此对跳表遍历时会得到一个有序的结果.

跳表的内部有几个关键的数据结构组成.首先是Node,一个Node表示一个节点,
每个Node还会指向下一个Node,因此还有next元素.
对Node的所有操作,使用CAS方法.

static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
.....
//设置value值
boolean casValue(Object cmp, Object val) {
return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
//设置next字段
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
}

另一个重要的是Index元素,顾名思义是索引的意思,内部还包装了Node,同时增加了向下down和向右right的引用.
整个跳表是根据Index进行全网的组织.

static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
.....
}

此外对于每一层的表头,还需要记录当前处于那一层,为此还需要一个HeadIndex的数据结构,表示链表的头部第一个Index.

    //Nodes heading each level keep track of their level.
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}

对于跳表的操作就是组织好这些Index之间的连接关系.

锁的优化及注意事项

锁是最常用的同步方法之一,在高并发环境下,激烈的锁竞争会导致性能下降,
对多线程来说,系统除了处理功能需求之外,还需额外的维护多线程环境的特有信息,如线程本身的元数据,线程的调度,上下文切换等.
合理的并发,才能将多核CPU的性能发挥到极致.

提高”锁”性能的几点建议

减少锁持有的时间

在锁的竞争中,单个线程对锁的持有时间与系统性能有直接的关系.应该尽可能的减少锁的占有时间,以减少线程之间互斥的可能.
减少锁的持有时间有助于降低锁冲突的可能性,进而提高系统的并发能力.


// 说明:otherMethod()是没有线程安全问题的.
// 同步整个方法,如果在并发量较大时,使用这种对整个方法做同步的方案.会导致等待线程大量增加.
public synchronized void method(){
otherMethod();
needSyncMethod();
otherMethod();
}


//优化方法之一是,只在必要时进行同步,这样就能明显的减少线程持有锁的时间,提高系统吞吐量.
public void method(){
otherMethod();
synchronized(this){
needSyncMethod();
}

otherMethod();

}

减小锁粒度

减小锁的粒度也是一种削弱多线程锁竞争的有效手段.这种技术典型的应用场景就是ConcurrentHashMap类的实现.
对于HashMap来说,最重要的两个方法是put()和get().concurrentHashMap内部进一步分了若干个小的HashMap,称之为(SEGMENT).
默认情况下,一个ConcurrentHashMap进一步细分为16个段.如果增加表项,并不是将整个HashMap加锁,而是首先根据hashcode得到该
表项应该被放在哪个段中,然后对该段加锁,完成put()操作.只要被加入的数据不存放在同一个表项,则多个线程的put()操作可以做到真正的并行.
由于默认16个段所以ConcurrentHashMap最多可以同时接受16个线程同时插入.
所谓减少锁粒度,就是指缩小锁定对象范围,从而减少锁冲突的可能性,进而提高系统的并发能力.

读写分离锁替换独占锁

使用读写锁ReadWriteLock可以提高系统性能.如果说减少锁粒度是通过分割数据结构实现的,那么读写锁则是对系统功能点的分割.
在读多写少的场合使用读写锁可以有效替身系统的并发能力.

锁分离.

将读写锁思想进一步延伸就是锁分离.读写锁依据读写操作功能上的不同,进行了有效的锁分离.
依据应用程序的功能特点,使用类似的分离思想,也可以对独占锁进行分离.一个典型的案例就是LinkedBlockingQueue的实现.
take()和put()方法虽然都对队列进行了修改操作,但由于是链表,因此,两个操作分别作用于队列的前端和末尾,理论上两者并不冲突.
使用独占锁,则要求在进行take和put操作时获取当前队列的独占锁,那么take和put酒不可能真正的并发,他们会彼此等待对方释放锁.
在JDK的实现中,取而代之的是两把不同的锁,分离了take和put操作.削弱了竞争的可能性.实现类取数据和写数据的分离,实现了真正意义上成为并发操作.

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {


//take和put之间不存在锁竞争关系,只需要take和take之间,put和put之间进行竞争.
// Lock held by take, poll, etc
private final ReentrantLock takeLock = new ReentrantLock();

// Wait queue for waiting takes
private final Condition notEmpty = takeLock.newCondition();

// Lock held by put, offer, etc
private final ReentrantLock putLock = new ReentrantLock();

// Wait queue for waiting puts
private final Condition notFull = putLock.newCondition();

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //上锁,不能有两个线程同时写数据
try {
while (count.get() == capacity) { //当队列满时,等待拿走数据后唤醒.
notFull.await();
}
enqueue(node);
c = count.getAndIncrement(); //更新总数,count是加(getAndIncrement先获取当前值,再给当前值加1,返回旧值)
if (c + 1 < capacity) //如果旧值+1 小于 队列长度
notFull.signal(); //唤醒等待的写入线程.继续写入.
} finally {
putLock.unlock(); //释放锁
}
if (c == 0) //take操作拿完数据后就一直在notEmpty等待,这个时候的count为0,而当put操作后,成功后就可以唤醒take操作继续执行了.而当队列中count很多时,这一步是不需要执行的.
signalNotEmpty(); //唤醒在notEmpty等待的线程.
}

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); //上锁
try {
while (count.get() == 0) { //如果队列为0,等待
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement(); //先取原值,再减1
if (c > 1) //如果队列大于1,自己继续执行.
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) //当长度等于设定的队列长度,就唤醒take操作.
signalNotFull();
return x;
}

private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

}

锁粗化

如果对一个锁不停地进行请求,同步和释放,其本身也会消耗系统宝贵的资源,反而不利于性能优化.
虚拟机需要一连串对同一把锁不断进行请求和释放操作时,便会把所有的锁操作整合成对锁的一次请求,从而减少对锁的请求同步次数,这就是锁的粗化.

public void demoMethod(){
synchronized(lock){
//doSth...
}

//其他不需要同步但很快完成的事情
.....

synchronized(lock){
//doSth...
}


}

整合如下:

public void demoMethod(){
synchronized(lock){
//doSth...

//其他不需要同步但很快完成的事情
.....
}
}

性能优化就是根据运行时的真实情况对各个资源点进行权衡折中的过程.锁粗化的思想和减少锁持有时间是相反的,但在不同场合,他们的效果并不相同.所以大家要根据实际情况进行权衡.

Java虚拟机对锁优化所做的努力

锁偏向

偏向锁是一种针对加锁操作的优化手段,他的核心思想是:如果一个线程获得了锁,那么锁就进行偏向模式.当这个线程再次请求锁时,无需再做任何同步操作.这样就节省了大量操作锁的动作,从而提高程序性能.
因此,对于几乎没有锁竞争的场合,偏向锁有比较好的优化效果.因为极有可能连续多次是同一个线程请求相同的锁.而对于锁竞争激烈的程序,其效果不佳.
使用Java虚拟机参数:-XX:+UseBiasedLocking 可以开启偏向锁.

轻量级锁

如果偏向锁失败,虚拟机并不会立即挂起线程.它还会使用一种称为轻量级的锁的优化手段.轻量级锁只是简单的将对象头部作为指针,指向持有锁的线程堆栈内部,来判断一个线程是否持有对象锁.
如果线程获得轻量锁成功,则可以顺利进入临界区.如果失败,则表示其他线程争抢到了锁,那么当前线程的锁请求就会膨胀为重量级锁.

自旋锁

锁膨胀后,虚拟机为了避免线程真实的在操作系统层面挂起,虚拟机还做了最后的努力就是自旋锁.如果一个线程暂时无法获得索,有可能在几个CPU时钟周期后就可以得到锁,
那么简单粗暴的挂起线程可能是得不偿失的操作.虚拟机会假设在很短时间内线程是可以获得锁的,所以会让线程自己空循环(自旋),如果尝试若干次后,可以得到锁,那么久可以顺利进入临界区,
如果还得不到,才会真实地讲线程在操作系统层面挂起.

锁消除

锁消除是一种更彻底的锁优化,Java虚拟机在JIT编译时,通过对运用上下文的扫描,取出不可能存在的共享资源竞争锁,节省毫无意义的资源开销.
锁消除设计的一项关键技术是逃逸分析,就是观察某个变量是否会跳出某个作用域,(比如对Vector的一些操作).

    public String[] createStrings(){
//v首先是局部变量,局部变量是在线程栈上分配的,属于线程私有的数据.
//在这种情况下,Vector内部的所有加锁同步操作都没有必要.如果虚拟机检测到,就会将这些无用的锁去除.
Vector<String> v = new Vector<>();
for (int i=0;i<10;i++)
v.add(Integer.toString(i));
//逃逸分析:如果返回的是v而不是String数组,那么就认为变量v逃逸出来当前函数,有可能被其他线程访问到.如果这样虚拟机就不能进行锁消除操作.
return v.toArray(new String[]{});
}

逃逸分析必须在-server模式下进行,可以使用-XX:+DoEscapeAnalysis 参数打开逃逸分析
使用-XX:+EliminateLocks 参数可以打开锁消除.

ThreadLocal

从名字上看,这是一个线程的局部变量,只有当前线程才可以访问到.
注意:为每个线程分配不同的对象,需要在应用层面保证.ThreadLocal只起到了简单的容器作用.
正确使用ThreadLocal保证线程安全的前后对比示例

public class ThreadLocalExample {
/**
* 不使用ThreadLocal,且无锁的日期转换
*/

private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static class ParseDate implements Runnable{

@Override
public void run() {
try {
Date t = sdf.parse("2016-03-01 12:13:14");
System.out.println("date : "+t);
} catch (ParseException e) {
e.printStackTrace();
}
}
}

/**
* 使用ThreadLocal,且无锁的日期转换
*/

private static ThreadLocal<SimpleDateFormat> tsdf = new ThreadLocal<>();
public static class TParseDate implements Runnable{

@Override
public void run() {
if (tsdf.get() == null){
//这里一定要注意,需要给每个线程传入的对象实例是不同的.(使用new关键字).
//如果传入了相同的实例,那么也不能保证线程是安全的.
tsdf.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") );
}
Date t = null;
try {
t = tsdf.get().parse("2016-03-01 12:13:14");
} catch (ParseException e) {
e.printStackTrace();
}
System.out.println("date : "+t);
}
}

public static void main(String[] a ){

ExecutorService es = Executors.newFixedThreadPool(10);
for (int i=0;i<1000;i++){
//安全的日期转换
es.execute(new TParseDate());

//这个方法是线程不安全的日期转换,会发生异常
//es.execute(new ParseDate());

/*
错误结果如下:
Exception in thread "pool-1-thread-6" Exception in thread "pool-1-thread-3"
java.lang.NumberFormatException: For input string: ""
java.lang.NumberFormatException: multiple points

发生这种错误是因为simpleDateFormat的parse方法不是线程安全的,
解决这种问题可以通过加锁的方式,也可以通过ThreadLocal.
*/

}
es.shutdown();
}
}

ThreadLocal实现原理

关注核心方法 get() 和 set()

public class ThreadLocal<T> {

//设置到ThreadLocal中的数据也正是写入了这个Map,key为ThreadLocal当前对象,value就是我们需要的值.
//而threadLocals本身就保存在了当前的线程的局部变量中.
//@param value the value to be stored in the current thread's copy of this thread-local.
public void set(T value) {
Thread t = Thread.currentThread(); //获得当前线程对象
//返回一个线程的ThreadLocalMap(ThreadLocal内部类,但它是定义在Thread内部的成员`ThreadLocal.ThreadLocalMap threadLocals = null;`),
//可以理解为是一个定制的哈希映射只适合维护线程局部变量的对象.
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

//@return the current thread's value of this thread-local
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
//将自己作为key取得内部实际的数据.
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

//@param t the current thread
//@return the map
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}


}

线程退出后Thread的清理工作

如果我们使用了线程池,那就意味着当前线程未必会退出(如固定线程池),如果将一些非常大的对象设置在ThreadLocal中,但不清理它,可能会使系统内存泄露.
Thread源码

public class Thread implements Runnable {

//ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class.
ThreadLocal.ThreadLocalMap threadLocals = null;

//This method is called by the system to give a Thread
//a chance to clean up before it actually exits.
//在线程退出前,由系统回调,进行资源清理.将Object设置为null可以加速回收.
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
//Aggressively null out all reference fields: see bug 4006245
target = null;
//Speed the release of some of these resources
//加速释放资源
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}

}

如果希望及时回收对象,最好使用ThreadLocal的remove方法将变量移除.对于ThreadLocal变量,也可以手动设为null,那么ThreadLocal对应的所有线程局部变量都可能被回收.
加速回收简单示例

public class ThreadLocalExample1 {

static volatile ThreadLocal<SimpleDateFormat> tsdf = new ThreadLocal<SimpleDateFormat>(){
@Override
protected void finalize() throws Throwable {
System.out.println(this.toString() + " is gc..first.");
}

};
static volatile CountDownLatch cdl = new CountDownLatch(10000);
public static class TParseDate implements Runnable{

@Override
public void run() {
if (tsdf.get() == null){
tsdf.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"){
//这里重写,是希望看到当ThreadLocalMap的key为NULL时,被清楚.(暂时没有打印出来.)
@Override
protected void finalize() throws Throwable {
System.out.println(this.toString() + " is gc...");
}
});
System.out.println(Thread.currentThread().getId() + " create simpleDateFormat...");
}
Date t = null;
try {
t = tsdf.get().parse("2016-03-01 12:13:14");
} catch (ParseException e) {
e.printStackTrace();
}finally {
cdl.countDown();
}
}
}

public static void main(String[] a ) throws InterruptedException {

ExecutorService es = Executors.newFixedThreadPool(10);
for (int i=0;i<10000;i++){
es.execute(new TParseDate());
}
cdl.await();
System.out.println("mission complete ...");
tsdf = null; //加速释放资源
System.gc(); //垃圾回收
System.out.println("GS complete ...");

//在设置新的ThreadLocal时,会清除ThreadLocalMap中的无效对象.
tsdf = new ThreadLocal<SimpleDateFormat>(){};
cdl = new CountDownLatch(10000);
for (int i=0;i<10000;i++){
es.execute(new TParseDate());
}
cdl.await();
Thread.sleep(1000);
/*垃圾回收,
理论上是可以清除ThreadLocalMap中的无效对象
(说明:我用的是jdk8,反复测试,无法看到ThreadLocalMap中的无效对象被gc.)*/

System.gc();
System.out.println("second GS complete ...");
//shutdown会执行Thread的exit()方法,在exit中会将threadLocals设置为null.这一步很重要.
es.shutdown();

System.out.println("terminated ...");
}
}
/*执行结果:
18 create simpleDateFormat...
13 create simpleDateFormat...
16 create simpleDateFormat...
14 create simpleDateFormat...
17 create simpleDateFormat...
19 create simpleDateFormat...
15 create simpleDateFormat...
12 create simpleDateFormat...
10 create simpleDateFormat...
11 create simpleDateFormat...
mission complete ...
GS complete ...
com.iboray.javacore.Thread.T5.ThreadLocalExample1$1@6299b640 is gc..first.//ThreadLocal对象被回收
19 create simpleDateFormat...
17 create simpleDateFormat...
11 create simpleDateFormat...
13 create simpleDateFormat...
12 create simpleDateFormat...
14 create simpleDateFormat...
10 create simpleDateFormat...
16 create simpleDateFormat...
18 create simpleDateFormat...
15 create simpleDateFormat...
second GS complete ...
terminated ...
*/

以上示例执行过程中ThreadLocal的内部,注意ThreadLocalMap。
[置顶]        高并发程序设计入门

ThreadLocalMap

它是一个类似HashMap的东西,更精确的说,它更加雷系WeakHashMap.它的实现使用了若引用,Java虚拟机在垃圾回收的时候,如果发现若引用,就会立即回收.
它的内部由一系列的Entry构成,每个Entry都是WeakHashMap:

static class ThreadLocalMap {
........

static class Entry extends WeakReference<ThreadLocal<?>> {
//The value associated with this ThreadLocal.
Object value;

//k 是ThreadLocal的实例.
//v 我们set的值(这里是例子就是simpleDateFormat实例)
Entry(ThreadLocal<?> k, Object v) {
//弱引用,调用WeakReference的构造函数.并不真正持有ThreadLocal的引用.
//当ThreadLocal在外部强引用被回收时,ThreadLocal的key就变为null.
//当系统进行ThreadLocalMap清理时,就会自然将这些垃圾数据回收. (在jdk8环境下没测试成功.)
super(k);
value = v;
}
}
}

ThreadLocal回收机制

[置顶]        高并发程序设计入门

对性能有何帮助

为每一个线程分配一个独立的对象对系统性能也许是有帮助的,如果共享对象对于竞争的处理容易引起性能损失.
我们可以考虑为每一个线程使用ThreadLocal分配单独的对象.
性能测试示例

public class ThreadLocalExample2 {

//定义随机次数
public static final int GEN_COUNT = 10000000;

//定义线程数
public static final int THREAD_COUNT = 4;

//定义线程池
static ExecutorService es = Executors.newFixedThreadPool(THREAD_COUNT);

//定义共享单一实例(Random(1)保证随机同样的随机数)
public static Random random = new Random(1);

//设置ThreadLocal
public static ThreadLocal<Random> threadLocalRandom = new ThreadLocal<Random>(){
@Override
protected Random initialValue() {
return new Random(1);
}
};

public static class TRandom implements Callable<Long> {

private int mode = 0;

public TRandom(int mode) {
this.mode = mode;
}

private Random getRandom(){
if (mode == 0)
return random;
else if (mode == 1)
return threadLocalRandom.get();
else
return null;
}
@Override
public Long call() throws Exception {
long s = System.currentTimeMillis();
for (int i =0;i<GEN_COUNT;i++){
getRandom().nextInt();
}
long e = System.currentTimeMillis();
System.out.println( e -s );
return e -s;
}
}


public static void main(String[] a) throws InterruptedException, ExecutionException {

Future<Long>[] futures = new Future[THREAD_COUNT];
for (int i=0;i<THREAD_COUNT;i++){
futures[i] = es.submit(new TRandom(0));
}
long totalTime = 0;
for (int i=0;i<THREAD_COUNT;i++){
totalTime +=futures[i].get();
}
System.out.println("多线程访问同一个实例 : " + totalTime + " ms");

/**
* 改为ThreadLocal后
*/

totalTime=0;
for (int i=0;i<THREAD_COUNT;i++){
futures[i] = es.submit(new TRandom(1));
}
for (int i=0;i<THREAD_COUNT;i++){
totalTime +=futures[i].get();
}
System.out.println("多线程使用ThreadLocal后 : " + totalTime + " ms");
es.shutdown();
}
}
/**运行结果:
1079
1157
1181
1203
多线程访问同一个实例 : 4620 ms
192
226
244
254
多线程使用ThreadLocal后 : 916 ms
**/

无锁

无锁是一种乐观的策略.它假设对资源的访问没有冲突.遇到冲突使用CAS(Compare And Swap)策略来鉴别线程的冲突,一旦检测到冲突,就重试当前操作指导没有冲突为止.

比较交换CAS

相比锁,使用CAS的程序看起来会比较复杂一些,但由于其非阻塞性,和天生免疫,并且线程间的相互影响也是远远比基于锁的方式要小.
更为重要的是,使用无锁的方式完全没有锁竞争带来的系统开销,也没有线程间频繁调度带来的系统开销,因此,它要比基于锁的方式拥有更优越的性能.

CAS包含三个参数CAS(V,E,N).V表示要跟新的变量,E表示预期值,N表示新值.仅当V值等于E值的时候,才会将V设置为N.反之说明已有其他线程做了更新.
则当前线程说明也不做.最后CAS返回当前V的真实值.失败的线程不会被挂起,仅是被告知失败,并允许再次尝试.当然也允许失败的线程放弃操作.基于这样的原理
CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理.
在硬件层面,大部分现代CPU都已经支持原子化的CAS指令,在JDK5.0之后,虚拟机便可以使用这个指令来实现并发操作和并发数据结构.

无锁的线程安全整数 : AtomicInteger

JDK并发包中有一个atomic包,里面实现了一些直接使用CAS操作的线程安全类型.

 //获取当前的值
public final int get()
//设置当前值
public final void set(int newValue)
//取当前的值,并设置新的值
public final int getAndSet(int newValue)
//如果当前值为expect,则设置为update
public final boolean compareAndSet(int expect,int update)
//获取当前的值,并自增
public final int getAndIncrement()
//获取当前的值,并自减
public final int getAndDecrement()
//获取当前的值,并加上预期的值
public final int getAndAdd(int delta)
//当前值增加delta,返回新值
public final int addAndGet(int delta)
//当前值+1,返回新值
public final int incrementAndGet()
//当前值-1,返回新值
public final int decrementAndGet()

无锁实现整数累加示例

public class UnLockExample1 {
public static final int THREAD_NUM = 20;
public static final int NUM = 100000;

//无锁变量区
static AtomicInteger in = new AtomicInteger();

//有锁变量区
static int a = 0;

public static class AddThread implements Runnable{
@Override
public void run() {
for (int k=0;k<NUM;k++) {
in.incrementAndGet(); //详情参考 -->Java中的指针,Unsafe类
}
}
}

public static class lockExample implements Runnable{
static final lockExample ue = new lockExample();
@Override
public void run() {
for (int i = 0; i < NUM; i++) {
//加在这里是为了保持和CAS范围一致.
synchronized (ue) {
a += 1;
}
}
}
}
public static void main(String[] a) throws InterruptedException {

Thread[] threads = new Thread[THREAD_NUM];
for (int i=0;i<THREAD_NUM ;i++){
threads[i] = new Thread(new AddThread());
}
long s = System.currentTimeMillis();
for (int i=0;i<THREAD_NUM ;i++)
threads[i].start();
for (int i=0;i<THREAD_NUM ;i++)
threads[i].join();
long e = System.currentTimeMillis();
System.out.println("AddThread result : "+in.get()+" 耗时 : "+(e-s)+" ms");

for (int i=0;i<THREAD_NUM ;i++){
threads[i] = new Thread(new lockExample());
}
s = System.currentTimeMillis();
for (int i=0;i<THREAD_NUM ;i++)
threads[i].start();
for (int i=0;i<THREAD_NUM ;i++)
threads[i].join();
e = System.currentTimeMillis();
System.out.println("lockExample result : "+in.get()+" 耗时 : "+(e-s)+" ms");


}
}
/**测试结果
AddThread result : 2000000 耗时 : 48 ms
lockExample result : 2000000 耗时 : 108 ms
**/

Java中的指针,Unsafe类

当我们进入AtomicInteger.incrementAndGet();方法的实现中(与JDK7不一样),可以看到具体的实现代码:
我们看到一个特殊的变量unsafe,它是sun.misc.Unsafe类型,从名字上看这个类是封装了一些不安全的操作.也就是针对指针的操作.

 public class AtomicInteger extends Number implements java.io.Serializable {
//Atomically increments by one the current value.
//@return the updated value
//jdk8
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

//jdk7
public final int incrementAndGet() {
for(;;){
int current = get();
int next = current +1;
if(compareAndSet(current,next))
return next;
}
}
}


public final class Unsafe {

//JDK开发人员并不希望我们使用这个类,调用工厂方法getUnsafe(),返回的却是Exception;
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
//检查调用getUnsafe()的函数类,如果这个类的ClassLoader不是null,直接抛出异常,因此,使得我们的应用程序无法直接调用.
//它是JDK内部使用的专属类.

if(!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}

//注意:
//根据Java类加载器的工作原理,应用程序的类由App Loader加载.而系统核心类,如 rt.jar中的类由Bootstrap类加载器加载.Bootstrap加载器没有Java对象的对象,
//因此视图获得这个类的加载器会返回null.所以,当一个类的类加载器为null时,说明它是由Bootstrap加载的,而这个类也极有可能是rt.jar中的类.
public static boolean isSystemDomainLoader(ClassLoader var0) {
return var0 == null;
}

//一个native方法,
//var1 给定的对象
//var2 为对象内的偏移量(其实就是一个字段到对象头部的偏移量,通过偏移量可以快速定位字段)
//var3 期望值
//var4 要设置的值
//不难看出这个方法的内部必然是CAS原子指令来完成的.
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}
}

AtomicReference

AtomicReference是对普通对象的引用,也就是它保证你在修改对象引用的时候线程是安全的.
我们之前说保证当前值和预期值一致,并且写入新值,是没问题的安全的.这个逻辑在一般情况下是正确的,但有个例外是,当获得对象当前数据后,在准备设置新值之前,被其他线程连续修改了两次,经过两次修改后值又变为旧值.这种情况发生概率极小,但在现实中,可能存在另外的场景是,我们是否能修改对象的值,不仅取决于当前值,还和对象的过程变化有关.
忽略过程的小例子

public class AtomicReferenceExample {
//场景 : 商铺赠送活动,不满20用户一次充值20.

//用户余额
static AtomicReference<Integer> userAmount = new AtomicReference<>();
static {
userAmount.set(19);
}

public static class AddThread implements Runnable{
@Override
public void run() {
//扫描目前不够20的用户
while (true)
while (true){
Integer a = userAmount.get();
if (a < 20){
if (userAmount.compareAndSet(a,a+20)){
System.out.println(" 为用户充值成功 , 余额为 : "+userAmount.get());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}else {
System.out.println(" 无须充值 , 余额为 : " + userAmount.get());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}
}
}

public static class SubThread implements Runnable{
@Override
public void run() {
for (int i=0;i<100;i++)
while (true){
Integer a = userAmount.get();
if (a > 10){
if (userAmount.compareAndSet(a,a-10)){
System.out.println(" 消费成功 , 余额为 : "+userAmount.get());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}else {
System.out.println(" 余额不足 : " + userAmount.get());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}
}
}


public static void main(String[] a) throws InterruptedException {

Thread[] addThreads = new Thread[10];
Thread[] subThreads = new Thread[10];
for (int i=0;i<10;i++){
addThreads[i] = new Thread(new AddThread());
subThreads[i] = new Thread(new SubThread());
}
for (int i=0;i<10;i++){
addThreads[i].start();
subThreads[i].start();
}

}
}
/**测试结果
为用户充值成功 , 余额为 : 39
消费成功 , 余额为 : 29
无须充值 , 余额为 : 29
消费成功 , 余额为 : 19
为用户充值成功 , 余额为 : 39
消费成功 , 余额为 : 29
无须充值 , 余额为 : 29
消费成功 , 余额为 : 19
为用户充值成功 , 余额为 : 39
消费成功 , 余额为 : 29
无须充值 , 余额为 : 29
消费成功 , 余额为 : 19
为用户充值成功 , 余额为 : 39
消费成功 , 余额为 : 29
无须充值 , 余额为 : 29
消费成功 , 余额为 : 19
为用户充值成功 , 余额为 : 39
消费成功 , 余额为 : 29
无须充值 , 余额为 : 29
消费成功 , 余额为 : 19
......
**/

AtomicStampedReference

AtomicReference无法解决对象在修改过程中丢失了状态信息的问题,因此我们通过 AtomicStampedReference 解决这个问题.
它内部不仅维护了对象值,还维护了一个时间戳(实际上它可以使任何一个整数来表示状态).当 AtomicStampedReference 对应的数值被修改时除了更新数据本身,还必须要更新时间戳.当设置值时,AtomicStampedReference 保证对象值以及时间戳都必须满足期望值,才能成功写入.
关注状态和数据的例子

public class AtomicStampedReferenceExample {
//场景 : 商铺赠送活动,不满20用户一次充值20.

//用户余额及时间戳
static AtomicStampedReference<Integer> userAmount = new AtomicStampedReference<>(19,0);

//保证多线程环境下,定位第一次.如果放到循环内,就任然会重复多次充值,因为Stamp每次都在变.无法保证第一次,后续的赠予操作已这个为依据
static final int t = userAmount.getStamp();
public static class AddThread implements Runnable{
@Override
public void run() {
//扫描目前不够20的用户
while (true)
while (true){
Integer a = userAmount.getReference();
if (a < 20){
// //赠予成功修改时间戳,将首次充值的时间戳修改,使得不可能发生第二次充值.
if (userAmount.compareAndSet(a,a+20,t,t+1)){
System.out.println(" 为用户充值成功 , 余额为 : "+userAmount.getReference());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}else {
System.out.println(" 无须充值 , 余额为 : " + userAmount.getReference());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}
}
}

public static class SubThread implements Runnable{
@Override
public void run() {
for (int i=0;i<100;i++)
while (true){
final int c = userAmount.getStamp();
Integer a = userAmount.getReference();
if (a > 10){
//每次更新时间戳,不能重复.
if (userAmount.compareAndSet(a,a-10,c,c+1)){
System.out.println(" 消费成功 , 余额为 : "+userAmount.getReference());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}else {
System.out.println(" 余额不足 : " + userAmount.getReference());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
}
}
}


public static void main(String[] a) throws InterruptedException {

Thread[] addThreads = new Thread[10];
Thread[] subThreads = new Thread[10];
for (int i=0;i<10;i++){
addThreads[i] = new Thread(new AddThread());
subThreads[i] = new Thread(new SubThread());
}
for (int i=0;i<10;i++){
addThreads[i].start();
subThreads[i].start();
}
}
}
/**测试结果
为用户充值成功 , 余额为 : 39
消费成功 , 余额为 : 29
无须充值 , 余额为 : 29
消费成功 , 余额为 : 19
消费成功 , 余额为 : 9
余额不足 : 9
余额不足 : 9
余额不足 : 9
余额不足 : 9
余额不足 : 9
余额不足 : 9
余额不足 : 9
余额不足 : 9
......
**/

无锁数组 AtomicIntegerArray

AtomicIntegerArray本质上是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性.
示例

public class AtomicIntegerArrayExample {
public static final int TN = 20;

static AtomicIntegerArray arr = new AtomicIntegerArray(10);
public static class ArrayWorker implements Runnable{
@Override
public void run() {
for (int k=0;k<10000;k++){
//每个线程分别在10个下标插入1000次.
arr.getAndIncrement(k % arr.length());
}
}
}

public static void main(String[] a ) throws InterruptedException {
Thread[] ts = new Thread[TN];
for (int i=0;i<TN;i++)
ts[i] = new Thread(new ArrayWorker());
for (int y=0;y<TN;y++)
ts[y].start();
for (int y=0;y<TN;y++)
ts[y].join();

System.out.println(arr);
/*结果,每个元素永远都一样的.
[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]
*/

}
}

神奇的 AtomicIntegerFieldUpdater

如果还在打算一个个修改线程安全的变量,那么AtomicIntegerFieldUpdater是你的救星,
它可以让你在改动极少代码的情况下,实现CAS策略的线程安全.
根据数据类型不同,Updater有三种
1. AtomicIntegerFieldUpdater (对应int)
2. AtomicLongFieldUpdater (对应long)
3. AtomicReferenceFieldUpdater (对应普通对象)

虽然AtomicIntegerFieldUpdater很好用,但是需要注意几点
1. Updater只能修改它可见范围的变量.因为Updater是通过反射得到的这个变量.如果不可见,会出错.比如score设置为private,就不行.
2. 为了确保变量被正确的读取,必须是volatile修饰.
3. 由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(public native long objectFieldOffset(Field var1);不支持静态变量)
3.1 静态变量存储在静态存储区,程序启动时就分配空间,程序退出时释放。普通成员变量在类实例化时分配空间,释放类的时候释放空间,存储在栈或堆中。

无锁算法

大家可以看下Amino CBB (Concurrent Building Blocks) 类库,有很多例子都是CAS算法实现.这里就不细说了.如果以后用到再深入吧.
示例

public class AtomicIntegerFieldUpdaterExample {
//投票人数
public static final int TN = 10000;
public static class Candidate{
int id;
//得分
volatile int score;
}
public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
public static AtomicInteger check = new AtomicInteger();

public static void main(String[] a ) throws InterruptedException {
final Candidate candidate = new Candidate();
Thread[] ts = new Thread[TN];
for (int i=0;i<TN;i++){
ts[i] = new Thread(){
@Override
public void run() {
//Math.random() 解释 : a pseudorandom {@code double} greater than or equal to 0.0 and less than 1.0.
if (Math.random() > 0.4){
scoreUpdater.incrementAndGet(candidate);
check.incrementAndGet();
}
}
};
ts[i].start();
}
for (int j=0;j<TN;j++){
ts[j].join();
}
System.out.println("神奇的 AtomicIntegerFieldUpdater : " + candidate.score);
System.out.println("瞧一瞧正确结果 : " + check.get());
/*
结果 : 两个永远相等.
神奇的 AtomicIntegerFieldUpdater : 5954
瞧一瞧正确结果 : 5954
*/

}
}

SynchronousQueue

是非常特殊的等待队列,它的容量为0,任何一个对SynchronousQueue的写都要等待一个对应的读,反之亦然.
因此与其说是一个队列,不如说是一个数据交互通道.
我们就来研究一下它的实现方式

对SynchronousQueue来说,它将put()和take()两个截然不同的功能抽象为一个共通的方法
Transferer.transfer().字面上来说是数据传递的意思.方法签名是E transfer(E e, boolean timed, long nanos)
- 当参数e为非空时,表示当前操作传递给一个消费者
- 如果为空,则表示当前操作需要请求一个数据.
- timed 决定是否存在timeout时间
- nanos 决定了timeout的时长
如果返回值非空,则表示数据已经接受或正常提供,如果为空,则表示失败.(超时或中断)

SynchronousQueue 内部会维护一个县城等待队列,等待队列中会保存等待县城以及相关数据信息,比如,生产者将数据放入SynchronousQueue时
如果没有消费者接收,那么数据本身和线程对象都会打包在队列中等待.因为SynchronousQueue容量为0,没有数据可以正常存储.

Transferer.transfer函数是实现SynchronousQueue的核心,它的基本算法循环尝试三个算法之一.
1. 如果等待队列为空,或者队列中节点的类型和本次操作是一致的,那么当前操作压入等待队列.如,等待队列中的读线程,本次操作也是读,因此这两个读都需要等待(等待匹配的写操作)
2. 如果等待队列和本次操作是互补的(如,等待队列中是读,本次操作是写),那么久插入一个”完成”状态的节点,并让他”匹配”到一个等待节点上.接着弹出这两个节点,并且使得对应的两个线程继续执行.
3. 如果线程发现等待队列中的节点就是”完成”节点,那么帮助这个节点完成任务.其流程和步骤2一样.
源码分析

public class SynchronousQueueExample {

/*public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

static final class TransferStack<E> extends Transferer<E> {

//Modes for SNodes, ORed together in node fields
// Node represents an unfulfilled consumer
static final int REQUEST = 0;
// Node represents an unfulfilled producer
static final int DATA = 1;
// Node is fulfilling another unfulfilled DATA or REQUEST
static final int FULFILLING = 2;
// Returns true if m has fulfilling bit set.
static boolean isFulfilling(int m) {
return (m & FULFILLING) != 0;
}

// Node class for TransferStacks.
static final class SNode {
volatile SNode next; // next node in stack next节点
volatile SNode match; // the node matched to this 与当前匹配的节点
volatile Thread waiter; // to control park/unpark 当前线程
Object item; // data; or null for REQUESTs 数据内容
int mode;
// Note: item and mode fields don't need to be volatile
// since they are always written before, and read after,
// other volatile/atomic operations.

SNode(Object item) {
this.item = item;
}

boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

//
// Tries to match node s to this node, if so, waking up thread.
// Fulfillers call tryMatch to identify their waiters.
// Waiters block until they have been matched.
//
// @param s the node to match
// @return true if successfully matched to s
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}


//Tries to cancel a wait by matching node to itself.
void tryCancel() {
//this 给定的对象
//matchOffset 对象内的偏移量
//null 期望值
//this 要设置的值
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}

boolean isCancelled() {
return match == this;
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

// The head (top) of the stack
volatile SNode head;

boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}

// Creates or resets fields of a node. Called only from transfer
// where the node to push on stack is lazily created and
// reused when possible to help reduce intervals between reads
// and CASes of head and to avoid surges of garbage when CASes
// to push nodes fail due to contention.
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}


E transfer(E e, boolean timed, long nanos) {
SNode s = null; // SNode 等待队列中的节点,内部封装了当前线程\next节点\匹配节点\数据内容等信息.
int mode = (e == null) ? REQUEST : DATA; //操作模式
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode 判断当前节点是否为空,或操作模式是否一样
if (timed && nanos <= 0) { // can't wait 存在timeout时间,并且时间没了.就不进行等待了.
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node 弹出取消节点
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) { //snode 是创建一个节点s,并置于队列头部,这个节点就代表当前线程.如果入队成功,就继续.
SNode m = awaitFulfill(s, timed, nanos); //自旋等待,时间到后挂起.直到有匹配操作出现将其唤醒.
if (m == s) { // wait was cancelled 等待被取消(表示已经读取到数据或者自己产生的数据被别的线程读取)
clean(s);
return null;
}
if ((h = head) != null && h.next == s) //尝试帮助对应的线程完成两个头部节点的出队操作,仅仅是友情帮助.
casHead(h, s.next); // help s's fulfiller 帮助s的完成者
return (E) ((mode == REQUEST) ? m.item : s.item); //返回读取或者写入的数据.
}
} else if (!isFulfilling(h.mode)) { // try to fulfill 是否处于完成状态,如果不是继续,
if (h.isCancelled()) // already cancelled 如果以前取消了
casHead(h, h.next); // pop and retry 弹出并重试
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //生成一个SNode元素,设置为fulfill模式并将其压入队列头部.
for (;;) { // loop until matched or waiters disappear 一直循环到匹配或者没有等待者了
SNode m = s.next; // m is s's match 设置m(原始的队列头部)是s的匹配节点
if (m == null) { // all waiters are gone 已经没有等待者了
casHead(s, null); // pop fulfill node 弹出完成节点
s = null; // use new node next time 下一次使用新节点.
break; // restart main loop 重新开始主循环
}
SNode mn = m.next;
if (m.tryMatch(s)) { //激活一个等待线程,并将m传递给那个线程.如果设置成功,则表示数据投递完成,将s和m两个节点弹出即可.
casHead(s, mn); // pop both s and m 弹出m和s
return (E) ((mode == REQUEST) ? m.item : s.item ); /返回读取或者写入的数据.
} else // lost match 匹配失败,表示其他线程帮我完成了操作,删除m节点就可以.因为这个节点的数据已经被投递了.不需要再次处理,然后继续循环这个小循环.进入下一个等待线程的匹配和数据投递,直到队列中没有等待线程为止.
s.casNext(m, mn); // help unlink 帮助删除节点.
}
}
} else { // help a fulfiller 在线程执行时,发现头部元素恰好是fulfill模式,就会帮助这个fufill节点尽快被执行.
SNode m = h.next; // m is h's match m是h的匹配线程
if (m == null) // waiter is gone 如果没有等待线程
casHead(h, null); // pop fulfilling node 弹出完成的节点
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match 尝试匹配
casHead(h, mn); // pop both h and m 弹出m和h
else // lost match 匹配失败
h.casNext(m, mn); // help unlink 删除节点.
}

//注意 这里完成后不会返回.因为最后这步else是帮助其他线程尽快投递他们的数据,而自己并没有完成对应的操作,因此线程进入这里后再次进入大循环体
}
}
}

}

}*/
}

结论:
从整个数据投递过程中,可以看出SynchronousQueue中,参与工作的所有线程不仅仅是竞争资源的关系,更重要的是他们彼此还会互相帮助
在一个线程内部可能帮助其他线程完成它们的工作,这种模式更大程度上减少饥饿的可能,提高系统整体的并行度.

并行模式与算法

单例模式

它是一种对象创建模式,用于产生一个对象的具体实例,确保系统中一个类只有一个实例.这样带来的好处主要有两点
1. 对于斌犯使用的对象,可以省略new操作花费的时间,这样对于那些重量级对象而言,可以节省非常可观的一笔系统开销.
2. 由于new操作的次数减少,因而对系统内存的使用频率也会降低,这将减轻GC压力,缩短GC停顿时间.

模式1

这个单例模式的性能很好没有任何锁操作,但是这种方式有一点明显不足的就是Singleton实例在什么时候创建是不受控制的.
对于静态成员s1,它会在类第一次初始化的时候被创建,但这个时刻并不一定是getInstance()方法第一次调用的时候.因为如果类中
还包含一个静态变量STATUS,那在任何地方引用这个STATUS都会导致getInstance被创建(任何对Singleton1方法或字段引用,
都会导致类初始化并创建s1实例)

模式2

如果大家不在乎这个小小的不足,这个单例模式是个不错的选择.但如果想精确控制s1创建时间,就需要一种新的方法,延迟加载的策略
它只会在第一次使用s1的时候创建对象.它的核心思想是:最初我们并不需要实例化s1,而当getInstance()方法被第一次调用时,创建
单例对象,为了防止对象被多次创建,我们不得不使用synchronized进行同步,这种实现的好处是,充分利用延迟加载,只有在真正需要时创建对象
但坏也很明显,在并发环境下加锁,竞争激烈的场合对性能可能产生一定的影响.

模式3–终极单例

首先getInstance()方法没有锁,在高并发下性能优越,其次只有在第一次调用getInstance()方法时才会创建Singleton1的实例.
因为这种方法巧妙的使用了内部类和类的初始化方式.内部类 SingletonHolder 被声明为private,使得我们不可能在外部访问并初始化它.
而我们只能在getInstance内部对SingletonHolder类进行初始化,利用虚拟机的类初始化机制创建单例.
示例

 public class Singleton {

//------------------模式1--------------------------
/*public static int STATUS=1;

//1. 构造函数必须为private,警告所有人,不能随便创建这个类的实例,避免该类错误的创建
private Singleton() {
System.out.println("normal Singleton is created..");
}

//3 实例化对象必须是private,保证Singleton1的安全,防止外部修改实例对象.同时又因为 第2点,所以要加static
private static Singleton s1 = new Singleton();

//2. 工厂方法getInstance必须是static的,因此instance也必须是static.
public static Singleton getInstance(){
return s1;
}*/


//------------------模式2--------------------------

/* private Singleton() {
System.out.println("Lazy Singleton is created..");
}
private static Singleton s1 = null;

public static synchronized Singleton getInstance(){
if (s1 == null)
return new Singleton();
return s1;
}*/

//------------------模式3--------------------------

private Singleton() {
System.out.println("Static innerClass Singleton is created..");
}

private static class SingletonHolder{
private static Singleton s1 = new Singleton();
}

public static Singleton getInstance(){
return SingletonHolder.s1;
}

}

关于类初始化参考以下文章就搞明白了.
http://blog.****.net/moreevan/article/details/6968718
http://www.cnblogs.com/mengdd/p/3562003.html
还有一种是双重检查模式,这里不推荐大家在这种模式上花费太多时间.

不变模式

在并行软件的开发过程中,同步操作似乎是不可避免的,当多线程对同一个对象进行读写操作时,为了保证数据一致性和正确性,有必要对对象进行同步.
而同步操作对系统的性能是有相当的损耗的,为了尽可能的取出这些同步操作,提高程序并行能力,可以使用一种不可变对象,依靠对象的不变性
可以确保其在没有同步操作的多线程环境中依然时钟保持内部状态一致性和正确性,这就是不变模式.
不变模式天生就是多线程友好的,它的核心思想是,一个对象一旦被创建,则它的状态将永远不会发生改变,所以没有一个线程可以修改其内部状态和数据,
同时其内部状态也绝不会自行发生改变,基于这些特征,对不变对象的多线程操作不需要进行同步控制.
需要注意,不变模式比只读模式具有更强一致性和不变性.
因此不变模式的主要使用场景需要满足2个条件
1. 当前对象创建后,其内部状态和数据不再发生任何变化
2. 对象需要被共享,被多线程频繁访问.
java对象实现不变(参考 NoChange 对象)
1. 取出所有setter方法及所有修改自身属性的方法.
2. 将所有属性设置为私有,并用final修饰,确保其不可修改.
3. 确保没有子类可以重载修改它的行为,即final class
4. 有一个可以创建完整对象的构造函数.
示例:

public final class NoChange {  //final修饰,无子类

private final long id; //final修饰,确保数据只能在对象构造时赋值1次.private修改,确保外部无法访问.
private final String name;
private final double price;

public NoChange(long id, String name, double price) {
this.id = id;
this.name = name;
this.price = price;
}

public String getName() {
return name;
}

public double getPrice() {
return price;
}

public long getId() {

return id;
}
}

在JDK中,不变模式非常广泛,典型的就是java.lang.String类,此外所有的元数据包装类,都是不变模式实现的.
java.lang.String
java.lang.Boolean
java.lang.Byte
java.lang.Character
java.lang.Double
java.lang.Float
java.lang.Integer
java.lang.Long
java.lang.Short

提示:

不变模式通过回避问题而不是解决问题的态度来处理多线程并发访问控制,不变对象是不需要同步操作的.
由于并发同步会产生对性能不良的影响,因此,在需求允许的情况下,不变模式可以提高系统的并发性能和并发量.

生产者-消费者模式

这个模式的经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案,在这个模式中,通常有两类线程,
即生产者线程和消费者线程,生产者线程负责提交用户需求,消费者线程负责从具体处理生产者提交的任务,
生产者和消费者之间则通过共享内存缓冲进行通信.
生产者-消费者模式中的内存缓冲区的主要功能是数据在多线程间的共享,此外,通过该缓冲区,可以缓解生产者和消费者间的性能差.
生产者-消费者模式的核心组件是共享内存缓冲区,它作为生产者消费者间的通信桥梁,避免了两者直接通信,从而将生产者和消费者进行解耦
生产者不需要知道消费者存在,消费者也不需要知道生产者的存在.
同时,由于内存缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论谁快谁慢,都可以通过共享缓冲区得到缓解,确保系统稳定允许.

表:生产者-消费者模式主要角色

角色 作用
生产者 用于提交用户请求,提取用户任务,并装入内存缓冲区
消费者 在内存缓冲区提取并处理任务
内存缓冲区 缓存生产者提交的任务或数据,供消费者使用
任务 生产者向内存缓冲区提交的数据结构
Main 使用生产者和消费者的客户端

实例:求整数平方

BlockQueue充当了共享内存缓冲区,用于维护任务或数据队列(PCData对象),PCData对象表示一个生产任务,或者相关任务的数据.生产者对象和消费者对象均引用同一个BlockingQueue实例.
生产者负责创建PCData对象,并将它加入BlockingQueue中.消费者则冲BlockingQueue队列中获取PCData.
示例:

/**
* 生产者和消费者的任务元数据,也就是共享模型
*/

public class PCData {
private final int intData;

public PCData(int d) {
intData = d;
}

public PCData(String d){
intData = Integer.parseInt(d);
}

public int getIntData() {
return intData;
}

@Override
public String toString() {
return "PCData{" +
"intData=" + intData +
'}';
}
}
/**
* 生产者
* 构造PCData对象,并放入BlockingQueue(内存缓冲区)中.
*/

public class Producter implements Runnable{

private volatile boolean isRunning = true;
private BlockingDeque<PCData> queue; //内存缓冲区,通过构造时外部引入,保证和消费者用的是同样的内存缓冲区.
private static AtomicInteger count = new AtomicInteger(); //总数,原子操作.
private static final int SLEEPTIME = 1000;

public Producter(BlockingDeque<PCData> queue) {
this.queue = queue;
}

@Override
public void run() {
PCData data = null;
Random random = new Random();
System.out.println("start producter .."+Thread.currentThread().getId());
try {
while (isRunning){
Thread.sleep(random.nextInt(SLEEPTIME)); //模拟执行过程
data = new PCData(count.incrementAndGet()); //现获取当前值再+1
System.out.println(data + " is put into Queue");
//提交数据到缓冲队列中.设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败
if (!queue.offer(data,2, TimeUnit.SECONDS)){
System.out.println("failed to put data "+data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
//因为BlockingQueue的offer操作上的锁是重入锁中的可以中断的锁,所以如果有异常,就中断,防止死锁.
Thread.currentThread().interrupt();
}
}

public void stop(){
isRunning = false;
}
}
/**
消费者
从BlockingQueue中获取PCData对象,并进行处理.
*/

public class Consumer implements Runnable {

private BlockingDeque<PCData> queue;
private static final int SLEEPTIME = 1000;

//同理,和Producter共用同一个BlockingQueue,保证存/取都在一个缓冲区
public Consumer(BlockingDeque<PCData> queue) {
this.queue = queue;
}

@Override
public void run() {
System.out.println("start Consumer id : "+Thread.currentThread().getId());
Random r = new Random();
try {
while (true){
PCData data = queue.take();
if (null != data){
int re = data.getIntData() * data.getIntData();
System.out.println(MessageFormat.format("{0} * {0} = {1}",data.getIntData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}

}
}
public class ConsumerProducter {

public static void main(String[] a) throws InterruptedException {
//建立共享缓冲区
BlockingDeque<PCData> queue = new LinkedBlockingDeque<>(10);
//建立生产者
Producter producter1 = new Producter(queue);
Producter producter2 = new Producter(queue);
Producter producter3 = new Producter(queue);
Producter producter4 = new Producter(queue);
Producter producter5 = new Producter(queue);
//建立消费者
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
//建立线程池
ExecutorService es = Executors.newCachedThreadPool();
//运行生产者
es.execute(producter1);
es.execute(producter2);
es.execute(producter3);
es.execute(producter4);
es.execute(producter5);
//运行消费者
es.execute(consumer1);
es.execute(consumer2);
es.execute(consumer3);
//运行时间
Thread.sleep(1000 * 10);
//停止生产者
producter1.stop();
producter2.stop();
producter3.stop();
producter4.stop();
producter5.stop();
//停止生产者后,预留时间给消费者执行
Thread.sleep(1000 * 5);
System.out.println("关闭线程池...");
//关闭线程池
es.shutdown();
}
}

总结:
生产者-消费者模式很好地对生产者线程和消费者线程解耦,优化了系统整体结构,同时,由于缓冲区的作用,
允许生产者和消费者线程存在执行上的性能差异,从一定程度上缓解了性能瓶颈对系统性能的影响.

高性能生产者消费者:无锁实现

BlockingQueue实现生产者-消费者是一个不错的选择,它很自然地实现了作为生产者和消费者的内存缓冲区.
但是,BlockingQueue并不是一个高性能的实现,它完全使用锁和阻塞等待来实现线程间的同步.在高并发场合,它的性能并不是特别优越.
就像我们之前提过的ConcurrentLinkedQueue是一个高性能的队列,但是BlockingQueue只是为了方便数据共享.
而ConcurrentLinkedQueue的秘诀就是大量使用了无锁的CAS操作,同理,如果我们使用了CAS来实现生产者-消费者模式,也同样可以获得可观的性能提升.

无锁缓存框架:Disruptor

Disruptor框架是由于LMAX公司开发的一款高效的无锁内存队列,它使用无锁的方式实现了一个环形队列,非常适合生产者-消费者模式.
在Disruptor中,使用了环形队列来代替普通的线性队列,这个环形队列内部实现为一个普通的数组.
对于一般的队列,势必要提供队列头部head和尾部tail两个指针,用于出队和入队,这样无疑就增加了线程协作的负责度.
但如果队列的环形的,则只需要提供一个当前队列的位置cursor,利用这个cursor既可以出队也可以入队.
由于是环形队列的缘故,队列的总大小必须事先指定,不能动态扩展.
为了能够快速从一个序列sequence对应数组的实际位置(每次有元素入队,序列就加1),Disruptor要求我们必须将数组的大小设置为2的整数次方.
这样通过sequence&(queueSize-1)就能立即定位到实际的元素位置index.这个要比取余(%)操作快得多.
[置顶]        高并发程序设计入门
如图所示,显示了RingBuffer的结构,生产者向缓冲区中写入数据,而消费者从中读取数据,生产者写入数据使用CAS操作,消费者读取数据时,
为了防止多个消费者处理同一个数据,也使用CAS操作进行保护.
这种固定大小的环形队列的另一个好处就是可以做到完全内存复用.在系统运行过程中,不会有新的空间需要分配或者老的空间需要回收,因此,可以大大减少系统分配空间以及回收空间的额外开销.

用Disruptor实现生产者消费者案例

 public class DisruptorTest {

/**
* 生产者消费者主函数
*/

public static void main(String[] a ) throws InterruptedException {

Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
//设置缓冲区大小,一定要是2的整数次幂
int bufferSize = 1024;
//创建disruptor,它封装了整个Disruptor的使用,提供了一些便捷的API.
Disruptor<DisruptorPCData> disruptor = new Disruptor<DisruptorPCData>(
factory,
bufferSize,
executor,
ProducerType.MULTI,
new BlockingWaitStrategy()
);
//设置消费者,系统会将每一个消费者实例映射到一个系统中,也就是提供4个消费者线程.
disruptor.handleEventsWithWorkerPool(new DisruptorConsumer(),
new DisruptorConsumer(),
new DisruptorConsumer(),
new DisruptorConsumer());
//启动并初始化disruptor系统.
disruptor.start();
RingBuffer<DisruptorPCData> ringBuffer = disruptor.getRingBuffer();
//创建生产者
DisruptorProductor productor = new DisruptorProductor(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
//生产者不断向缓冲区中存入数据.
int i=0;
for (long l=0;i < 20;l++){
byteBuffer.putLong(0,l);
productor.pushData(byteBuffer);
Thread.sleep(new Random().nextInt(500));
//System.out.println("add data "+l);
i++;
}
}

//模拟Disruptor的序列sequence定位 和生产者消费者无关
static void testSequence(){

//模拟序列,每次入队就+1;
int sequence = 0;
//队列总大小2的幂次方,这里为256
int QueueSize = (int) Math.pow(2,8);
//模拟环形队列
String[] ringBuffer = new String[QueueSize];
//模拟put的元素大小
int putNum1 = new Random().nextInt(QueueSize/2);
System.out.println("即将插入 " + putNum1 + " 个元素");
//put到环形队列
for (int i = 0; i<putNum1; i++) {
ringBuffer[i] = "st_DATA_" + i;
sequence ++;
}
int putNum2 = new Random().nextInt(QueueSize/2);
System.out.println("即将插入 " + putNum2 + " 个元素");
//put到环形队列
for (int i = putNum1; i < (putNum2 + putNum1); i++){
ringBuffer[i] = "st_DATA_"+i;
sequence++;
}
System.out.println("QueueSize : "+ringBuffer.length);
System.out.println("QueueSize 二进制 : "+Integer.toBinaryString(QueueSize));//QueueSize是2的整数次幂,二进制必然是10\100\1000等形式
System.out.println("QueueSize-1 二进制 : "+Integer.toBinaryString(QueueSize-1)); //那么QueueSize-1的二进制则全是1,因此可以将sequence限定在QueueSize-1范围内.
System.out.println("sequence : "+sequence);
System.out.println("sequence 二进制 : "+Integer.toBinaryString(sequence));
System.out.println("sequence & (QueueSize-1)二进制 : "+Integer.toBinaryString(sequence & (QueueSize-1)));
System.out.println("sequence & (QueueSize-1) : "+(sequence & (QueueSize-1)));//位"与"操作,sequence相与2的整数幂-1(全部为1)的总长度,目的是定位到实际元素的index,
/**
* 执行结果:
即将插入 23 个元素
即将插入 66 个元素
QueueSize : 256
QueueSize 二进制 : 100000000
QueueSize-1 二进制 : 11111111
sequence : 89
sequence 二进制 : 1011001
sequence & (QueueSize-1)二进制 : 1011001
sequence & (QueueSize-1) : 89
*/

}

}

public class DisruptorPCData {

private long value;

public void set(long value){
this.value = value;
}
public long get(){
return value;
}
}

/**
PCData工厂类
在Disruptor系统初始化时,构造所有缓冲区中的对象实例,之前说过Disruptor会预先分配空间.

*/

public class PCDataFactory implements EventFactory<DisruptorPCData> {
@Override
public DisruptorPCData newInstance() {
return new DisruptorPCData();
}
}

/**
消费者实现WorkHandler,来自Dispruptor框架
消费者的作用的读取数据进行的处理,这里,数据的读取已经由Disruptor进行封装,onEvent是框架的回调函数.
因此,这里只需要简单地进行数据处理即可.
*/

public class DisruptorConsumer implements WorkHandler<DisruptorPCData>{
@Override
public void onEvent(DisruptorPCData disruptorPCData) throws Exception {
long result = disruptorPCData.get() * disruptorPCData.get();
System.out.println(Thread.currentThread().getId() + " PCData :"+result);
}
}

/**
生产者
*/

public class DisruptorProductor {

//RingBuffer环形缓冲区
private final RingBuffer<DisruptorPCData> ringBuffer;


public DisruptorProductor(RingBuffer<DisruptorPCData> ringBuffer) {
this.ringBuffer = ringBuffer;
}

/**
* 将传入的ByteBuffer中的数据提取出来,并装载到环形缓冲区(RingBuffer)中
* @param byteBuffer 可以包装任何数据类型,这里用来存储long整数.
*/

public void pushData(ByteBuffer byteBuffer){
//得到环形索引中下一个可用的序列号
long sequence = ringBuffer.next();

System.out.println(" pushData "+byteBuffer.getLong(0));

try {
//通过可用序列号获得一个空闲可用的PCData.
DisruptorPCData disruptorPCData = ringBuffer.get(sequence);
//并将PCData的数据设为期望值.这个值最终会传递给消费者.
disruptorPCData.set(byteBuffer.getLong(0));
} finally {
//进行数据发布,只有发布的数据才会真正被消费者看见.
ringBuffer.publish(sequence);
}
}
}

提高消费者的响应时间:选择合适的策略

Disruptor为我们提供了几个策略,这些策略由WaitStrategy接口进行封装,
1. BlockingWaitStrategy:默认策略,和BlockingQueue是非常类似的,他们都使用了Lock(锁)和Condition(条件)进行数据监控和线程唤醒.
因为涉及到线程的切换,BlockingWaitStrategy策略是最省CPU的,但在高并发下性能表现是最差的一种等待策略.
2. SleepingWaitStrategy,这个策略也是对CPU非常保守的.它会在循环中不断等待数据.它会先进行自选等待,如果不成功,则使用Thread.yield()让出CPU,
并最终使用LockSupport.parkNanos(1)进行线程休眠,已确保不占用太多的CPU数据,因此可能产生比较高的平均延时.适用于对延时要求不是特别高的场合,
好处是他对生产者线程的影响最小.典型的场景是异步日志.
3. YieldWaitStrategy,用用于低延时场合,消费者线程会不断循环监控缓冲区编号,在循环内部,它会使用Thread.yield()让出CPU给别的线程执行时间.
如果需要高性能系统,并且对延迟有较高要求,则可以使用这种策略.这种策略相当于消费者线程变成了一个内部执行Thread.yield()的死循环,
因此最好有多于消费者线程的逻辑CPU(“双核四线程”中的四线程),否则整个应用会受到影响.
4. BusySpinWaitStrategy,疯狂等待策略,他就是一个死循环,消费者线程会尽最大努力监控缓冲区的变化.它会吃掉CPU所有资源.
所以只在非常苛刻的场合使用它.因为这个策略等同于开一个死循环监控.因此,物理CPU数量必须大于消费者线程数,因为如果是逻辑核,
那么另外一个逻辑核必然会受到这种超密集计算的影响而不能正常工作.

CPU Cache的优化:解决伪共享问题.

除了使用CAS和提供不同等待策略来提高系统吞吐量外,Disruptor还尝试解决CPU缓存的伪共享问题.
简单说下什么是伪共享
我们知道,为了提高CPU的速度,CPU有一个高速缓存Cache.在高速缓存中,读写数据的最小单位是缓存行(Cache Line),它是主内存(memory)复制到
缓存(Cache)的最小单位,一般为32~128byte(字节).
假如两个变量存放在同一个缓存行中,在多线程访问中,可能互相影响彼此的性能.如图,运行在CPU1上的线程更新了X,那么CPU2伤的缓存行就会失效,
同一行的Y即使没有修改,也会变成无效,导致Cache无法命中.这无疑是一个潜在的性能杀手,如果CPU经常不能命中缓存,那么系统的吞吐量会急剧下降.
那应该怎么破呢?
[置顶]        高并发程序设计入门
一种可行的做法就是在X变量前后空间都占据一定的位置(暂叫padding,用来填充Cache Line).这样当内存被读入缓存中时,这个缓存行中,就只有X一个变量实际是有效的.
因此就不会发生如上的问题.
注意,由于各版本JDK内部实现不一致,在某些JDK版本中(如JDK8),会自动优化不适用的字段,这将导致这种padding的伪共享解决方案失效.

public class FalseSharing implements Runnable {
//线程数量
public final static int NUM_THREADS = 2;
//业务量
public final static long ITERATIONS = 500L * 1000L * 1000L;
//每个线程的数组索引
private final int arrayIndex;
//存放优化后的long,且长度和线程数相等,让每个线程都会访问自己对应的longs中的元素
private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
static {
for (int i = 0; i < longs.length; i++) {
longs[i] = new VolatileLong();
System.out.println("1. 初始化VolatileLong数组" + i);
}
}

public FalseSharing(int arrayIndex) {
System.out.println("2. 线程 "+Thread.currentThread().getName() + " 设置数组索引:" + arrayIndex);
this.arrayIndex = arrayIndex;
}

private static void runTest() throws InterruptedException {
//模拟两个线程
Thread[] threads = new Thread[NUM_THREADS];
for (int i = 0; i < threads.length; i++) {
//设置每个线程的数组索引
threads[i] = new Thread(new FalseSharing(i),"myThread"+i);
}
for (Thread t : threads)
t.start();
for (Thread t : threads)
t.join();
}

@Override
public void run() {
System.out.println("3. "+Thread.currentThread().getName()+"开始执行...");
long i = ITERATIONS +1;
//执行业务量
while (0 != --i)
//访问线程自己对应的longs元素
longs[arrayIndex].value = i;
}

//经过填充的long
public final static class VolatileLong{
//实际有用的变量,并且是安全的,
public volatile long value = 0L;
//填充Cache Line,仅用于将第一个VolatileLong.value和第二个VolatileLong.value分开,防止进入同一个Cache Line.
public long p1,p2,p3,p4,p5,p6,p7;
}

public static void main(String[] args) throws InterruptedException {
final long start = System.currentTimeMillis();
runTest();
System.out.println("4. 执行结束,耗时(ms):"+(System.currentTimeMillis() -start));
}
}

Future模式

它的核心是异步调用,如果我们不着急要结果,可以让被调用者立刻返回,随后让它在后台慢慢处理这个请求,对于调用者来说则可以处理其他任务,在真正需要数据的场合再去尝试获得需要的结果.
[置顶]        高并发程序设计入门
[置顶]        高并发程序设计入门

Future模式的主要角色

参与者 作用
Main 系统启动,调用Client发出请求
Client 返回Data对象,立即返回FutureData,并开始ClientThread线程装配RealData
Data 返回数据的接口
FutureData Future数据,构造很快,但是一个虚拟的数据,需要装配RealData
FutureRealData 真实的数据,其构造比较慢

[置顶]        高并发程序设计入门

Future模式的简单实现

/**
* 客户端程序
*/

public class FutureClient {
/**
* 获取结果数据
* @param queryStr 处理的数据
* @return
*/

public FutureBaseData request(String queryStr){
final FutureData fd = new FutureData();
//单独开启构造真实数据的线程
new Thread(){
@Override
public void run() {
//较长的构造时间
FutureRealData rd = new FutureRealData(queryStr);
//构造完后,为FutureDelta设置真实数据
fd.setRealData(rd);
}
}.start();
//立即返回包装数据.
return fd;
}

public static void main(String[] args) {
long t = System.currentTimeMillis();
FutureClient client = new FutureClient();
//这里会立即返回,因为得到的是FutureData而不是RealData
long s = System.currentTimeMillis();
System.out.println("开始请求.");
FutureBaseData data = client.request("tName");
System.out.println("请求完毕." + (System.currentTimeMillis()-s));
s = System.currentTimeMillis();
System.out.println("开始日志.");
try {
//模拟处理其他业务,同时,在这个过程中,RealData被构造.充分利用了等待时间.
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("日志处理完毕." + (System.currentTimeMillis()-s));

System.out.println("总时长"+(System.currentTimeMillis()-t)+",请求结果 :"+data.getResult());

}

}

/**
* 核心接口
* 客户端希望获取的数据
* 有两个重要实现:
* 1. FutureRealData 真实的数据,这就是我们要最终获得的
* 2. FutureData 立即返回的数据,它是提取RealData的一个"凭据".
*/

public interface FutureBaseData {
/**
* 获取真实数据
* @return
*/

public String getResult();
}

/**
* 即刻返回的对象,包装了RealData,或者说是RealData的虚拟实现
* 当使用getResult()时,如果RealData数据没有准备好,那么程序会阻塞,
* 等待RealData准备好并设置到FutureData中,才能最终响应返回真实数据.
*
* FutureData是Future模式的关键,它实际上是真实数据的代理,封装了获取RealData的等待过程.
*/

public class FutureData implements FutureBaseData {
//FutureData是RealData的包装
private FutureRealData realData;
//真实数据是否准备好
private boolean isReady = false;
//设置真实数据
public synchronized void setRealData(FutureRealData realData) {
if (isReady)
return;
this.realData = realData;
isReady = true;
//如果真实数据设置完,就通知在getResult()上等待的线程.
notifyAll();

}

@Override
//过去真实数据
public synchronized String getResult() {
//如果没准备好就等待,直到真实数据调用setRealData,唤醒这里的等待.
if (!isReady) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//否则返回真实的数据
return realData.getResult();
}
}

/**
* 最终需要使用的数据模型
* 它的构造很慢.
*/

public class FutureRealData implements FutureBaseData {
private final String result;

public FutureRealData(String result) {
long s = System.currentTimeMillis();
System.out.println("开始构造真实数据");
//开始构造一个很慢的数据
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(result);
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("构造完毕." + (System.currentTimeMillis()-s));
this.result = sb.toString();
}

@Override
public String getResult() {
return this.result;
}
}

JDK中的Future模式

JDK为Future提供的一些简单控制功能:

 //取消任务
boolean cancel(boolean mayInterruptIfRunning);
//是否已经取消
boolean isCancelled();
//是否已经完成
boolean isDone();
//取得返回对象
V get() throws InterruptedException, ExecutionException;
//取得返回对象,并设置超时时间
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

简单实现:

public class JDKFutureMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long t = System.currentTimeMillis();
//构造FutureTask,表示这个任务是所有返回值的,并通过传入的Callable接口,告诉FutureTask我们需要的数据是如何产生的.
FutureTask<String> futureTask = new FutureTask<String>(new JDKFutureRealData("a"));
//创建一个任务的线程池
ExecutorService es = Executors.newSingleThreadExecutor();
//执行FutureTask,相当于上例的client.request
//在这里开启线程进行RealData的call()执行
long s = System.currentTimeMillis();
System.out.println("开始请求.");
//立即返回
es.submit(futureTask);
System.out.println("请求完毕." + (System.currentTimeMillis()-s));
System.out.println("真实数据是否构造完毕:" + futureTask.isDone());
s = System.currentTimeMillis();

//做一些别的操作,日日志操作
Thread.sleep(2000);

System.out.println("日志处理完毕." + (System.currentTimeMillis()-s));

System.out.println("真实数据是否构造完毕:" + futureTask.isDone());
if (futureTask.isDone())
es.shutdown();
//通过futureTask.get()得到实际数据
System.out.println("总时长"+(System.currentTimeMillis()-t)+",请求结果 :"+futureTask.get());

}
}

public class JDKFutureRealData implements Callable<String> {

private String para;

public JDKFutureRealData(String para) {
this.para = para;
}

//返回真实数据
@Override
public String call() throws Exception {
long s = System.currentTimeMillis();
System.out.println("开始构造真实数据");
//开始构造一个很慢的数据
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(para);
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("构造完毕." + (System.currentTimeMillis()-s));
return sb.toString();
}
}

并行流水线

虽然并发算法可以充分发挥多核CPU的性能,但并非所有的计算都可以改造成并发的形式,简单来说,执行过程中有数据相关性的运算都无法完美并行化.
如果完美计算(B+C)*B/2,那么这个运算过程是无法并行的,原因是B+C没有执行完,则永远算不出(B+C)*B,这就是数据相关性.
那有什么补救措施呢,就是借鉴日常生产中的流水线思想.
我们将计算过程拆分成三步
P1:A=B+C
P2:D+A+B
P3:D=D/2
P1接受B和C的值求和,并将结果给P2.P2乘积后给P3.P3将结果除以2得到最终值.

public class AssemblyLine {

public static void main(String[] args) throws InterruptedException {

AssemblyLinePlus p = new AssemblyLinePlus();
AssemblyLineMultiply m = new AssemblyLineMultiply();
AssemblyLineDiv d = new AssemblyLineDiv();
new Thread(p).start();
new Thread(m).start();
new Thread(d).start();
for (int i = 1; i < 101; i++) {
for (int j = 1; j < 101; j++) {
AssemblyLineMsg msg = new AssemblyLineMsg();
msg.i = i;
msg.j = j;
msg.orgStr = "("+i+"+"+j+")*"+i+"/2";
//将数据提交给加法线程,开启流水线计算.
p.blockingQueue.put(msg);
}
}

}
}
/**
* 携带结果进行信息交换的载体
*/

public class AssemblyLineMsg {
public double i;
public double j;
public String orgStr;
}
/**
* 加法
*/

public class AssemblyLinePlus implements Runnable{
public static BlockingQueue<AssemblyLineMsg> blockingQueue = new LinkedBlockingQueue<>();
@Override
public void run() {
while (true)
try {
AssemblyLineMsg receive = blockingQueue.take();
receive.j = receive.i + receive.j;//receive.i=B
AssemblyLineMultiply.blockingQueue.put(receive);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 乘法
*/

public class AssemblyLineMultiply implements Runnable{
public static BlockingQueue<AssemblyLineMsg> blockingQueue = new LinkedBlockingQueue<>();
@Override
public void run() {
while (true)
try {
AssemblyLineMsg receive = blockingQueue.take();
receive.i = receive.i * receive.j;//receive.j=(B+C) , receive.i=B
AssemblyLineDiv.blockingQueue.put(receive);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 减法
*/

public class AssemblyLineDiv implements Runnable{
public static BlockingQueue<AssemblyLineMsg> blockingQueue = new LinkedBlockingQueue<>();
@Override
public void run() {
while (true)
try {
AssemblyLineMsg receive = blockingQueue.take();
//receive.i=(B+C)*B
System.out.println(receive.orgStr + " = "+(receive.i / 2));
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

并行搜索

给定一个无序数组,查找满足条件的元素,我们需要增加一些线程间的通信机制,使各线程可以有效运行,一个简单的策略就是将数据按照期望的线程数进行分割.
把数组分成N(线程数)个,每个线程各自独立搜索,当其中有一个线程找到数据后,立即返回结果即可.

/**
* 并行搜索
*/

public class ConcurrentSearch {

//定义线程个数
public static final int THREADNUM=2;
//需要查找的数组
static Integer[] arr;
//创建不固定的线程池
static ExecutorService pool = Executors.newCachedThreadPool();
//整型CAS操作,并且线程间共享
static AtomicInteger result = new AtomicInteger(-1);

/**
* 搜索,都是CAS操作,所以不需要上锁
* @param searchValue 需要查找的元素
* @param beginPos
* @param endPos
* @return
*/

public static int search(int searchValue,int beginPos,int endPos){
System.out.println(beginPos+"----"+endPos);
int i=0;
for (i=beginPos;i<endPos;i++){
if (result.get() > 0){
return result.get();
}
if (arr[i] == searchValue) {
if (!result.compareAndSet(-1, i)) {
return result.get();
}
return i;
}
}
return -1;
}

/**
* 有返回值的线程
*/

public static class SearchTask implements Callable<Integer>{

int begin,end,sv;

public SearchTask(int begin, int end, int sv) {
this.begin = begin;
this.end = end;
this.sv = sv;
}

@Override
public Integer call() throws Exception {
return search(sv,begin,end);
}
}

/**
* 搜索前执行的方法,目的是
* 1. 根据线程分割数组块
* 2. 放入线程池并执行
* 3. 获取执行结果
* @param searchValue
* @return
* @throws ExecutionException
* @throws InterruptedException
*/

public static int preSearch(int searchValue) throws ExecutionException, InterruptedException {
//计算一个搜索块的大小
int subArrSize = arr.length/THREADNUM+1;
//有返回值的线程模式
List<Future<Integer>> futures = new ArrayList<>();
//循环搜索块次数,i+=subArrSize表示跳到第二个搜索块的起始位置
for (int i = 0; i < arr.length; i+=subArrSize) {
//每个搜索块的结尾元素下标
int end = i + subArrSize;
//如果最后一个搜索块的结尾下标都大于需要搜索的数组,那么就设为数组的末尾下标就OK了.
if (end >=arr.length)
end = arr.length;
//提交到线程池并执行,然后将结果放到Future中.
futures.add(pool.submit(new SearchTask(i,end,searchValue)));
}
//循环获取结果
for (Future<Integer> fu : futures){
//如果找到了就返回
if (fu.get()>0)
return fu.get();
}

return -1;
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
long st = System.currentTimeMillis();
List<Integer> s = new ArrayList<>();
for (int i = 0; i < 50000000; i++) {
s.add(new Random().nextInt(200));
}
arr = s.toArray(new Integer[s.size()]);
System.out.println("构造数组,耗时"+(System.currentTimeMillis() - st));
st = System.currentTimeMillis();
int r = preSearch(199);
if (r > 0)
System.out.println("搜索结果为:"+r+",取值:"+arr[r] +",耗时"+(System.currentTimeMillis() - st));
else
System.out.println("搜索结果为:"+r+",耗时"+(System.currentTimeMillis() - st));

pool.shutdown();
}

}

并行排序

分离数据相关性:奇偶交换排序

改进的插入排序:希尔排序

插入排序的基本思想是,一个未排序的数组,可以分为两部分,前半部分是已经排好序的,后半部分是未排序的,排序时只需要在未排序的部分中选一个元素,将其插入到有序的数组中即可.
最终未排序的部分原来越少,直到为0.
插入排序是很难并行化的,因为这一次的插入依赖上一次得到的有序序列.
希尔排序可以理解为插入排序的扩展,它将整个数组根据h间隔分割,如果h为3,就可以将数组分割成3个元素为一个子数组,每次排序就交换间隔为h的两个元素.
在每次排序完成后,递减h,进行下一轮更精确的排序,直到h为1,此时等价于插入排序.
希尔排序的优点是即使一个较小的元素在数组末尾,因为每次元素移动都以h为间隔,所以较小的那个元素可以在很少的交换次数下,被置换到最接近最终位置的地方.

/**
* 并行排序
*/

public class ConcurrentSort {
static int[] arr = new int[]{72,23,56,69,9,345,67,32,12,4,15,6,3,1,8};
public static void main(String[] args) throws InterruptedException {

// sort1(arr);
// sort2(arr);
// preOddEvenSort();
// insertSort();
// shellSort();
preShellSort();
System.out.println(Arrays.toString(arr));
}

/**
* 希尔排序
* 参考:http://www.cnblogs.com/jingmoxukong/p/4303279.html
* 算法最开始以一定的步长进行排序。然后会继续以一定步长进行排序,最终算法以步长为1进行排序。当步长为1时,算法变为插入排序,这就保证了数据一定会被排序。
*/

public static void shellSort(){
//步长:举例说明 假设步长为5,那么第0,5,15下标的元素是一组,进行插入排序.
int h = arr.length/2;

System.out.println(h);
while (h > 0){
for (int i = h; i < arr.length; i++) {
//和上一个步长的元素比较.如果当前步长比上一个步长下标的元素小,就交换,和插入排序是一样的,只不过是按步长进行了分组
if (arr[i] < arr[i - h]){
//拿出当前元素
int tmp = arr[i];
//找到上一个步长元素下标
int j = i - h;
while (j >=0 && arr[j] > tmp){
//按步长向后挪
arr[j + h] = arr[j];
j -= h;
}
//放到合适的位置插入
arr[j + h] = tmp;
}
}
//缩小步长
h = h/2;
}
}


/**
* 插入排序 串行
*/

public static void insertSort(){
/**
* i : 当前需要和之前排序号的元素比较的元素下标
* j : 以排序号的之前的所有元素下标
* key : 当前值
*/

int i,j,key;
//这里认为第一个元素是已经排好序的,也就是下标为0的元素,所以i=1
for (i=1;i<arr.length;i++){
//取出当前要比较的元素
key = arr[i];
//j初始设置为当前元素的前一个元素
j = i -1;
//原理是:将所有之前排好序的元素和当前比较,如果大于当前数据,就往后挪一个位置
while (j >=0 && arr[j] > key){
arr[j+1] = arr[j];//向后挪一位
j--;
}
//直到当前元素比之前拍好序的那一位大,就插入.
arr[j+1] = key;
}


}

/**
* 冒泡排序 串行
*/

public static void sort1(int[] arr){
for (int i = 0; i < arr.length; i++) {
for (int j = 0; j < i; j++) {
//如果比较过的元素 大于 当前元素,就和当前元素交换.
if (arr[j] > arr[i]){
//把当前的元素拿出来
int t = arr[i];
//当前元素放入大的
arr[i] = arr[j];
//比较的元素放入小的
arr[j] = t;
}
}
}
}

/**
* 奇偶交换 串行
*/

public static void sort2(int[] arr){
//是否进行了交换
boolean isSort=true;
//0为偶排序
int start = 0;
//如果上一次发生了交换,或者当前要进行奇交换,就进入
//意思是,如果偶交换没有进行交换,也需要进入奇交换再比较一次,如果奇交换也没有进行交换,那么就说明比较完了.可以结束了.
//因为最后必然是isSort = false,并且start = 0;
while (start==1 || isSort){
//先设为没发生交换
isSort = false;
//从偶交换开始, i+=2 表示 奇偶交换.
for (int i = start; i < arr.length-1; i+=2) {
//和相邻的数据比较
if (arr[i] > arr[i+1]){
int tmp = arr[i];
arr[i] = arr[i+1];
arr[i+1] = tmp;
isSort = true;
}
}
//如果是偶交换,下一次就是奇交换
if (start == 0)
start = 1;
//否则下一次就是偶交换
else
start = 0;
}
}


/**
* 奇偶交换 并行
* ==============奇偶交换并行=================开始========
*/


static boolean isSort = true;

public static synchronized boolean isSort() {
return isSort;
}
public static synchronized void setIsSort(boolean isSort) {
ConcurrentSort.isSort = isSort;
}

public static class OddEvenSortTask implements Runnable{
int i;
CountDownLatch latch;

public OddEvenSortTask(int i, CountDownLatch latch) {
this.i = i;
this.latch = latch;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getId());
if (arr[i] > arr[i+1]){
int tmp = arr[i];
arr[i] = arr[i+1];
arr[i+1] = tmp;
setIsSort(true);
}
latch.countDown();
}
}

public static void preOddEvenSort() throws InterruptedException {
int c = 1;
ExecutorService pool = Executors.newCachedThreadPool();
//0为偶排序,
int start = 0;
//如果上一次发生了交换,或者当前要进行奇交换,就进入
//意思是,如果偶交换没有进行交换,也需要进入奇交换再比较一次,如果奇交换也没有进行交换,那么就说明比较完了.可以结束了.因为最后必然是isSort = false,并且start = 0;
//先进行偶交换(以奇交换结束),因为初始isSort为true
while (isSort() || start == 1 ){
//先设为没发生交换查询
setIsSort(false);
//偶交换的次数,当start为1时,只有length/2-1次
CountDownLatch latch = new CountDownLatch(arr.length/2 - (arr.length%2==0?start:0));
System.out.println("第"+(c++)+"次,"+(start == 0 ? "偶交换":"奇交换")+",长度为"+latch.getCount());
for (int i = start; i < arr.length-1; i+=2) {
pool.submit(new OddEvenSortTask(i,latch));
}
//等待所有线程执行完
latch.await();
if (start == 0)
start =1;
else
start = 0;
}
}


/**
* ===================奇偶交换并行======结束===========
*/



/**
* 希尔排序 并行
* ==============希尔排序并行=================开始========
*/

public static class ShellSortTask implements Runnable{
//当前比较下标
int i = 0;
//比较的步长下标
int h = 0;
//子组个数
CountDownLatch sc;

public ShellSortTask(int i, int h, CountDownLatch sc) {
this.i = i;
this.h = h;
this.sc = sc;
}

@Override
public void run() {
// System.out.println(Thread.currentThread().getId());
//这里不再多说,按步长向后挪
if (arr[i] < arr[i - h]){
int tmp = arr[i];
int j = i - h;
while (j >=0 && arr[j] > tmp){
arr[j + h] = arr[j];
j -= h;
}
arr[j+h] = tmp;
}
sc.countDown();
}
}

public static void preShellSort() throws InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
int h = arr.length /2;
CountDownLatch pc = null;
while (h > 0){
System.out.println("步长"+h);
if (h >= 3)
//因为是从步长开始循环到数组长度(参考for循环),所以可以认为,计数器为长度减去步长.
pc = new CountDownLatch(arr.length - h);
System.out.println("排序一组需要的次数:"+pc.getCount());
for (int i = h; i < arr.length; i++) {
//如果步长小于3,就按单线程排序完.
if (h >= 3)
pool.execute(new ShellSortTask(i,h,pc));
else {
if (arr[i] < arr[i - h]){
int tmp = arr[i];
int j = i - h;
while (j >=0 && arr[j] > tmp){
arr[j + h] = arr[j];
j -= h;
}
arr[j+h] = tmp;
}
}
}
pc.await();
h = h/2;
}
pool.shutdown();


}

/**
* ===================希尔排序并行======结束===========
*/




}

并行算法:矩阵乘法

这里我就不研究了,大家可以使用并参考 jMatrices 开源软件作为矩阵计算的工具.

准备好再通知我,网络NIO

后期补充

读完了再通知我,AIO

后期补充

UPDATE LOG
2016年03月18日 最近项目需求紧,自己也偷懒,更新太慢,望各位见谅。
2016年04月14日 忙着骑摩托…放松下OK吗?or 借口!!!