在如今的程序里,单线程的程序,应该已经比较少了,而Java语言是内置支持多线程并发的,大家都说Java语言内置支持多线程,非常非常的强大和方便,但一直没有深入研究jdk内concurrent包。今天就认真学习了一下java.util.concurrent包,发现jdk多线程编程果然是强大和方便。本文是学习java.util.concurrent包内线程池及相关接口的一些总结。
任务接口抽象
Runnable接口
在java.lang包内,为多线程提供了Runnable接口。
public interface Runnable {
public abstract void run();
}
Runnable接口只是简单地提供了一个任务运行的入口。但对于任务执行的结果以及任务的状态,都是没有定义的。但在jdk 1.5之后,Java针对多线程任务,提供了更强大的接口支持。那就是提供了Callable和Future接口。这两个接口,为Task提供了更多的抽象。可以更方便进行Task抽象。
Callable接口
Callable接口,虽然定义仍然很简单。但提供了Task运行的返回值,同时,也支持抛出异常。Callable接口的定义如下:
public interface Callable<V> {
V call() throws Exception;
}
Future接口
Future接口,是对异步任务结果的抽象。Future接口可以查询任务执行结果,或者等待任务结果,并且可以获取任务的执行结果。Future接口有5个方法。
1. cancel 用于取消任务,mayInterruptIfRunning参数表示,是否通过中断取消正在运行的任务,如果任务已经完成,则此方法返回false;
2. isCancelled 任务是否被取消,如果任务在正常完成前被取消,则返回true;
3. isDone 任务是否完成,无论任务是通过正常执行完,或者中途抛出异常,或者被取消,都认为任务已经完成;
4. get 获取任务执行结果,如果任务正在执行,则等待任务执行完成。get可以指定超时时间,如果超时,则抛出TimeoutException异常,如果任务被取消,则抛出CancellationException异常,如果任务在中途抛出异常,则get方法将异常封装在ExecutionException异常内,并抛出;如果当前线程被中断,则抛出InterruptedException异常。
Future接口定义如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Executor(执行器接口)
使用Thread和Runnable接口创建异步任务,线程和线程执行任务之间都是紧耦合的,同时,多个任务的执行策略,需要程序员自己进行繁琐地控制。为了将任务提交和任务执行之间进行解耦,Java在jdk1.5之后,提供了Executor接口。Executor接口定义如下:
public interface Executor {
void execute(Runnable command);
}
Executor接口是基于生产者-消费者模式,生产者只需要调用execute方法提交任务,至于消费者什么时候在哪里执行方法是和生产者解耦的。
线程池概述
在Java的java.util.concurrent包内,通过Executors类的静态方法,主要提供了4种线程池。具体如下:
方法 |
作用 |
newFixedThreadPool |
创建一个定长的线程池,当提交一个任务就会创建一个线程,直到达到池子的最大长度,然后线程池内的线程数不会再变化,如果有线程由于异常Exception而结束,线程池会补充一个新的线程。 |
newCachedThreadPool |
创建一个可缓存的线程池,如果当前线程池内空闲线程过多,它可以灵活地回收空闲的线程,当需求增加时,它可以灵活地添加新的线程,而不会对池的长度作任何限制。 |
newSingleThreadExecutor |
创建一个单线程化的executor,并只创建唯一的工作线程来执行任务,如果这个线程异常结束,会另外创建一个取代它。executor会保证任务依照任务队列所规定的顺序(FIFO,LIFO,优先级)执行。 |
newScheduledThreadPool |
创建一个定长的线程池,而且支持定时的和周期性任务执行,类似于Timer。 |
ExecutorService接口
在上面四种线程池,newFixedThreadPool,newCachedThreadPool和newSingleThreadExecutor都是返回ExecutorService接口的对象。
ExecutorService接口是继承Executor接口的,ExecutorService接口提供了更多更丰富的方法。主要方法如下:
1. submit submit方法通过重载,可以接受Callable任务对象和Runnable对象;submit方法返回一个Future对象。通过返回的Future对象,可以进行线程池任务状态的查询,以及取消任务。
2. invokeAll invokeAll方法可以进行任务的匹配提交,接受一个任务列表,同时可以可选提供一个超时时间;invokeAll方法返回一个Future对象列表。
3. isShutdown 线程池是否已经关闭
4. isTerminated 线程池所有任务是否已经执行完全。
5. shutdown 关闭线程池,线程池关闭后,则不能再接受任务。此方法只是使线程池不再接收新的任务,但已经提交的任务,仍然会继续运行。
6. shutdownNow 立刻关闭线程池,正在运行的任务会继续执行。未执行的任务将不会再执行,并将等待执行的任务返回。
7. awaitTermination 在指定时间内,等待线程池任务完成。
下面是定长线程池以及Future接口和Callable接口的代码示例:
package com.test.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
class Job1 implements Callable<Boolean> {
int id = 0;
public Job1(int id ){
this.id = id;
}
public Boolean call() throws Exception {
System.out.println("Job1 running.id:"+id+" Current time:"+System.currentTimeMillis());
return false;
}
}
class Job2 implements Callable<Boolean> {
int id =0;
public Job2(int id ){
this.id = id;
}
public Boolean call() throws Exception {
System.out.println("Job2 running.id:"+id+" Current time:"+System.currentTimeMillis());
Thread.sleep(15*1000);
System.out.println("Job2 end.id:"+id+" Current time:"+System.currentTimeMillis());
return true;
}
}
class Job3 implements Callable<Boolean> {
int id =0;
public Job3(int id ){
this.id = id;
}
public Boolean call() throws Exception {
System.out.println("Job3 running.id:"+id+" Current time:"+System.currentTimeMillis());
throw new RuntimeException("Job3 throw exception.");
}
}
public class ThreadPool {
static void addTask() throws InterruptedException, ExecutionException {
int id = 0;
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Boolean> future1 = executor.submit(new Job1(++id));
Future<Boolean> future2 = executor.submit(new Job2(++id));
Future<Boolean> future3 = executor.submit(new Job3(++id));
System.out.println("job1 is done:"+future1.isDone());
System.out.println("job1 result:"+future1.get());
System.out.println("job2 is done:"+future2.isDone());
System.out.println("job2 result:"+future2.get());
try {
System.out.println("job3 result:"+future3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
e.getCause();
}
List<Callable<Boolean>> taskList = new ArrayList<Callable<Boolean>>();
taskList.add(new Job1(++id));
taskList.add(new Job2(++id));
List<Future<Boolean>> futures = executor.invokeAll(taskList);
for (Future<Boolean> future : futures) {
System.out.println(future.get());
}
executor.shutdown();
executor.submit(new Job1(2));
}
public static void main( String[] args ) throws InterruptedException, ExecutionException {
addTask();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
上面的代码,执行结果如下:
Job1 running.id:1 Current time:1468632394429
Job2 running.id:2 Current time:1468632394430
job1 is done:true
Job3 running.id:3 Current time:1468632394431
job1 result:false
job2 is done:false
Job2 end.id:2 Current time:1468632409431
job2 result:true
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Job3 throw exception.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at com.test.concurrent.ThreadPool.addTask(ThreadPool.java:56)
at com.test.concurrent.ThreadPool.main(ThreadPool.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.RuntimeException: Job3 throw exception.
at com.test.concurrent.Job3.call(ThreadPool.java:40)
at com.test.concurrent.Job3.call(ThreadPool.java:33)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.lang.Thread.run(Thread.java:745)
Job1 running.id:4 Current time:1468632409434
Job2 running.id:5 Current time:1468632409434
Job2 end.id:5 Current time:1468632424435
false
true
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@67854d1f rejected from java.util.concurrent.ThreadPoolExecutor@608a6351[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 5]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
at com.test.concurrent.ThreadPool.addTask(ThreadPool.java:73)
at com.test.concurrent.ThreadPool.main(ThreadPool.java:77)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
ScheduledExecutorService接口
ScheduledExecutorService接口继承ExecutorService接口,但额外提供了三个用于周期调度任务的接口。
方法 |
说明 |
schedule(Callable callable, long delay, TimeUnit unit) |
提交任务,并且指定任务延迟执行的时间,此方法提交的任务,只会被执行一次
|
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) |
提交一个任务,同时指定延迟时间initialDelay,每隔任务开始period时间,如果,任务已经结束,则会再次调度任务;如果任务没有结束,则任务结束后,则会立刻调度。如果没有空闲进程,则仍旧会等待 |
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) |
提交任务,指定延迟时间initialDelay,每次任务结束后,再间隔delay时间,再次调度任务 |
scheduleAtFixedRate和scheduleWithFixedDelay方法的区别,主要是scheduleAtFixedRate方法的任务间隔是以任务起始时间算,scheduleWithFixedDelay方法是以任务结束时间算的。
package com.test.concurrent;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class ScheduledTask1 implements Runnable{
AtomicInteger index = new AtomicInteger(0);
public void run() {
int tmp_index = index.addAndGet(1);
System.out.println("Thread name:"+Thread.currentThread().getName()+". Task1 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task1 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
}
}
class ScheduledTask2 implements Runnable{
AtomicInteger index = new AtomicInteger(100);
public void run() {
int tmp_index = index.addAndGet(1);
System.out.println("Thread name:"+Thread.currentThread().getName()+". Task2 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task2 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
}
}
class ScheduledTask3 implements Runnable{
AtomicInteger index = new AtomicInteger(1000);
public void run() {
int tmp_index = index.addAndGet(1);
System.out.println("Thread name:"+Thread.currentThread().getName()+". Task3 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task3 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis());
}
}
public class ScheduledPool {
static public void addTask() throws InterruptedException {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(new ScheduledTask1(), 20, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new ScheduledTask2(), 0, 5, TimeUnit.SECONDS);
ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(1);
executor2.scheduleWithFixedDelay(new ScheduledTask3(), 0, 5, TimeUnit.SECONDS);
Thread.sleep(50*1000);
executor.shutdown();
executor.shutdown();
}
public static void main( String[] args ) throws InterruptedException {
addTask();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
上面代码,运行结果如下:
Thread name:pool-1-thread-1. Task2 begin running. index:101 Current time:1468637711181
Thread name:pool-2-thread-1. Task3 begin running. index:1001 Current time:1468637711182
Task2 end running. index:101 Current time:1468637721183
Thread name:pool-1-thread-1. Task2 begin running. index:102 Current time:1468637721183
Task3 end running. index:1001 Current time:1468637721183
Thread name:pool-2-thread-1. Task3 begin running. index:1002 Current time:1468637726184
Task2 end running. index:102 Current time:1468637731184
Thread name:pool-1-thread-1. Task2 begin running. index:103 Current time:1468637731184
Task3 end running. index:1002 Current time:1468637736185
Task2 end running. index:103 Current time:1468637741185
Thread name:pool-1-thread-1. Task2 begin running. index:104 Current time:1468637741185
Thread name:pool-2-thread-1. Task3 begin running. index:1003 Current time:1468637741185
Task2 end running. index:104 Current time:1468637751185
Thread name:pool-1-thread-1. Task1 begin running. index:1 Current time:1468637751186
Task3 end running. index:1003 Current time:1468637751186
Thread name:pool-2-thread-1. Task3 begin running. index:1004 Current time:1468637756187
Task1 end running. index:1 Current time:1468637761186
Task3 end running. index:1004 Current time:1468637766187
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
可以看出,ScheduledTask2任务,会在上次任务结束后,立马开始运行,而ScheduledTask3任务,则会在上次任务结束后,再等待5秒再运行,当executor线程池中,添加了一个新的阻塞任务后,则ScheduledTask2任务,会等待上个任务运行结束。同时,从上面的线程池名称可以看出,线程池中,线程的数量是固定的。