线程通信用来保证线程协调运行,一般在做线程同步的时候才需要考虑线程通信的问题。
1、传统的线程通信
通常利用Objeclt类提供的三个方法:
- wait() 导致当前线程等待,并释放该同步监视器的锁定,直到其它线程调用该同步监视器的notify()或者notifyAll()方法唤醒线程。
- notify(),唤醒在此同步监视器上等待的线程,如果有多个会任意选择一个唤醒
- notifyAll() 唤醒在此同步监视器上等待的所有线程,这些线程通过调度竞争资源后,某个线程获取此同步监视器的锁,然后得以运行。
这三个方法必须由同步监视器对象调用,分为两张情况:
同步方法时,由于同步监视器为this对象,所以可以直接调用这三个方法。
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
public class SyncMethodThreadCommunication {
static class DataWrap{
int data = 0 ;
boolean flag = false ;
public synchronized void addThreadA(){
if (flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data++;
System.out.println(Thread.currentThread().getName() + " " + data);
flag = true ;
notify();
}
public synchronized void addThreadB() {
if (!flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data++;
System.out.println(Thread.currentThread().getName() + " " + data);
flag = false ;
notify();
}
}
static class ThreadA extends Thread {
private DataWrap data;
public ThreadA(DataWrap dataWrap) {
this .data = dataWrap;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++) {
data.addThreadA();
}
}
}
static class ThreadB extends Thread {
private DataWrap data;
public ThreadB(DataWrap dataWrap) {
this .data = dataWrap;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++) {
data.addThreadB();
}
}
}
public static void main(String[] args) {
//实现两个线程轮流对数据进行加一操作
DataWrap dataWrap = new DataWrap();
new ThreadA(dataWrap).start();
new ThreadB(dataWrap).start();
}
}
|
同步代码块时,需要使用监视器对象调用这三个方法。
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
public class SyncBlockThreadComminication {
static class DataWrap{
boolean flag;
int data;
}
static class ThreadA extends Thread{
DataWrap dataWrap;
public ThreadA(DataWrap dataWrap){
this .dataWrap = dataWrap;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++) {
synchronized (dataWrap) {
if (dataWrap.flag) {
try {
dataWrap.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
dataWrap.data++;
System.out.println(getName() + " " + dataWrap.data);
dataWrap.flag = true ;
dataWrap.notify();
}
}
}
}
static class ThreadB extends Thread{
DataWrap dataWrap;
public ThreadB(DataWrap dataWrap){
this .dataWrap = dataWrap;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++) {
synchronized (dataWrap) {
if (!dataWrap.flag) {
try {
dataWrap.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
dataWrap.data++;
System.out.println(getName() + " " + dataWrap.data);
dataWrap.flag = false ;
dataWrap.notify();
}
}
}
}
public static void main(String[] args) {
//实现两个线程轮流对数据进行加一操作
DataWrap dataWrap = new DataWrap();
new ThreadA(dataWrap).start();
new ThreadB(dataWrap).start();
}
}
|
2、使用Condition控制线程通信
当使用Lock对象保证同步时,则使用Condition对象来保证协调。
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.sun.media.sound.RIFFInvalidDataException;
import javafx.scene.chart.PieChart.Data;
public class SyncLockThreadCommunication {
static class DataWrap {
int data;
boolean flag;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void addThreadA() {
lock.lock();
try {
if (flag) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data++;
System.out.println(Thread.currentThread().getName() + " " + data);
flag = true ;
condition.signal();
} finally {
lock.unlock();
}
}
public void addThreadB() {
lock.lock();
try {
if (!flag) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data++;
System.out.println(Thread.currentThread().getName() + " " + data);
flag = false ;
condition.signal();
} finally {
lock.unlock();
}
}
}
static class ThreadA extends Thread{
DataWrap dataWrap;
public ThreadA(DataWrap dataWrap) {
this .dataWrap = dataWrap;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++) {
dataWrap.addThreadA();
}
}
}
static class ThreadB extends Thread{
DataWrap dataWrap;
public ThreadB(DataWrap dataWrap) {
this .dataWrap = dataWrap;
}
@Override
public void run() {
for ( int i = 0 ; i < 10 ; i++) {
dataWrap.addThreadB();
}
}
}
public static void main(String[] args) {
//实现两个线程轮流对数据进行加一操作
DataWrap dataWrap = new DataWrap();
new ThreadA(dataWrap).start();
new ThreadB(dataWrap).start();
}
}
|
其中Condition对象的await(), singal(),singalAll()分别对应wait(),notify()和notifyAll()方法。
3、使用阻塞队列BlockingQueue控制线程通信
BlockingQueue是Queue接口的子接口,主要用来做线程通信使用,它具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果队列已空,则该线程被阻塞。这两个特征分别对应两个支持阻塞的方法,put(E e)和take()
示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueThreadComminication {
static class DataWrap{
int data;
}
static class ThreadA extends Thread{
private BlockingQueue<DataWrap> blockingQueue;
public ThreadA(BlockingQueue<DataWrap> blockingQueue, String name) {
super (name);
this .blockingQueue = blockingQueue;
}
@Override
public void run() {
for ( int i = 0 ; i < 100 ; i++) {
try {
DataWrap dataWrap = blockingQueue.take();
dataWrap.data++;
System.out.println(getName() + " " + dataWrap.data);
sleep( 1000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class ThreadB extends Thread{
private BlockingQueue<DataWrap> blockingQueue;
private DataWrap dataWrap;
public ThreadB(BlockingQueue<DataWrap> blockingQueue, DataWrap dataWrap, String name) {
super (name);
this .blockingQueue = blockingQueue;
this .dataWrap = dataWrap;
}
@Override
public void run() {
for ( int i = 0 ; i < 100 ; i++) {
try {
dataWrap.data++;
System.out.println(getName() + " " + dataWrap.data);
blockingQueue.put(dataWrap);
sleep( 1000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
///实现两个线程轮流对数据进行加一操作
DataWrap dataWrap = new DataWrap();
BlockingQueue<DataWrap> blockingQueue = new ArrayBlockingQueue<>( 1 );
new ThreadA(blockingQueue, "Consumer" ).start();
new ThreadB(blockingQueue, dataWrap, "Producer" ).start();
}
}
|
BlockingQueue共有五个实现类:
ArrayBlockingQueue 基于数组实现的BlockingQueue队列
LinkedBlockingQueue 基于链表实现的BlockingQueue队列
PriorityBlockingQueue 中元素需实现Comparable接口,其中元素的排序是按照Comparator进行的定制排序。
SynchronousQueue 同步队列,要求对该队列的存取操作必须是交替进行。
DelayQueue 集合元素必须实现Delay接口,队列中元素排序按照Delay接口方法getDelay()的返回值进行排序。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。