1.Callable和Runnable
I Callable定义的方法是call,而Runnable定义的方法是run。
II Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
III Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常。
Runnable 不做具体介绍
通过实现Callable接口来创建Thread线程:其中,Callable接口(也只有一个方法)定义如下:
public interface Callable<V>
{
V call() throws Exception;
}
步骤1:创建实现Callable接口的类SomeCallable<Integer>(略);
步骤2:创建一个类对象:
Callable<Integer> oneCallable = new SomeCallable<Integer>();
步骤3:由Callable<Integer>创建一个FutureTask<Integer>对象:
FutureTask<Integer> oneTask = new FutureTask<Integer>(oneCallable);
注释:FutureTask<Integer>是一个包装器,它通过接受Callable<Integer>来创建,它同时实现了Future和Runnable接口。
步骤4:由FutureTask<Integer>创建一个Thread对象:
Thread oneThread = new Thread(oneTask);
步骤5:启动线程:
oneThread.start();
至此,一个线程就创建完成了。
例:
ackage sync;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
* Created by fanqunsong on 2017/7/24.
*/
//步骤1:创建实现Callable接口的类SomeCallable<Integer>
public class CallableImpl implements Callable<String> {
private String acceptStr;
public CallableImpl(String acceptStr){
this.acceptStr=acceptStr;
}
@Override
public String call() throws Exception{
Thread.sleep(1000);
return this.acceptStr + "append some chars and return it!";
}
public static void main(String[] args) throws Exception{
//步骤2:创建一个类对象:
Callable<String> callable = new CallableImpl("my callable test!");
//步骤3:由Callable<>创建一个FutureTask<>对象:
FutureTask<String>task = new FutureTask<String>(callable);
long beginTime = System.currentTimeMillis();
//步骤4:由FutureTask<>创建一个Thread对象:步骤5:启动线程:
new Thread(task).start();
String result = task.get();
long endTime = System.currentTimeMillis();
System.out.println("hello : " + result);
System.out.println("cast : " + (endTime - beginTime) / 1000 + " second!");
}
}
2.锁的相关概念介绍
1)可重入锁
如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。
举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。
看下面这段代码就明白了:
class MyClass {
public synchronized void method1() {
method2();
}
public synchronized void method2() {
}
}
上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,
假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。
而由于synchronized和Lock都具备可重入性,所以不会发生上述现象。
2)可中断锁
可中断锁:顾名思义,就是可以相应中断的锁。
在Java中,synchronized就不是可中断锁,而Lock是可中断锁。
如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,
我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。
lockInterruptibly()可体现了Lock的可中断性。
3)公平锁
公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。
非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。
在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。
而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。
在ReentrantLock中定义了2个静态内部类,一个是NotFairSync,一个是FairSync,分别用来实现非公平锁和公平锁。
我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性:
ReentrantLock lock = new ReentrantLock(true);
如果参数为true表示为公平锁,为fasle为非公平锁。默认情况下,如果使用无参构造器,则是非公平锁。
4)读写锁
读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。
正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。
ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。
可以通过readLock()获取读锁,通过writeLock()获取写锁。
3.ThreadLocal
用处:保存线程的独立变量。对一个线程类(继承自Thread)
当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
JDK 5 以后提供了泛型支持,ThreadLocal 被定义为支持泛型:
public class ThreadLocal<T> extends Object
T 为线程局部变量的类型。该类定义了 4 个方法:
1) protected T initialValue(): 返回此线程局部变量的当前线程的“初始值”。线程第一次使用 get() 方法访问变量时将调用此方法,但如果线程之前调用了 set(T) 方法,则不会对该线程再调用 initialValue 方法。通常,此方法对每个线程最多调用一次,但如果在调用 get() 后又调用了 remove(),则可能再次调用此方法。 该实现返回 null;如果程序员希望线程局部变量具有 null 以外的值,则必须为 ThreadLocal 创建子类,并重写此方法。通常将使用匿名内部类完成此操作。
2)public T get():返回此线程局部变量的当前线程副本中的值。如果变量没有用于当前线程的值,则先将其初始化为调用 initialValue() 方法返回的值。
3)public void set(T value):将此线程局部变量的当前线程副本中的值设置为指定值。大部分子类不需要重写此方法,它们只依靠 initialValue() 方法来设置线程局部变量的值。
4)public void remove():移除此线程局部变量当前线程的值。如果此线程局部变量随后被当前线程读取,且这期间当前线程没有设置其值,则将调用其 initialValue() 方法重新初始化其值。这将导致在当前线程多次调用 initialValue 方法。
下面是一个使用 ThreadLocal 的例子,每个线程产生自己独立的序列号。就是使用ThreadLocal存储每个线程独立的序列号复本,线程之间互不干扰。
package sync;
/**
* Created by fanqunsong on 2017/7/24.
*/
public class SequenceNumber {
// 定义匿名子类创建ThreadLocal的变量
private static ThreadLocal<Integer>seqNum = new ThreadLocal<Integer>(){
// 覆盖初始化方法
public Integer initialValue() {
return 0;
}
};
// 下一个序列号
public int getNextNum() {
seqNum.set(seqNum.get() + 1);
return seqNum.get();
}
private static class TestClient extends Thread {
private SequenceNumber sn;
public TestClient(SequenceNumber sn) {
this.sn = sn;
}
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println("thread[" + Thread.currentThread().getName() + "] sn[" + sn.getNextNum() + "]");
}
}
}
public static void main(String[] args) {
SequenceNumber sn = new SequenceNumber();
TestClient t1 = new TestClient(sn);
TestClient t2 = new TestClient(sn);
TestClient t3 = new TestClient(sn);
t1.start();
t2.start();
t3.start();
}
}
ThreadLocal 是如何实现为每个线程保存独立的变量的副本的呢?通过查看它的源代码,我们会发现,是通过把当前“线程对象”当作键,变量作为值存储在一个 Map 中。
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
4.线程池
1)初识线程池:
根据系统自身的环境情况,有效的限制执行线程的数量,使得运行效果达到最佳。线程主要是通过控制执行的线程的数量,超出数量的线程排队等候,等待有任务执行完毕,再从队列最前面取出任务执行。
2)线程池作用:
减少创建和销毁线程的次数,每个工作线程可以多次使用
可根据系统情况调整执行的线程数量,防止消耗过多内存
3)使用
ExecutorService:线程池接口
ExecutorService pool = Executors.常见线程
eg:ExecutorService pool = Executors.newSingleThreadExecutor();
4)常见线程池
①newSingleThreadExecutor
单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
②newFixedThreadExecutor(n)
固定数量的线程池,没提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行
③newCacheThreadExecutor(推荐使用)
可缓存线程池,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。
④newScheduleThreadExecutor
大小无限制的线程池,支持定时和周期性的执行线程
5)实例
public class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行中。。。");
}
}
public class TestFixedThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.shutdown();
}
}
运行结果
pool-1-thread-1执行中。。。
pool-1-thread-3执行中。。。
pool-1-thread-4执行中。。。
pool-1-thread-5执行中。。。
pool-1-thread-2执行中。。。
5.阻塞队列 BlockingQueue
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。
一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
BlockingQueue 的方法
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
四组不同的行为方式解释:
1)抛异常:如果试图的操作无法立即执行,抛一个异常。
2)特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
3)阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
4)超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
BlockingQueue 的实现
BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):
-
ArrayBlockingQueue
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
-
DelayQueue
-
LinkedBlockingQueue
-
PriorityBlockingQueue
-
SynchronousQueue
示例:
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue){
this.queue=queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
------------------------------------------------------------------------
public class Consumer implements Runnable {
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
----------------------------------------------------------------------------------
public class BlockingQueueExample {
public static void main(String[] args) throws Exception{
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
6.闭锁 CountDownLatch
1)类介绍
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,
所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。
一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行
2)使用场景
在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 这个时候就可以使用CountDownLatch。
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。
应用实例:
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException{
final CountDownLatch begin = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(10);
final ExecutorService exec = Executors.newFixedThreadPool(10);
for (int index = 0; index <10 ; index++) {
final int NO = index + 1;
Runnable run = new Runnable() {
@Override
public void run() {
try {
begin.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("NO."+NO+"arrived");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
};
exec.submit(run);
}
System.out.println("Game Start");
begin.countDown();
end.await();
System.out.println("Game Over");
exec.shutdown();
}
}
7.CyclicBarrier
CyclicBarrier 翻译过来叫循环栅栏。它主要的方法就是一个:await()。await() 方法没被调用一次,计数便会减少1,并阻塞住当前线程。
当计数减至0时,阻塞解除,所有在此 CyclicBarrier 上面阻塞的线程开始运行。在这之后,如果再次调用 await() 方法,计数就又会变成 N-1,新一轮重新开始,这便是 Cyclic 的含义所在。
CyclicBarrier 的使用并不难,但需要主要它所相关的异常。除了常见的异常,CyclicBarrier.await() 方法会抛出一个独有的 BrokenBarrierException。
这个异常发生在当某个线程在等待本 CyclicBarrier 时被中断或超时或被重置时,其它同样在这个 CyclicBarrier 上等待的线程便会受到 BrokenBarrierException。
意思就是说,同志们,别等了,有个小伙伴已经挂了,咱们如果继续等有可能会一直等下去,所有各回各家吧。
CyclicBarrier.await() 方法带有返回值,用来表示当前线程是第几个到达这个 Barrier 的线程。
和 CountDownLatch 一样,CyclicBarrier 同样可以可以在构造函数中设定总计数值。与 CountDownLatch 不同的是,CyclicBarrier 的构造函数还可以接受一个 Runnable,会在 CyclicBarrier 被释放时执行。
8.交换机Exchanger
9.Semaphore
我们以一个停车场运作为例来说明信号量的作用。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦。
以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。
在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。信号量是一个非负整数,表示了当前公共资源的可用数目
(在上面的例子中可以用空闲的停车位类比信号量),当一个线程要使用公共资源时(在上面的例子中可以用车辆类比线程),首先要查看信号量,
如果信号量的值大于1,则将其减1,然后去占有公共资源。如果信号量的值为0,则线程会将自己阻塞,直到有其它线程释放公共资源。
在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
实例
public class SemaphoreTest {
private Semaphore semaphore = new Semaphore(3);
private Random random = new Random();
class TaskDemo implements Runnable{
private String id;
TaskDemo(String id){
this.id=id;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Thread " + id + " is working");
Thread.sleep(random.nextInt(1000));
semaphore.release();
System.out.println("Thread "+id+" is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SemaphoreTest semaphoreTest = new SemaphoreTest();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(semaphoreTest.new TaskDemo("a"));
executorService.submit(semaphoreTest.new TaskDemo("b"));
executorService.submit(semaphoreTest.new TaskDemo("c"));
executorService.submit(semaphoreTest.new TaskDemo("d"));
executorService.submit(semaphoreTest.new TaskDemo("e"));
executorService.submit(semaphoreTest.new TaskDemo("f"));
executorService.shutdown();
}
}
运行结果
Thread a is working
Thread b is working
Thread c is working
Thread a is over
Thread d is working
Thread d is over
Thread e is working
Thread b is over
Thread f is working
Thread f is over
Thread e is over
Thread c is over