Bravo!Java学习笔记(3)---多线程

时间:2022-05-28 12:32:23

多线程解决的问题

不管是Linux、Java、还是Windows,多线程都是解决线程如何创建、使用、销毁,以及线程之间如何同步的问题。
1. 场景1
某个数据A被多个线程值改变,可能是增加,或者减少。比如A=A+15,再给A增加15前,如何确保A没有被再增加过、减少过?

    Load register , A
Add register , 15
Mov A, register

2. 场景2
线程A在处理数据d,线程B在等待A的处理结果,A处理完后立即休息,B拿到数据d立即开始。
3. 场景3
线程A、B、C都在抢夺代码片段c的控制权,谁先进入,谁先运行该代码,其他线程就等着。

1. Reentrantlock

Reentrantlock对象,是java里的锁对象。初次访问锁对象的线程获得锁,后面再访问的线程则阻塞,直到锁对象被前者释放,锁的方法是lock(),解锁的方式是unlock()

    myThread implements Runnable{
Reentrantlock myLock = new Reentrantlock();
public void run{
myLock.lock();
try{
....//具体需共享的代码or数据
}
finally{
myLock.unlock();
}
}
}

2. 测试锁Trylock()方法

如果后访问的线程不想一直阻塞,应采用锁测试的方法trylock(),它去拿锁,有则true返回,无则false返回;也可以使用超时参数,比如

    myLock.trylock(100.TimeUnit.MILLISECONDS)

本来增加超时就有防死锁的功能。还有一种情况,在阻塞期间,线程被中断,中断里也可能发生死锁。若使用带超时参数的trylock()若被中断,将抛出InterruptedException异常,程序可以处理该异常,防止死锁。

3. 读写锁

ReentrantReadWriteLock对于只读的线程,因为读不会破坏数据,它允许读线程共享访问数据,则写线程互斥访问,提高读写效率。

    public Lock rLock = new ReentrantReadWriteLock.readLock(); //获取读锁
public Lock wLock = new ReentrantReadWriteLock.writeLock();//获取写锁

条件变量Condition

条件变量用来线程间相互同步,需要配合1个锁使用。理解它,最关键的2点是:
1. 线程A先加锁,再阻塞,线程B中可唤醒A的阻塞;
2. 线程A阻塞的同时就释放了锁。不然其他线程进不来。
等待条件并阻塞的方法是await(),唤醒的方法是signalAll() or signal();
一般用,如下形式等待条件变量:
while( !(condition_is_ok)) cd.await();
Condition变量可以有多个。

形象说明下,A、B线程都执行下面的代码,A线程先拿到锁,B进来没锁了,B等着;A发现不满足条件,进入await()等着,同时释放锁; B于是拿到了锁,B满足条件,处理完事务,调用signalAll()通知了A,并把锁换给A,A也处理完事务,释放锁。

myThread implements Runnable{
Reentrantlock myLock = new Reentrantlock();
Condition cd = myLock.newCondition();
public void run{
myLock.lock();
try{
while( !(condition_is_ok))
cd.await(); //等待条件,不满足则等,同时释放锁,不然其他线程进不来
....//具体需共享的代码or数据
cd.signalAll(); //解除其他线程的阻塞状态,即通知阻塞线程,但不代表阻塞线程立即执行,看调度
}
finally
{
myLock.unlock();
}
}
}

同步关键字synchronized

1. 方法的synchronized

在某个method前用public sychronized function1(){...} 相当自动给该方法配了一把内部对象锁,自动lock,自动unlock,使用很方便、简洁。
内部对象锁只有1个相关条件,并且其相应的方法就是大名鼎鼎的wait()、notifyAll()和notify();可见这几个方法就是在声明了synchronized的时候用的,作用一样。这3个方法自Object对象就有了。

Condition对象 Synchronized声明
await() wait()
signalAll() notifyAll()
signal() notify()
    public synchronized function{
...
while(!(condition_is_ok))
wait();
..// doing sth
notifyAll();
...
}

2. 代码块的synchronized

还可以用如下形式:

    Object myLock =  new Object();
synchronized (lock){
...
public function {
while(!(condition_is_ok))
wait();
..// doing sth
notifyAll();
...
}
}

注意用synchronized 方法时,锁是自动加在定义该方法的对象上的;
而用synchronized 代码块时,锁是加在自定义的对象上,后者更加细分。

volatile关键字

有点像C语言的作用,限定为volatile的域告诉编译器、虚拟机,这个变量是需要并发访问的。

    public volatile boolean done = false;

final关键字

域声明为final也可以对共享域进行安全访问。

    final Map<String,Double> accounts = new HashMap<>();

一个线程调用后,其他线程一定是在构造函数完成后才能看到这个accounts变量,否则不能保证,比如accounts=NULL

线程局部变量ThreadLocal

ThreadLocal不是解决共享的问题。而是让线程各自拥有某个局部变量,它为各自线程提供各自的示例。

    public static final ThreadLocal<Something> s = new ThreadLocal<Something>(){
protected Something initialValue(){ //调用initialValue初始化对象
return new Something("good");
}
}
...
s.get().function(); //线程里调用ThreadLocal.get()方法可获得局部对象,再调用该对象的某function();

ThreadLocal.get()获得这个线程当前值,如果首次调用get,则调用initialValue得到该值。
ThreadLocal.set()为线程设置新值。
ThreadLocal.remove()删除对应这个线程的值

阻塞队列BlockingQueue

相比上面提到的锁、条件变量这些,阻塞队列是更高层的结构,更方便,安全。可以让1个线程安全地共享数据给另外的线程,
1个线程add()数据到队列,另外1个数据take()从队列里取数据,没有就乖乖等着。
当然不管是放数据的还是取数据的,队列的操作结果,取决于队列的空、满状况,看你想操作后是阻塞、返回、还是抛出异常。

方法 动作 特殊情况
add 添加新元素入列 如果满,抛出IIegalStateException
put 添加新元素入列 如果满,则阻塞
element 返回头元素 如果空,抛出NoSuchElementException
offer 添加个元素并返回true 如果满,返回false
peek 返回队列头元素 如果空,则返回NULL
poll 移出并返回头元素 如果空,则返回NULL
remove 移出并返回头元素 如果空,抛出NoSuchElementException
take 移出并返回头元素 如果空,则阻塞

BlockingQueue还有几个变种:
LinkedBlockingQueue容量无上限;
ArrayBlockingQueue可指定公平参数,比如让等待最长的线程优先处理;
PriorityBlockingQueue的元素会按照优先级顺序被移出;
DelayBlockingQueue里的元素只有在delay时间用完才能被移除;

线程安全的集合

普通集合如List、Set、Map等都不是线程安全的。集合类通过使用同步包装器(synchronization wrapper)编程是线程安全的,使用集合时,要用synchronized关键字:

    List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>);
synchronized(synchArrayList )
{
...
}

而在java.util.concurrent里,如今有了专门的安全集合:
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
ConcurrentLinkedQueue
CopyOnWriteArrayList
CopyOnWriteArraySet

Callable、Future、FutureTask

Runable的run()无参数无返回值,是异步执行的任务,Callable类似Runable,但有返回值,而且是接口。

    public interface Callable<V>
{

V call() throws Exception;
}

Future也是个接口,它可以作为线程池里任务完成的结果的返回,可以调用get()阻塞等待,也可以调用cancle()取消等待,也可以调用iscancle()、isdone()查询结果状态;

public interface Future<V>
V get() throws...;

V get(long timeout, TimeUnit unit) throws ...;
void cancel(boolean mayInterrupt);
boolean isCancelled();
boolean isDone();

FutureTask是个包装器,可以将Callable转成Future和Runable。
说白了用FutureTask包装1个Callable任务,用Future来等待任务结果,两者融到一起,很方便。

    Callable<Integer> m = new Callable<Integer>(){
Integer call(){...} //类似Runnable里的run(),但带参数返回
}
FutureTask<Integer> task = new FutureTask<Integer>(m);
new Thread(task).start(); //这是Runnable;
...
Integer result = task.get(); //这是Future

Executor执行器

频繁的构建、销毁线程是有代价的。线程池是一种线程管理对象,它掌管线程的调用、优先级和生命周期,使用它后,开发者可关注于线程干的事,而不是线程本身。它包含若干空闲线程,一旦把Runnable对象交给它,就会有1个线程调用其run(),退出后,线程也不会死亡,准备为下一次请求服务。
Executor就是用来构建线程池的,用了静态工厂的方法,有各种功能的池子:

方法 描述
newCachedThreadPool 有空闲则用,没有时创建新线程,空闲线程保留60秒
newFixedThreadPool 包含固定数目的线程,空闲线程一直保留,提交太多则入列
newSingleThreadExecutor 单个线程的”池”,池子顺序执行每个提交的任务
newScheduledThreadPool 预订执行而构建的固定线程池,替代java.util.timer
newSingleScheduledThreadPool 预订执行而构建的单线程”池”

返回实现了ExecutorSevice接口的ThreadPoolExecutor类的对象
用下面方法可将Runnable或Callable对象提交给ExecutorService:

    Future<?> submit(Runnable task);
Future<T> submit(Runnable task, T result);
Future<T> submit(Callable<T> task);

显然返回的Future可查询任务状态。
用完线程池用shutdown()关闭,它会等待所有任务完成。另外一个是shutdownNow(),会立即关闭正在运行的线程。
使用线程池的步骤:
1. new 一个合适的ThreadPool;
2. submit(...)提交Runnable或Callable任务对象;
3. 用返回的Future对象可查询任务状态或取消任务;
4. 不提交任务调用shutdown();
ScheduledExecutorService用schedule()方法提交定时任务:

    ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUint unit)
ScheduledFuture<?> schedule(Runnable task, long time, TimeUint unit)

任务组

想要执行给定的任务,用invokeAny()
想要执行一组任务而不是单个任务,可用invokeAll()方法:

    List<Callable<T>> tasks = ...;
List<Future<T>> results = exector.invokeAll(tasks);
for(Future<T> result:results)
processFurther(result.get());

它也有个缺点,如果第1个任务get()时间太长,则要一直会等待,可用ExecutorCompletionService来解决。它通过1个执行器构建成服务,可以管理执行器的执行结果。

    ExecutorCompletionService service = new ExecutorCompletionService(executor);
for(Callable<T> task:tasks) service.submit(task);
for(int i = 0 ; i < task.sizes(); i ++)
processFurther(service.take().get());

Fork-Join框架

这里有更详尽的介绍 http://ifeve.com/talk-concurrency-forkjoin/

同步器

1. 信号量semaphore

例如,10个线程同时竞争5个资源,则设置5的信号量,先来先得,后面走1个再进1个。
acquire()获得信号量,使得值要么-1,为0则阻塞等待;
release()释放信号量,使得值+1,其余等在0上的线程获得信号量继续运行;
availablePermits()得知当前还有多少信号灯可以使用;

2. CountDownLatch

例如,某件事A要等前边10件事干完才能做,初始化countdown值为10,A调用await()等待,其余10件事依次完成后调用countdown()减计数,计数值为0时,A醒来继续运行。

    public CountDownLatch(int count);
public void countDown();
public void await() throws InterruptedException

3. CyclicBarrier

例如,就像赛跑,所有事件都要同时发生,在这之前,每个选手都要await()等待,在Barrier上的所有事件全部await()后,才会一起继续执行。其实CountdownLatch也可以实现的。

    public int await()throws InterruptedException,BrokenBarrierException
public int await(int timeout,TimeUnit.MILLISECONDS)

3. Exchanger

只能用于2个线程之间,A往缓冲区填,B从缓冲区取,A等B取完了再放,B等A放了在取

    final Exchanger<List<Integer>> exchanger = new Exchanger<<List<Integer>>();
public List addList;
public List removeList;
public A implements Runnable{
if(removeList.size() != 0)
list = exchanger.exchange(removeList);
list .add(...);
list = exchanger.exchange(addList);
}
}
public B implements Runnable{
if(addList.size() == 0)
list = exchanger.exchange(addList);
list .removeFirst(...);
list = exchanger.exchange(removeList);
}
}

4. SynchronousQueue

Blockqueue是1个在put()时,另外1个阻塞的在take(),put的这一方不会阻塞;
SynchronousQueue是put()的一方也阻塞,take()的一方在没东西前也只阻塞,收到东西后,双方都释放阻塞。如果用生产者-消费者模型,它是单向的,而上面提到的Exchanger可看成是双向的。
SynchronousQueue和Blockqueue不同的是,它是一个没有数据缓冲的BlockingQueue,也就是size()得到永远是0。
总之其特点是:
1. 容量为0,无论何时 size方法总是返回0
2. put操作阻塞,直到另外一个线程取走队列的元素。
3. take操作阻塞,直到另外的线程put某个元素到队列中。
4. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素。