Java并发基础<2>

时间:2021-06-05 19:41:36

Volatile 变量的声明


11/21/2016 7:21:53 PM From 《高并发程序设计》

volatile 保证一个线程修改变量之后,另一个线程能看到这个改动。

  声明一个volatile变量相当于告诉虚拟机,这个变量的值极有可能被修改,为了确保该变量能被应用程序的所有线程访问看到,需要用volatile去声明,在本人所学的Linux C中,定义的volatile变量意味着不对这个变量去进行编译优化,即每次都是从内存中读取该数据,而不是从被优化存储到cache或者寄存器中读取。我想意思大概是相近的吧,也许就是一个意思呢,不知道。

 /**
* Created by loveqh on 2016/11/19.
*/
public class JoinMain {
public volatile static int i = 0;

public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (i = 0; i < 100000; i++) ;
}
});
t.start();
t.join();
System.out.println(i);
}
}

输出为

10000

  虽然volatile对于原子性起了很大的帮助,但是,但是,volatile不能代替锁,无法保证一些复合操作的原子性,如 i++ 如下 因为2个线程同时写入i时,一个线程的结果会覆盖另外一个线程的结果。

package ParallelBasic;

/**
* 文件描述:
* 作者: bamboo
* 时间: 2016/11/21
*/
public class MultiThreadLong {
public volatile static long i = 0; //volatile 不能代替锁
public static class PlusTask implements Runnable {

@Override
public void run() {
for (int k = 0; k < 100000; k++) {
i++;
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(new PlusTask());
threads[i].start();
}

for (int i = 0; i < 10; i++) {
threads[i].join();
}
System.out.println(i);
}
}

  输出935458。因为2个线程同时写入i时,一个线程的结果会覆盖另外一个线程的结果。

Java并发基础<2>


  来看另一个的例子:


package ParallelBasic;

/**
* 文件描述:
* 作者: bamboo
* 时间: 2016/11/21
*/
public class MultiThreadLong {

private static boolean ready;
private static int number;

private static class ReaderThread implements Runnable {


@Override
public void run() {
while (!ready) ;
System.out.println(number);
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new ReaderThread());
t1.start();
Thread.sleep(1000);
number = 42;
ready = true;
Thread.sleep(1000);
}
}

  如果就这样执行,那么该程序永远不会停止,原因是在Server模式下(常用的普通模式 通过java -version可查看) 系统对代码进行了优化(具体就是发现ready变量一直使用,将其放在cache或者寄存器中)所有ready永远不会为false。正确的做法是在ready定义处添加volatile修饰。
在此处提到Server模式。顺便说一下:

  • 虚拟机的Server模式,启动较慢(10%),但是一旦运行性能提升明显,适合做服务器。

  • 虚拟机的Client模式,适合GUI界面的交互应用。

  • JVM在client模式默认-Xms是1M,-Xmx是64M;JVM在Server模式默认-Xms是128M,-Xmx是1024M

  • -Xms指的是初始堆大小 -Xmx指的是最大堆大小.


ThreadGroup 线程组


  线程组就是将相同职能的线程放在一个组里,利于分类和管理。

package ParallelBasic;

/**
* 文件描述:
* 作者: bamboo
* 时间: 2016/11/21
*/
public class ThreadGroup implements Runnable {

@Override
public void run() {
String groupAndName = Thread.currentThread().getThreadGroup().getName() +
"_" + Thread.currentThread().getName();
while (true) {
System.out.println("I am" + groupAndName);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
ThreadGroup tg = new ThreadGroup("PrintGroup");
Thread t1 = new Thread(tg, new ThreadGroupName(), "T1");//"T1" the name of the new thread
Thread t2 = new Thread(tg, new ThreadGroupName(), "T2");
t1.start();
t2.start();
System.out.println(tg.activeCount());//tg.activeCount() 由于线程的动态的,所以这个值是估计值
tg.list();//打印线程组
}
}

输出

2
I am PrintGroup_T1
java.lang.ThreadGroup[name=PrintGroup,maxpri=10]
Thread[T1,5,PrintGroup]
Thread[T2,5,PrintGroup]
I am PrintGroup_T2
I am PrintGroup_T2
I am PrintGroup_T1
I am PrintGroup_T1
I am PrintGroup_T2 ....

  值得注意的是:tg.stop()方法, 会停止所有线程组中的线程,很暴力 和Thread.sleep()方法一样,所以现在被废弃了。


Daemon 守护线程


守护线程是系统的守护者,运行在后台的线程。当用户线程全部结束时,守护线程无事可做自然退出。

package ParallelBasic;

/**
* 文件描述:守护线程如垃圾回收线程 JIT线程
* 当用户线程结束时,只有守护线程时,无事可做,就会自然退出
* 作者: bamboo
* 时间: 2016/11/21
*/
public class Daemon {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("I am alive");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.setDaemon(true);
t1.start();
Thread.sleep(3000);
}
}

值得注意的是

        t1.setDaemon(true);
t1.start();

  应该先设置守护线程,再start,如果设置错了,会抛出设置守护线程错误异常,但是程序依然运行下去,原因是被当做用户线程继续执行了。当主线程结束时,守护线程也消失了。


Priority 线程优先级


java中线程优先级1-10 10为最高 和Linux值相反,linux优先级1最大

  • MIN_PRIORITY=1;
  • MAX_PRIORITY=10;
  • NORM_PRIORITY=5;

`

package ParallelBasic;

/**
* 文件描述:
* java中线程优先级1-10 10为最高 和Linux值相反,linux优先级1最大
* MIN_PRIORITY=1;
* MAX_PRIORITY=10;
* NORM_PRIORITY=5;
* 作者: bamboo
* 时间: 2016/11/21
*/
public class ThreadPriority {
public static class HighPriority implements Runnable {

static int count = 0;

@Override
public void run() {
while (true) {
synchronized (ThreadPriority.class) {
count++;
if (count > 100000000) {
System.out.println("HighPriority is complete");
break;
}
}
}
}
}

public static class LowPriority implements Runnable {

static int count = 0;

@Override
public void run() {
while (true) {
//synchronized (ThreadPriority.class) 产生一次资源竞争,让效果更明显
synchronized (ThreadPriority.class) {
count++;
if (count > 100000000) {
System.out.println("LowPriority is complete");
break;
}
}
}
}
}

public static void main(String[] args) {
Thread h = new Thread(new HighPriority());
Thread l = new Thread(new LowPriority());
h.setPriority(Thread.MAX_PRIORITY);
l.setPriority(Thread.MIN_PRIORITY);
l.start();
h.start();
}

}

输出的结果是

HighPriority is complete
LowPriority is complete
Process finished with exit code 0

  虽然体现出的结果是这样,但是高优先级只是在大部分情况下会优先于低优先级线程执行,不一定每次都是这样。使用了synchronized (ThreadPriority.class) 产生一次资源竞争,让效果更明显。


synchronized 同步关键字


实现线程间的同步,工作就是对同步代码加锁。

  一次只能有一个线程进入到代码块中,从而保证线程的安全性。即当线程A写入的时候,线程B不仅不能写,还不能读(因为进入不到代码块中)。

synchronized的多种用法

  • 指定加锁对象:对给定对象加锁,进入同步代码块前要获得给定对象的锁。
  • 直接作用于实例方法:相当于对当前实例加锁,进入同步代码前要获取当前实例的锁。
  • 直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获取当前类的锁。

下列代码用来解决volatile复合操作原子性问题的。

package ParallelBasic;

/**
* 文件描述:
* 作者: bamboo
* 时间: 2016/11/21
*/
public class Synchronized {
public static long i = 0; //volatile 不能代替锁
static PlusTask plusTask= new PlusTask();
public static class PlusTask implements Runnable {

@Override
public void run() {
for (int k = 0; k < 10000; k++) {
increase();
}
}

synchronized void increase() {
i++;
}

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(new PlusTask());
threads[i].start();
}

for (int i = 0; i < 10; i++) {
threads[i].join();
}
System.out.println(i);
}
}
}

结果却输出了

95262

注意到上述使用synchronized的方法,分析上述代码得知,synchronized void increase(){}

是直接作用于实例方法的,而在main函数中,new了非常10个实例,说明加锁都加到了不同的对象上,这样就会造成, 2个线程同时写入i时,一个线程的结果会覆盖另外一个线程的结果。

正确的做法是有

  • 在main函数中,使用同一个对象创建线程。

  • 将increase方法提升为static类方法,这样每次都锁定类。

  • 如果i不是数据类型,而是一个对象的话,那么可以直接synchronized(Object instance) 此处不受用。

并行程序中隐藏的Bug 虽然小,但是容易致命


  • 溢出

  • 并发下的ArrayList

  • 并发下的HashMap

  • 加锁对象的错误


溢出,无法提示的错误


 /**
* 文件描述:
* 作者: bamboo
* 时间: 2016/11/21
*/
public class OutIndex {
public static void main(String[] args) {
int a =1023454656;
int b =1232234234;
System.out.println(a+b);
}
}


out:
-2039278406

Process finished with exit code 0

  如果是单单这样的一个程序当然会想到,是溢出的问题,如果是在很复杂的环境中,出现了这样的错误,那种调不出来的bug,真是令人难受。


并发下的ArrayList


并发下的ArrayList,非线程安全。

package ParallelBasic;

import java.util.ArrayList;
import java.util.Vector;

/**
* 文件描述:线程不安全容器,多线程下容易出错
* 作者: bamboo
* 时间: 2016/11/21
*/
public class ArrayListMultiThread {
static ArrayList<Integer> al = new ArrayList<Integer>(10);
// static Vector<Integer> al = new Vector<>(10);

public static class AddThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < 100000; i++) {
al.add(i);
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new AddThread());
Thread t2 = new Thread(new AddThread());
t1.start();t2.start();
t1.join();
t2.join();
System.out.println(al.size());
}
}

输出为

Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException: 549
at java.util.ArrayList.add(ArrayList.java:441)
at ParallelBasic.ArrayListMultiThread$AddThread.run(ArrayListMultiThread.java:28)
at java.lang.Thread.run(Thread.java:744)

或者:

100430

  原因是ArrayList在扩容过程中,内部的一致性被破坏,但由于没有锁保护,另一个线程访问到了不一致的内部状态,导致越界。

  也可能直接打印一个小于如178233 这样的 值,但是不会报错,两个线程访问了同一位置导致覆盖。

解决办法用vector代替ArrayList

static Vector<Integer> al = new Vector<>(10);


并发下的HashMap

并发下的HashMap,非线程安全。

下列代码有毒

package ParallelBasic;

import java.util.HashMap;
import java.util.Map;

/**
* 文件描述:并发下的hashMap也不是线程安全的
* 出现死循环,用ConcurrentHashMap
* 作者: bamboo
* 时间: 2016/11/21
*/
public class HashMapMultiThread {
static Map<String, String> map = new HashMap<>();

public static class AddThread implements Runnable {
int start = 0;

public AddThread(int start) {
this.start = start;
}

@Override
public void run() {
for (int i = start; i < 100000; i += 2) {
map.put(Integer.toString(i), Integer.toBinaryString(i));
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new AddThread(0));
Thread t2 = new Thread(new AddThread(1));
t1.start();t2.start();
t1.join();
t2.join();
System.out.println(map.size());
}
}
}

  当我运行代码的时候,机子卡主了,吓得我赶紧退出程序,然后将循环数值改小一点,发现CPU占用率高的飞起,马上就99%了。然后再改小一点,发现ok了。


于是我使用jps命令查看线程运行状态。

C:\Users\bamboo>jps
8816 Launcher
8232 AppMain
7684 Jps
7936


C:\Users\bamboo>jstack 8232


....

"Thread-1" prio=6 tid=0x000000000a0bf000 nid=0x1a80 runnable [0x000000000a8ae000]
java.lang.Thread.State: RUNNABLE
at java.util.HashMap.put(HashMap.java:498)
at ParallelBasic.HashMapMultiThread$AddThread.run(HashMapMultiThread.java:25)
at java.lang.Thread.run(Thread.java:744)

"Thread-0" prio=6 tid=0x000000000a0be800 nid=0x249c runnable [0x000000000ab4e000]
java.lang.Thread.State: RUNNABLE
at java.util.HashMap.put(HashMap.java:498)
at ParallelBasic.HashMapMultiThread$AddThread.run(HashMapMultiThread.java:25)
at java.lang.Thread.run(Thread.java:744)

...

"main" prio=6 tid=0x000000000246f000 nid=0x63c in Object.wait() [0x000000000267e000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000eb151580> (a java.lang.Thread)
at java.lang.Thread.join(Thread.java:1280)
- locked <0x00000000eb151580> (a java.lang.Thread)
at java.lang.Thread.join(Thread.java:1354)
at ParallelBasic.HashMapMultiThread$AddThread.main(HashMapMultiThread.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

....

  注意到HaspMap.put函数的498行似乎发生了什么,于是我就跑去看了,下列第一行就为498行。

   for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}

  看起来就是如同链表遍历一样,一个一个往下,但是由于在多线程环境下,可能发生冲突,链表结构被破坏,形成一个环,这样下去上述的迭代过程是一个死循环。 最简单的就是key1和key2 相互指向对方。
在JDK1.8已经做了大改动,很大程度上解决这个问题,但是还是有问题,最好的方法就是使用ConcurrentHashMap


加锁对象的错误


下列就是一个加锁加错对象的例子

package ParallelBasic;

public class LockInteger implements Runnable {
public static Integer i = 0;
static BadLockOnInteger instance = new BadLockOnInteger();
@Override
public void run() {
for (int j = 0; j < 1000000; j++) {
synchronized (i) {
i++; //对于此处,就是新建一个Integer对象,并将它的引用赋值给i;
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1= new Thread(instance);
Thread t2= new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}


输出:
1883900

Process finished with exit code 0

  为什么呢,讲道理,该加锁的加锁了。问题出现在这个Integer上。Integer这个类的对象是不可变对象。对其进行i++时,就是新建一个Integer对象,并将它的引用赋值给i,意思就是可能将锁加到了不同的Integer对象上,导致了2个线程同时写入i时,一个线程的结果会覆盖另外一个线程的结果。证明一下:
- 将经过编译的class文件使用javap反编译一下,并找到run方法

C:\Users\bamboo\Desktop\单源\Leetcode\out\ParallelBasic>javap -c -public BadLockOnInteger.class
Compiled from "BadLockOnInteger.java"
public class ParallelBasic.BadLockOnInteger implements java.lang.Runnable {
public static java.lang.Integer i;

public ParallelBasic.BadLockOnInteger();

.....
public void run();
Code:
0: iconst_0
1: istore_1
2: iload_1
3: ldc #2 // int 1000000
5: if_icmpge 55
8: getstatic #3 // Field i:Ljava/lang/Integer;
11: dup
12: astore_2
13: monitorenter
14: getstatic #3 // Field i:Ljava/lang/Integer;
17: astore_3
18: getstatic #3 // Field i:Ljava/lang/Integer;
21: invokevirtual #4 // Method java/lang/Integer.intValue:()I
24: iconst_1
25: iadd
26: invokestatic #5 // Method java/lang/Integer.valueOf:(I)Ljava/lang/Integer;
29: dup
30: putstatic #3 // Field i:Ljava/lang/Integer;
33: astore 4
35: aload_3
36: pop
37: aload_2
38: monitorexit
39: goto 49
42: astore 5
44: aload_2
45: monitorexit
46: aload 5
48: athrow
49: iinc 1, 1
52: goto 2
55: return
Exception table:
from to target type
14 39 42 any
42 46 42 any

...
}

  第26行出现了感兴趣的东西,于是就去找到了Integer.valueOf(Int i)方法:
public static Integer valueOf(int i) {
assert IntegerCache.high >= 127;
if (i >= IntegerCache.low && i <= IntegerCache.high)
return IntegerCache.cache[i + (-IntegerCache.low)];
return new Integer(i);
}

  果然,不可变对象原来是这样的。又学到了,这让我想到了另一个String和StringBuffer的区别,大体上也是这样的吧。很关键。

上述代码的修改方法就是将

synchronized (i);

改为

synchronized (instance);