多线程解决的问题
不管是Linux、Java、还是Windows,多线程都是解决线程如何创建、使用、销毁,以及线程之间如何同步的问题。
1. 场景1
某个数据A被多个线程值改变,可能是增加,或者减少。比如A=A+15,再给A增加15前,如何确保A没有被再增加过、减少过?
Load register , A
Add register , 15
Mov A, register
2. 场景2
线程A在处理数据d,线程B在等待A的处理结果,A处理完后立即休息,B拿到数据d立即开始。
3. 场景3
线程A、B、C都在抢夺代码片段c的控制权,谁先进入,谁先运行该代码,其他线程就等着。
锁
1. Reentrantlock
Reentrantlock对象,是java里的锁对象。初次访问锁对象的线程获得锁,后面再访问的线程则阻塞,直到锁对象被前者释放,锁的方法是lock()
,解锁的方式是unlock()
。
myThread implements Runnable{
Reentrantlock myLock = new Reentrantlock();
public void run{
myLock.lock();
try{
....//具体需共享的代码or数据
}
finally{
myLock.unlock();
}
}
}
2. 测试锁Trylock()方法
如果后访问的线程不想一直阻塞,应采用锁测试的方法trylock()
,它去拿锁,有则true返回,无则false返回;也可以使用超时参数,比如
myLock.trylock(100.TimeUnit.MILLISECONDS)
本来增加超时就有防死锁的功能。还有一种情况,在阻塞期间,线程被中断,中断里也可能发生死锁。若使用带超时参数的trylock()
若被中断,将抛出InterruptedException
异常,程序可以处理该异常,防止死锁。
3. 读写锁
ReentrantReadWriteLock对于只读的线程,因为读不会破坏数据,它允许读线程共享访问数据,则写线程互斥访问,提高读写效率。
public Lock rLock = new ReentrantReadWriteLock.readLock(); //获取读锁
public Lock wLock = new ReentrantReadWriteLock.writeLock();//获取写锁
条件变量Condition
条件变量用来线程间相互同步,需要配合1个锁使用。理解它,最关键的2点是:
1. 线程A先加锁,再阻塞,线程B中可唤醒A的阻塞;
2. 线程A阻塞的同时就释放了锁。不然其他线程进不来。
等待条件并阻塞的方法是await(),唤醒的方法是signalAll() or signal();
一般用,如下形式等待条件变量: while( !(condition_is_ok)) cd.await();
Condition变量可以有多个。
形象说明下,A、B线程都执行下面的代码,A线程先拿到锁,B进来没锁了,B等着;A发现不满足条件,进入await()等着,同时释放锁; B于是拿到了锁,B满足条件,处理完事务,调用signalAll()通知了A,并把锁换给A,A也处理完事务,释放锁。
myThread implements Runnable{
Reentrantlock myLock = new Reentrantlock();
Condition cd = myLock.newCondition();
public void run{
myLock.lock();
try{
while( !(condition_is_ok))
cd.await(); //等待条件,不满足则等,同时释放锁,不然其他线程进不来
....//具体需共享的代码or数据
cd.signalAll(); //解除其他线程的阻塞状态,即通知阻塞线程,但不代表阻塞线程立即执行,看调度
}
finally
{
myLock.unlock();
}
}
}
同步关键字synchronized
1. 方法的synchronized
在某个method前用public sychronized function1(){...}
相当自动给该方法配了一把内部对象锁,自动lock,自动unlock,使用很方便、简洁。
内部对象锁只有1个相关条件,并且其相应的方法就是大名鼎鼎的wait()、notifyAll()和notify();可见这几个方法就是在声明了synchronized的时候用的,作用一样。这3个方法自Object对象就有了。
Condition对象 | Synchronized声明 |
---|---|
await() | wait() |
signalAll() | notifyAll() |
signal() | notify() |
public synchronized function{
...
while(!(condition_is_ok))
wait();
..// doing sth
notifyAll();
...
}
2. 代码块的synchronized
还可以用如下形式:
Object myLock = new Object();
synchronized (lock){
...
public function {
while(!(condition_is_ok))
wait();
..// doing sth
notifyAll();
...
}
}
注意用synchronized 方法时,锁是自动加在定义该方法的对象上的;
而用synchronized 代码块时,锁是加在自定义的对象上,后者更加细分。
volatile关键字
有点像C语言的作用,限定为volatile的域告诉编译器、虚拟机,这个变量是需要并发访问的。
public volatile boolean done = false;
final关键字
域声明为final也可以对共享域进行安全访问。
final Map<String,Double> accounts = new HashMap<>();
一个线程调用后,其他线程一定是在构造函数完成后才能看到这个accounts变量,否则不能保证,比如accounts=NULL
。
线程局部变量ThreadLocal
ThreadLocal不是解决共享的问题。而是让线程各自拥有某个局部变量,它为各自线程提供各自的示例。
public static final ThreadLocal<Something> s = new ThreadLocal<Something>(){
protected Something initialValue(){ //调用initialValue初始化对象
return new Something("good");
}
}
...
s.get().function(); //线程里调用ThreadLocal.get()方法可获得局部对象,再调用该对象的某function();
ThreadLocal.get()
获得这个线程当前值,如果首次调用get,则调用initialValue
得到该值。 ThreadLocal.set()
为线程设置新值。 ThreadLocal.remove()
删除对应这个线程的值
阻塞队列BlockingQueue
相比上面提到的锁、条件变量这些,阻塞队列是更高层的结构,更方便,安全。可以让1个线程安全地共享数据给另外的线程,
1个线程add()
数据到队列,另外1个数据take()
从队列里取数据,没有就乖乖等着。
当然不管是放数据的还是取数据的,队列的操作结果,取决于队列的空、满状况,看你想操作后是阻塞、返回、还是抛出异常。
方法 | 动作 | 特殊情况 |
---|---|---|
add | 添加新元素入列 | 如果满,抛出IIegalStateException |
put | 添加新元素入列 | 如果满,则阻塞 |
element | 返回头元素 | 如果空,抛出NoSuchElementException |
offer | 添加个元素并返回true | 如果满,返回false |
peek | 返回队列头元素 | 如果空,则返回NULL |
poll | 移出并返回头元素 | 如果空,则返回NULL |
remove | 移出并返回头元素 | 如果空,抛出NoSuchElementException |
take | 移出并返回头元素 | 如果空,则阻塞 |
BlockingQueue还有几个变种: LinkedBlockingQueue
容量无上限; ArrayBlockingQueue
可指定公平参数,比如让等待最长的线程优先处理; PriorityBlockingQueue
的元素会按照优先级顺序被移出; DelayBlockingQueue
里的元素只有在delay时间用完才能被移除;
线程安全的集合
普通集合如List、Set、Map等都不是线程安全的。集合类通过使用同步包装器(synchronization wrapper)编程是线程安全的,使用集合时,要用synchronized关键字:
List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>);
synchronized(synchArrayList )
{
...
}
而在java.util.concurrent里,如今有了专门的安全集合: ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
ConcurrentLinkedQueue
CopyOnWriteArrayList
CopyOnWriteArraySet
Callable、Future、FutureTask
Runable的run()无参数无返回值,是异步执行的任务,Callable类似Runable,但有返回值,而且是接口。
public interface Callable<V>
{
V call() throws Exception;
}
Future也是个接口,它可以作为线程池里任务完成的结果的返回,可以调用get()
阻塞等待,也可以调用cancle()
取消等待,也可以调用iscancle()、isdone()
查询结果状态;
public interface Future<V>
V get() throws...;
V get(long timeout, TimeUnit unit) throws ...;
void cancel(boolean mayInterrupt);
boolean isCancelled();
boolean isDone();
FutureTask是个包装器,可以将Callable转成Future和Runable。
说白了用FutureTask包装1个Callable任务,用Future来等待任务结果,两者融到一起,很方便。
Callable<Integer> m = new Callable<Integer>(){
Integer call(){...} //类似Runnable里的run(),但带参数返回
}
FutureTask<Integer> task = new FutureTask<Integer>(m);
new Thread(task).start(); //这是Runnable;
...
Integer result = task.get(); //这是Future
Executor执行器
频繁的构建、销毁线程是有代价的。线程池是一种线程管理对象,它掌管线程的调用、优先级和生命周期,使用它后,开发者可关注于线程干的事,而不是线程本身。它包含若干空闲线程,一旦把Runnable对象交给它,就会有1个线程调用其run(),退出后,线程也不会死亡,准备为下一次请求服务。
Executor就是用来构建线程池的,用了静态工厂的方法,有各种功能的池子:
方法 | 描述 |
---|---|
newCachedThreadPool | 有空闲则用,没有时创建新线程,空闲线程保留60秒 |
newFixedThreadPool | 包含固定数目的线程,空闲线程一直保留,提交太多则入列 |
newSingleThreadExecutor | 单个线程的”池”,池子顺序执行每个提交的任务 |
newScheduledThreadPool | 预订执行而构建的固定线程池,替代java.util.timer |
newSingleScheduledThreadPool | 预订执行而构建的单线程”池” |
返回实现了ExecutorSevice接口的ThreadPoolExecutor类的对象
用下面方法可将Runnable或Callable对象提交给ExecutorService:
Future<?> submit(Runnable task);
Future<T> submit(Runnable task, T result);
Future<T> submit(Callable<T> task);
显然返回的Future可查询任务状态。
用完线程池用shutdown()
关闭,它会等待所有任务完成。另外一个是shutdownNow()
,会立即关闭正在运行的线程。
使用线程池的步骤:
1. new 一个合适的ThreadPool;
2. submit(...)
提交Runnable或Callable任务对象;
3. 用返回的Future对象可查询任务状态或取消任务;
4. 不提交任务调用shutdown()
;
ScheduledExecutorService用schedule()
方法提交定时任务:
ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUint unit)
ScheduledFuture<?> schedule(Runnable task, long time, TimeUint unit)
任务组
想要执行给定的任务,用invokeAny()
;
想要执行一组任务而不是单个任务,可用invokeAll()
方法:
List<Callable<T>> tasks = ...;
List<Future<T>> results = exector.invokeAll(tasks);
for(Future<T> result:results)
processFurther(result.get());
它也有个缺点,如果第1个任务get()
时间太长,则要一直会等待,可用ExecutorCompletionService
来解决。它通过1个执行器构建成服务,可以管理执行器的执行结果。
ExecutorCompletionService service = new ExecutorCompletionService(executor);
for(Callable<T> task:tasks) service.submit(task);
for(int i = 0 ; i < task.sizes(); i ++)
processFurther(service.take().get());
Fork-Join框架
这里有更详尽的介绍 http://ifeve.com/talk-concurrency-forkjoin/
同步器
1. 信号量semaphore
例如,10个线程同时竞争5个资源,则设置5的信号量,先来先得,后面走1个再进1个。 acquire()
获得信号量,使得值要么-1,为0则阻塞等待; release()
释放信号量,使得值+1,其余等在0上的线程获得信号量继续运行; availablePermits()
得知当前还有多少信号灯可以使用;
2. CountDownLatch
例如,某件事A要等前边10件事干完才能做,初始化countdown值为10,A调用await()
等待,其余10件事依次完成后调用countdown()
减计数,计数值为0时,A醒来继续运行。
public CountDownLatch(int count);
public void countDown();
public void await() throws InterruptedException
3. CyclicBarrier
例如,就像赛跑,所有事件都要同时发生,在这之前,每个选手都要await()
等待,在Barrier上的所有事件全部await()
后,才会一起继续执行。其实CountdownLatch也可以实现的。
public int await()throws InterruptedException,BrokenBarrierException
public int await(int timeout,TimeUnit.MILLISECONDS)
3. Exchanger
只能用于2个线程之间,A往缓冲区填,B从缓冲区取,A等B取完了再放,B等A放了在取
final Exchanger<List<Integer>> exchanger = new Exchanger<<List<Integer>>();
public List addList;
public List removeList;
public A implements Runnable{
if(removeList.size() != 0)
list = exchanger.exchange(removeList);
list .add(...);
list = exchanger.exchange(addList);
}
}
public B implements Runnable{
if(addList.size() == 0)
list = exchanger.exchange(addList);
list .removeFirst(...);
list = exchanger.exchange(removeList);
}
}
4. SynchronousQueue
Blockqueue是1个在put()
时,另外1个阻塞的在take()
,put的这一方不会阻塞;
SynchronousQueue是put()
的一方也阻塞,take()
的一方在没东西前也只阻塞,收到东西后,双方都释放阻塞。如果用生产者-消费者模型,它是单向的,而上面提到的Exchanger可看成是双向的。
SynchronousQueue和Blockqueue不同的是,它是一个没有数据缓冲的BlockingQueue,也就是size()
得到永远是0。
总之其特点是:
1. 容量为0,无论何时 size方法总是返回0
2. put操作阻塞,直到另外一个线程取走队列的元素。
3. take操作阻塞,直到另外的线程put某个元素到队列中。
4. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素。