1.什么是线程?
线程有时称为 轻量级进程,每个线程都有自己的程序计数器,称为堆栈和本地变量。然而,线程存在于进程中,它们与同一进程内的其他线程共享内存、文件句柄以及每进程状态。这使线程更易于与其他线程共享信息,但也意味着您必须确保线程之间不相互干涉。2.线程安全的措施。
在 JDK 5.0 之前,确保线程安全的主要机制是 synchronized 原语。访问共享变量(那些可以由多个线程访问的变量)的线程必须使用同步来协调对共享变量的读写访问。java.util.concurrent 包提供了一些备用并发原语,以及一组不需要任何其他同步的线程安全实用程序类。JDK 1.2 中引入的 Collection 框架是一种表示对象集合的高度灵活的框架,其中一些集合已经是线程安全的(Hashtable 【SourceReader】和 Vector),通过同步的封装工厂(Collections.synchronizedMap()、synchronizedList() 和 synchronizedSet())。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Hashtable;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class SourceReader {
@SuppressWarnings({ "unused", "rawtypes", "unchecked" })
public static void main(String[] args) {
//自行阅读源码
//重型集合
Hashtable table = new Hashtable();
//弱一致迭代
List list = new ArrayList();
list.add(null); //modCount
list.iterator().next();
//并发Hash
ConcurrentHashMap map = new ConcurrentHashMap();
map.put("", ""); //Segment 并发桶,继承ReentrantLock
map.putIfAbsent("", ""); //原子性操作
//队列
PriorityQueue pQ = new PriorityQueue (0, new Comparator(){
public int compare(Object o1, Object o2) {
return 0;
}});
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
//线程池
Executor executor1 = Executors.newCachedThreadPool();
ThreadPoolExecutor executor2 =
(ThreadPoolExecutor)Executors.newFixedThreadPool(10);
executor2.setRejectedExecutionHandler(null); //策略
}
}
3.concurrent包带来的变化
a.弱一致迭代:CopyOnWriteArrayList 和 CopyOnWriteArraySet【CopyOnWriteDemo】
java.util 包中的集合类都返回 fail-fast 迭代器,检测到在迭代过程中进行了更改操作,那么它会抛出 ConcurrentModificationException【SourceReader】。CopyOnWriteArrayList 和 CopyOnWriteArraySet通过每次添加或删除元素时创建支持数组的新副本,避免了这个问题,但是进行中的迭代保持对创建迭代器时的当前副本进行操作。虽然复制也会有一些成本,但是在许多情况下,迭代要比修改多得多,在这些情况下,写入时复制要比其他备用方法具有更好的性能和并发性。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteDemo {
public static void main(String[] args) {
//通常情况下,迭代是强一致的
fastFail();
//concurrent引入的弱一致迭代
weakItr();
}
private static void weakItr() {
List<String> list = new CopyOnWriteArrayList<String>();
list.add("a");
list.add("b");
list.add("c");
Iterator<String> weakItr = list.iterator();
while(weakItr.hasNext()){
System.out.println(weakItr.next());
list.add("d");
}
System.out.println("弱一致迭代不会抛错,但不会打印新增的数据");
}
private static void fastFail() {
List<String> list = new ArrayList<String>();
list.add("a");
list.add("b");
list.add("c");
try{
Iterator<String> weakItr = list.iterator();
while(weakItr.hasNext()){
System.out.println(weakItr.next());
list.add("d");
}
}catch(ConcurrentModificationException e){
System.err.println("【ERROR】迭代引数据更新异常");
}
}
}
b.支持并发的Hash:ConcurrentHashMap
Hashtable 和 Collections.synchronizedMap 通过同步每个方法获得线程安全。这意味着当一个线程执行一个 Map 方法时,无论其他线程要对 Map 进行什么样操作,都不能执行,直到第一个线程结束才可以。对比来说,ConcurrentHashMap 允许多个读取几乎总是并发执行,读和写操作“通常”并发执行,多个同时写入“经常”并发执行。结果是当多个线程需要访问同一 Map 时,可以获得更高的并发性。特别的:①原子性的其他操作putIfAbsent②返回弱一致迭代
c.Queue 【SourceReader】
接口比 List 简单得多,仅包含 put() 和 take() 方法,并允许比 LinkedList 更有效的实现。实现 Queue的类是:• LinkedList 已经进行了改进来实现 Queue。(原)
• PriorityQueue 非线程安全的优先级对列(堆)实现,根据自然顺序或比较器返回元素。(原)
• ConcurrentLinkedQueue 快速、线程安全的、无阻塞 FIFO 队列。
d.Executor 框架 【SourceReader】
Executor,提供executor方法。ExecutorService接口,定义了更多的生命周期管理方法。
• Executors.newCachedThreadPool() 创建不限制大小的线程池,但是当以前创建的线程可以使用时将重新使用那些线程。如果没有现有线程可用,将创建新的线程并将其添加到池中。使用不到 60 秒的线程将终止并从缓存中删除。
• Executors.newFixedThreadPool(int n) 创建线程池,其重新使用在不受限制的队列之外运行的固定线程组。在关闭前,所有线程都会因为执行。过程中的失败而终止,如果需要执行后续任务,将会有新的线程来代替这些线程。
• 定制 ThreadPoolExecutor
• 队列已满使用的策略;抛出异常(默认情况),放弃任务,在调用者的线程中执行任务,或放弃队列中最早的任务以为新任务腾出空间
e.调整线程池大小
用 WT 表示每项任务的平均等待时间,ST 表示每项任务的平均服务时间(计算时间)。则 WT/ST 是每项任务等待所用时间的百分比。对于 N 处理器系统,池中可以近似有 N*(1+WT/ST) 个线程。f.Future 接口【FutureDemo】
Future 接口允许表示已经完成的任务、正在执行过程中的任务或者尚未开始执行的任务。通过 Future 接口,可以尝试取消尚未完成的任务,查询任务已经完成还是取消了,以及提取(或等待)任务的结果值。package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import org.junit.Test;
public class FutureDemo {
static ThreadPoolExecutor executor =
(ThreadPoolExecutor)Executors.newFixedThreadPool(2);
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void wrongUseDemo() throws InterruptedException, ExecutionException {
FutureTask task = new FutureTask(new Callable(){
@Override
public Object call() {
try {
//任务执行2秒等待
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "0";
}
});
Future f = executor.submit(task);
System.out.println("陷阱,空返回:"+f.get());
executor.shutdown();
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void FutureTaskDemo() throws InterruptedException,
ExecutionException {
List<FutureTask> taskList = new ArrayList<FutureTask>();
for(int i=0;i<5;i++){
FutureTask task = new FutureTask(new Callable(){
@Override
public Object call() {
try {
//任务执行2秒等待
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "0";
}
});
taskList.add(task);
//提交任务,任务池的容量为2,一次最多2个任务执行
executor.submit(taskList.get(i));
}
//取消
System.out.println("取消结果:"+taskList.get(2).cancel(false));
System.out.println("取消状态:"+taskList.get(2).isCancelled());
//成功执行
System.out.println("成功执行返回结果"+taskList.get(1).get());
System.out.println("是否成功执行:"+taskList.get(1).isDone());
executor.shutdown();
}
}
g.其他有趣的同步工具类
• CompletionService【CompletionServiceDemo】这是一种类似于生产者/消费者的结构体,已完成的任务会进入完成队列(按照完成顺序),通过take/poll取出结果。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CompletionServiceDemo {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws InterruptedException, ExecutionException {
//这是一种类似于生产者/消费者的结构体
//已完成的任务会进入完成队列,通过take/poll取出结果
ExecutorService es = Executors.newFixedThreadPool(5);
CompletionService cs = new ExecutorCompletionService(es);
for(int i=0;i<5;i++){
final int result = i;
Callable task = new Callable(){
@Override
public Object call() {
try {
//任务执行2秒等待
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
};
cs.submit(task);
}
Future pf = cs.poll();
System.out.println("poll:直接取回结果,如果任务队列里都没有完成的,则返回null:"
+(pf==null?null:pf.get()));
System.out.println("take:等待返回结果:"+cs.take().get());
es.shutdown();
}
}
• Semaphore【SemaphoreDemo】
以计数的方式控制访问权,很有趣的同步控制类。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) throws InterruptedException {
Semaphore s = new Semaphore(5);
System.out.println("你只有5个可访问权,我要6个");
//神奇的设计在于此
//虽然构造器里只定了5个,但是只要释放1个,访问权就能突破限制变成6了!
//所以使用的时候要小心
// TODO 释放这段代码看看
//s.release();
s.acquire(6);
System.out.println("一共只有5个可访问权,却要6个,所以永远也到不了这里");
}
}
• CyclicBarrier【CyclicBarrierDemo】和CountdownLatch【CountdownLatchDemo】
CyclicBarrier初始化时需要提供一个计数参数,当前线程通过该对象await方法,将进入阻塞状态并等待其他线程调用await,直到所有线程(指定计数参数数量)都执行await,然后在该点允许所有线程继续执行。
CountdownLatch与CyclicBarrier类似初始化是需提供计数参数,区别在于,通过该对象的countDown方法对计数做递减1,当计数为0时,调用了await而等待的线程将全部继续执行。计数不能重用。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
final CyclicBarrier cb = new CyclicBarrier (5);
while(true){ //CyclicBarrier可重复使用
for(int i=1;i<6;i++){
final int index = i;
Thread t = new Thread(){
public void run() {
try {
System.out.println("线程"+index+"开启");
Thread.sleep(index*1000);
System.out.println("线程"+index+"完成,进入等待状态");
cb.await();
//cb.await(index, TimeUnit.SECONDS);
//限定等待时间,如果超时等待线程将会抛出TimeoutException
//其他线程将会抛出BrokenBarrierException
System.out.println("全部线程到达");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
};
t.start();
}
Thread.sleep(6000);
}
}
}
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
final CyclicBarrier cb = new CyclicBarrier (5);
while(true){ //CyclicBarrier可重复使用
for(int i=1;i<6;i++){
final int index = i;
Thread t = new Thread(){
public void run() {
try {
System.out.println("线程"+index+"开启");
Thread.sleep(index*1000);
System.out.println("线程"+index+"完成,进入等待状态");
cb.await();
//cb.await(index, TimeUnit.SECONDS);
//限定等待时间,如果超时等待线程将会抛出TimeoutException
//其他线程将会抛出BrokenBarrierException
System.out.println("全部线程到达");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
};
t.start();
}
Thread.sleep(6000);
}
}
}
• Exchanger【ExchangerDemo】
Exchanger 类似于一个计数值为2的CyclicBarrier,区别在于,调用exchange方法进入线程阻塞,同时该方法需要一个待交换的数据作为参数,当两个线程都调用exchange时,待交换的参数将互相交换。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) throws InterruptedException {
final Exchanger<String> ex = new Exchanger<String>();
for(int i=1;i<3;i++){
final int index = i;
Thread t = new Thread(){
private String object = index+"";
public void run() {
try {
System.out.println("线程"+index+"开启进入等待交换数据状态当前线程"
+index+"的数据是"+object);
Thread.sleep(index*1000);
object = ex.exchange(object);
System.out.println("交换完成,当前线程"+index+"的数据是"+object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
}
}
h.Lock接口,ReentrantLock 更易控制的synchronized【ReentrantLockDemo】
ReentrantLock提供更细粒度和更加可控的同步功能,允许多个锁定实现,同时提供一些内置锁定缺少的功能,如计时的等待、可中断的等待、锁定轮询、每个锁定有多个条件等待集合以及无阻塞结构的锁定。但是,如果不需要以上特殊提供的功能,建议使用java自带的同步原语synchronized。
• synchronized可以在jvm级别做优化,jdk版本越高,优化性能越好。
• 使用ReentrantLock,很容易忘记释放锁,这会引起很多死锁问题。
ReentrantLock 实现的锁定规则非常简单 -- 每当一个线程具有锁定时,其他线程必须等待,直到该锁定可用。有时,当对数据结构的读取通常多于修改时,可以使用更复杂的称为读写锁定的锁定结构,它允许有多个并发读者,同时还允许一个写入者独占锁定。该方法在一般情况下(只读)提供了更大的并发性,同时在必要时仍提供独占访问的安全性。ReadWriteLock 接口和 ReentrantReadWriteLock 类提供这种功能 -- 多读者、单写入者锁定规则,可以用这种功能来保护共享的易变资源。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
try{
//阻塞获得锁
lock.lock();
//若锁被其他线程获得,则立即返回false
//lock.tryLock();
//其他方法balabala
//条件原语的支持
Condition condition = lock.newCondition();
condition.wait();
}finally{
//切记在finally中释放锁!!!
lock.unlock();
}
}
}
i.Condition 更易控制的wait/notify【ReentrantLockDemo】
a.将条件原语时,首先要知道Java为对象的各自创建了两个池,锁池和等待池。• 锁池,若线程A已经拥有了某个对象的锁,那其他欲占用该对象锁的线程将进入锁池,等待线程A释放锁。
• 等待池,线程A已经拥有了某个对象的锁,在同步原语的范围内因某些条件没满足而不能继续执行,他可以先放弃持有的对象锁,并进入该对象的等待池,当别的线程执行了该对象的notify()/notifyAll()方法,将激活该对象等待池中的线程。notify()将随机激活一个线程,notifyAll()将激活全部线程,没有抢到锁的进入锁池。
b.这里需注意,sleep()和yeild()方法并不会释放锁,而只是让线程进入等待或挂起状态。
c.配合Lock接口的newCondition()方法,Condition是对wait()/notify()/notifyAll()的具体实现,他提供了await()/signal()/signalAll()方法以实现在Lock范围内的条件控制。
j.Atomic 原子类型对象【AtomicDemo】
原子类型这些类公开对 JVM 的低级别改进,允许进行具有高度可伸缩性的原子读-修改-写操作。大多数现代CPU都有原子读-修改-写的原语,比如比较并交换(CAS)或加载链接/条件存储(LL/SC)。原子变量类使用硬件提供的最快的并发结构来实现。
package com.qfang.learn.z_otherYouDecide.concurrent;
import java.util.concurrent.atomic.AtomicLong;
public class AtomicDemo {
public static void main(String[] args) {
//提供原子级别的操作,不需要同步语句,带来更高的性能和伸缩性
AtomicLong l = new AtomicLong();
//增加数
long addL = l.addAndGet(1l);
//递增后
long inr = l.getAndIncrement();
//对比并赋值
boolean isSet = l.compareAndSet(1l, 1l);
//其他balabala方法,自行参看源码介绍
}
}