本文实例讲述了java线程同步方法。分享给大家供大家参考,具体如下:
1. semaphore
1.1 二进制semaphore
semaphore算是比较高级点的线程同步工具了,在许多其他语言里也有类似的实现。semaphore有一个最大的好处就是在初始化时,可以显式的控制并发数。其内部维护这一个c计数器,当计数器小于等于0时,是不允许其他线程访问并发区域的,反之则可以,因此,若将并发数设置为1,则可以确保单一线程同步。下面的例子模拟多线程打印,每个线程提交打印申请,然后执行打印,最后宣布打印结束,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import java.util.concurrent.semaphore;
public class program{
public static void main(string[] agrs){
printqueue p= new printqueue();
thread[] ths= new thread[ 10 ];
for ( int i= 0 ;i< 10 ;i++){
ths[i]= new thread( new job(p), "thread" +i);
}
for ( int i= 0 ;i< 10 ;i++){
ths[i].start();
}
}
}
class printqueue{
private semaphore s;
public printqueue(){
s= new semaphore( 1 ); //二进制信号量
}
public void printjob(object document){
try {
s.acquire();
long duration=( long )(math.random()* 100 );
system.out.printf( "线程名:%s 睡眠:%d" ,thread.currentthread().getname(),duration);
thread.sleep(duration);
}
catch (interruptedexception e){
e.printstacktrace();
}
finally {
s.release();
}
}
}
class job implements runnable{
private printqueue p;
public job(printqueue p){
this .p=p;
}
@override
public void run(){
system.out.printf( "%s:正在打印一个任务\n " ,thread.currentthread().getname());
this .p.printjob( new object());
system.out.printf( "%s:文件已打印完毕\n " ,thread.currentthread().getname());
}
}
|
执行结果如下:
thread0:正在打印一个任务
thread9:正在打印一个任务
thread8:正在打印一个任务
thread7:正在打印一个任务
thread6:正在打印一个任务
thread5:正在打印一个任务
thread4:正在打印一个任务
thread3:正在打印一个任务
thread2:正在打印一个任务
thread1:正在打印一个任务
线程名:thread0 睡眠:32 thread0:文件已打印完毕
线程名:thread9 睡眠:44 thread9:文件已打印完毕
线程名:thread8 睡眠:45 thread8:文件已打印完毕
线程名:thread7 睡眠:65 thread7:文件已打印完毕
线程名:thread6 睡眠:12 thread6:文件已打印完毕
线程名:thread5 睡眠:72 thread5:文件已打印完毕
线程名:thread4 睡眠:98 thread4:文件已打印完毕
线程名:thread3 睡眠:58 thread3:文件已打印完毕
线程名:thread2 睡眠:24 thread2:文件已打印完毕
线程名:thread1 睡眠:93 thread1:文件已打印完毕
可以看到,所有线程提交打印申请后,按照并发顺序一次执行,没有任何并发冲突,谁先获得信号量,谁就先执行,其他剩余线程均等待。这里面还有一个公平信号与非公平信号之说:基本上java所有的多线程工具都支持初始化的时候指定一个布尔变量,true时表明公平,即所有处于等待的线程被筛选的条件为“谁等的时间长就选谁进行执行”,有点first in first out的感觉,而false时则表明不公平(默认是不non-fairness),即所有处于等待的线程被筛选执行是随机的。这也就是为什么多线程往往执行顺序比较混乱的原因。
1.2 多重并发控制
若将上面的代码改为s=new semaphore(3);//即让其每次可以并发3条线程
,则输出如下:
thread0:正在打印一个任务
thread9:正在打印一个任务
thread8:正在打印一个任务
thread7:正在打印一个任务
thread6:正在打印一个任务
thread5:正在打印一个任务
thread3:正在打印一个任务
thread4:正在打印一个任务
thread2:正在打印一个任务
thread1:正在打印一个任务
线程名:thread9 睡眠:26线程名:thread8 睡眠:46线程名:thread0 睡眠:79 thread9:文件已打印完毕
线程名:thread7 睡眠:35 thread8:文件已打印完毕
线程名:thread6 睡眠:90 thread7:文件已打印完毕
线程名:thread5 睡眠:40 thread0:文件已打印完毕
线程名:thread3 睡眠:84 thread5:文件已打印完毕
线程名:thread4 睡眠:13 thread4:文件已打印完毕
线程名:thread2 睡眠:77 thread6:文件已打印完毕
线程名:thread1 睡眠:12 thread1:文件已打印完毕
thread3:文件已打印完毕
thread2:文件已打印完毕
很明显已经并发冲突了。若要实现分组(每组3个)并发吗,则每一组也要进行同步,代码修改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
import java.util.concurrent.semaphore;
import java.util.concurrent.locks.lock;
import java.util.concurrent.locks.reentrantlock;
public class program{
public static void main(string[] agrs){
printqueue p= new printqueue();
thread[] ths= new thread[ 10 ];
for ( int i= 0 ;i< 10 ;i++){
ths[i]= new thread( new job(p), "thread" +i);
}
for ( int i= 0 ;i< 10 ;i++){
ths[i].start();
}
}
}
class printqueue{
private semaphore s;
private boolean [] freeprinters;
private lock lock;
public printqueue(){
s= new semaphore( 3 ); //二进制信号量
freeprinters= new boolean [ 3 ];
for ( int i= 0 ;i< 3 ;i++){
freeprinters[i]= true ;
}
lock= new reentrantlock();
}
public void printjob(object document){
try {
s.acquire();
int printerindex=getindex();
long duration=( long )(math.random()* 100 );
system.out.printf( "线程名:%s 睡眠:%d\n" ,thread.currentthread().getname(),duration);
thread.sleep(duration);
freeprinters[printerindex]= true ; //恢复信号,供下次使用
}
catch (interruptedexception e){
e.printstacktrace();
}
finally {
s.release();
}
}
//返回一个内部分组后的同步索引
public int getindex(){
int index=- 1 ;
try {
lock.lock();
for ( int i= 0 ;i<freeprinters.length;i++){
if (freeprinters[i]){
freeprinters[i]= false ;
index=i;
break ;
}
}
}
catch (exception e){
e.printstacktrace();
}
finally {
lock.unlock();
}
return index;
}
}
class job implements runnable{
private printqueue p;
public job(printqueue p){
this .p=p;
}
@override
public void run(){
system.out.printf( "%s:正在打印一个任务\n " ,thread.currentthread().getname());
this .p.printjob( new object());
system.out.printf( " %s:文件已打印完毕\n " ,thread.currentthread().getname());
}
}
|
其中getindex()
方法主要为了维护内部分组后(支持并发3个)组内数据的同步(用lock来同步)。
输出如下:
thread0:正在打印一个任务
thread9:正在打印一个任务
thread8:正在打印一个任务
thread7:正在打印一个任务
thread6:正在打印一个任务
thread5:正在打印一个任务
thread4:正在打印一个任务
thread3:正在打印一个任务
thread2:正在打印一个任务
thread1:正在打印一个任务
线程名:thread0 睡眠:82 打印机:0号
线程名:thread8 睡眠:61 打印机:2号
线程名:thread9 睡眠:19 打印机:1号
thread9:文件已打印完毕
线程名:thread7 睡眠:82 打印机:1号
thread8:文件已打印完毕
线程名:thread6 睡眠:26 打印机:2号
thread0:文件已打印完毕
线程名:thread5 睡眠:31 打印机:0号
thread6:文件已打印完毕
线程名:thread4 睡眠:44 打印机:2号
thread7:文件已打印完毕
线程名:thread3 睡眠:54 打印机:1号
thread5:文件已打印完毕
线程名:thread2 睡眠:48 打印机:0号
thread4:文件已打印完毕
线程名:thread1 睡眠:34 打印机:2号
thread3:文件已打印完毕
thread2:文件已打印完毕
thread1:文件已打印完毕
2. countdownlatch
countdownlatch同样也是支持多任务并发的一个工具。它主要用于“等待多个并发事件”,它内部也有一个计数器,当调用await()
方法时,线程处于等待状态,只有当内部计数器为0时才继续(countdown()
方法来减少计数),也就说,假若有一个需求是这样的:主线程等待所有子线程都到达某一条件时才执行,那么只需要主线程await,然后在启动每个子线程的时候进行countdown操作。下面模拟了一个开会的例子,只有当所有人员都到齐了,会议才能开始。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
import java.util.concurrent.countdownlatch;
public class program{
public static void main(string[] agrs){
//开启可容纳10人的会议室
videoconference v= new videoconference( 10 );
new thread(v).start();
//参与人员陆续进场
for ( int i= 0 ;i< 10 ;i++){
participant p= new participant(i+ "号人员" ,v);
new thread(p).start();
}
}
}
class videoconference implements runnable{
private countdownlatch controller;
public videoconference( int num){
controller= new countdownlatch(num);
}
public void arrive(string name){
system.out.printf( "%s 已经到达!\n" ,name);
controller.countdown();
system.out.printf( "还需要等 %d 个成员!\n" ,controller.getcount());
}
@override
public void run(){
try {
system.out.printf( "会议正在初始化...!\n" );
controller.await();
system.out.printf( "所有人都到齐了,开会吧!\n" );
}
catch (interruptedexception e){
e.printstacktrace();
}
}
}
class participant implements runnable{
private videoconference conference;
private string name;
public participant(string name,videoconference conference){
this .name=name;
this .conference=conference;
}
@override
public void run(){
long duration=( long )(math.random()* 100 );
try {
thread.sleep(duration);
conference.arrive( this .name);
}
catch (interruptedexception e){
}
}
}
|
输出:
会议正在初始化...!
0号人员 已经到达!
还需要等 9 个成员!
1号人员 已经到达!
还需要等 8 个成员!
9号人员 已经到达!
还需要等 7 个成员!
4号人员 已经到达!
还需要等 6 个成员!
8号人员 已经到达!
还需要等 5 个成员!
5号人员 已经到达!
还需要等 4 个成员!
6号人员 已经到达!
还需要等 3 个成员!
3号人员 已经到达!
还需要等 2 个成员!
7号人员 已经到达!
还需要等 1 个成员!
2号人员 已经到达!
还需要等 0 个成员!
所有人都到齐了,开会吧!
3. phaser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
import java.util.concurrent.phaser;
import java.util.concurrent.timeunit;
import java.util.list;
import java.util.arraylist;
import java.io.file;
import java.util.date;
public class program{
public static void main(string[] agrs){
phaser phaser= new phaser( 3 );
filesearch system= new filesearch( "c:\\windows" , "log" ,phaser);
filesearch apps= new filesearch( "c:\\program files" , "log" ,phaser);
filesearch documents= new filesearch( "c:\\documents and settings" , "log" ,phaser);
thread systemthread= new thread(system, "system" );
systemthread.start();
thread appsthread= new thread(apps, "apps" );
appsthread.start();
thread documentsthread= new thread(documents, "documents" );
documentsthread.start();
try {
systemthread.join();
appsthread.join();
documentsthread.join();
} catch (interruptedexception e) {
e.printstacktrace();
}
system.out.println( "terminated: " + phaser.isterminated());
}
}
class filesearch implements runnable{
private string initpath;
private string end;
private list<string> results;
private phaser phaser;
public filesearch(string initpath,string end,phaser phaser){
this .initpath=initpath;
this .end=end;
this .results= new arraylist<string>();
this .phaser=phaser;
}
private void directoryprocess(file file){
file[] files=file.listfiles();
if (files!= null ){
for ( int i= 0 ;i<files.length;i++){
if (files[i].isdirectory()){
directoryprocess(files[i]);
}
else {
fileprocess(files[i]);
}
}
}
}
private void fileprocess(file file){
if (file.getname().endswith(end)){
results.add(file.getabsolutepath());
}
}
private void filterresults(){
list<string> newresults= new arraylist<string>();
long actualdate= new date().gettime();
for ( int i= 0 ;i<results.size();i++){
file file= new file(results.get(i));
long filedate=file.lastmodified();
if (actualdate-filedate<timeunit.milliseconds.convert( 1 ,timeunit.days)){
newresults.add(results.get(i));
}
}
results=newresults;
}
private boolean checkresults(){
if (results.isempty()){
system.out.printf( "%s: phase %d: 0 results.\n" ,thread.currentthread().getname(),phaser.getphase());
system.out.printf( "%s: phase %d: end.\n" ,thread.currentthread().getname(),phaser.getphase());
phaser.arriveandderegister();
}
else {
system.out.printf( "%s: phase %d: %d results.\n" ,thread.currentthread().getname(),phaser.getphase(),results.size());
phaser.arriveandawaitadvance();
return true ;
}
}
private void showinfo() {
for ( int i= 0 ; i<results.size(); i++){
file file= new file(results.get(i));
system.out.printf( "%s: %s\n" ,thread.currentthread().getname(),file.getabsolutepath());
}
phaser.arriveandawaitadvance();
}
@override
public void run(){
file file= new file(initpath);
if (file.isdirectory()){
directoryprocess(file);
}
if (!checkresults()){
return ;
}
filterresults();
if (!checkresults()){
return ;
}
showinfo();
phaser.arriveandderegister();
system.out.printf( "%s: work completed.\n" ,thread.currentthread().getname());
}
}
|
运行结果:
apps: phase 0: 4 results.
system: phase 0: 27 results.
希望本文所述对大家java程序设计有所帮助。
原文链接:https://blog.csdn.net/kkkkkxiaofei/article/details/19079259