目录
· 线程安全
· 互斥同步
· 非阻塞同步
· 无同步
· 线程间通信
· 哲学家进餐问题
· 读者-写者问题
· 线程内共享
· 定时器
· JDK5新功能
· 线程池
· 并发集合
· 系统峰值评估
· 峰值参数
· 评估方法
线程安全
1. 线程安全:当多个线程访问某一个类(对象或方法)时,这个类(对象或方法)始终能表现出正确的行为,那么这个类(对象或方法)就是线程安全的。
2. 线程不安全的示例。期望结果是100000,实际却不是,并且每次执行结果可能不同。
import java.util.ArrayList;
import java.util.List; public class Test { public static void main(String[] args) throws Exception {
MyNumber myNumber = new MyNumber();
List<MyThread> myThreads = new ArrayList<>();
for (int index = 0; index < 100; index++) {
MyThread myThread = new MyThread(myNumber);
myThreads.add(myThread);
myThread.start();
}
for (MyThread myThread : myThreads) {
myThread.join();
}
System.out.println(myNumber.value());
} } class MyNumber { private int n = 0; public int value() {
return n;
} public void increate() {
n = n + 1;
} } class MyThread extends Thread { private MyNumber myNumber; public MyThread(MyNumber myNumber) {
this.myNumber = myNumber;
} @Override
public void run() {
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} for (int index = 0; index < 1000; index++) {
myNumber.increate();
}
} }
线程安全的实现方法
互斥同步
1. 最基本互斥同步手段是synchronized关键字。
2. 例如,上一个线程不安全示例的解决方法如下,两种方法等价,都是对当前对象加锁。
public synchronized void increate() {
n = n + 1;
}
public void increate() {
synchronized (this) {
n = n + 1;
}
}
3. synchronized关键字是一个重量级操作,因为Java线程是映射到操作系统原生线程上的,如果要阻塞或唤醒线程,都需要操作系统完成,那么线程需要从用户态转换到核心态,状态转换会消耗很多的CPU时间。
4. 额外注意,“static synchronized”方法与“synchronized (MyClass.class) {…}”等价,都是对类的字节对象加锁。
非阻塞同步
1. 互斥同步属于悲观并发策略,非阻塞同步属于乐观并发策略。
2. 即先行操作,如果没有其他线程争用共享数据,那么算操作成功;如果共享数据有争用,产生冲突,那么再采取其他补偿措施(最常见的就是不断重试,直到成功为止)。由于无须挂起线程,所以是非阻塞的。
3. java.util.concurrent.atomic.*包下的原子类就是这个原理,核心操作是CAS(Compare and set,利用x86 CPU的CMPXCHG原子指令实现),具体由sun.misc.Unsafe实现。
4. java.util.concurrent.atomic.*包(JDK5以后)下的原子类分4组(常用已高亮):
a) AtomicBoolean、AtomicInteger、AtomicLong,线程安全的基本类型的原子性操作。
b) AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray,线程安全的数组类型的原子性操作,操作的不是整个数组,而是数组中的单个元素。
c) AtomicLongFieldUpdater、AtomicIntegerFieldUpdater、AtomicReferenceFieldUpdater,基于反射原理对象中的基本类型(长整型、整型和引用类型)进行线程安全的操作。
d) AtomicReference、AtomicMarkableReference、AtomicStampedReference,线程安全的引用类型及防止ABA问题的引用类型的原子操作。
5. 线程不安全示例的不阻塞解决方法如下。
class MyNumber { private AtomicInteger n = new AtomicInteger(0); public int value() {
return n.get();
} public void increate() {
n.addAndGet(1);
} }
5. 参考资料:
a) http://www.cnblogs.com/nullzx/p/4967931.html
b) http://zl198751.iteye.com/blog/1848575
c) http://www.cnblogs.com/Mainz/p/3546347.html
无同步
如果本来就不涉及共享数据,那么就无须任何同步措施,代码天生就是线程安全的。
volatile关键字
1. volatile关键字的主要作用是使变量在多个线程间可见。
2. 每个线程都会有一块工作内存区,其中存放着所有线程共享的主内存中的变量值的拷贝。当线程执行时,它在自己的工作内存区中操作这些变量。为了存取共享变量,线程通常先获取锁定并去清除它的工作内存区,把这些共享变量从所有线程的工作内存区中正确的装入其中,当线程解锁时保证该工作内存区中的值写回到共享内存中。volatile的作用就是强制线程到主内存(共享内存)去读取变量,而不去线程工作内存区读取。
public class Test { public static void main(String[] args) {
MyThread myThread = new MyThread();
myThread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
myThread.run = false;
System.out.println(myThread.run);
} } class MyThread extends Thread { public volatile boolean run = true; @Override
public void run() {
while (run) {
// running
}
System.out.println(Thread.currentThread().getName() + " stoped.");
} }
3. volatile虽然保证了多个线程间的可见性,但不具备同步性。只能算轻量级的synchronized,性能要比synchronized强很多,不会造成阻塞。上一个线程不安全示例用如下方式不能保证结果正确。
class MyNumber { private volatile int n = 0;// 错误代码 public int value() {
return n;
} public void increate() {
n = n + 1;
} }
线程间通信
Object.wait()方法
1. 使执行instance.wait()方法的当前线程暂停,直至调用该对象instance.notify()或instance.notifyAll()方法唤醒该线程。
2. 当前线程必须先对该对象instance加锁(即synchronized)才可调用instance.wait()方法,否则抛出异常IllegalMonitorStateException。
3. 可能存在中断和虚唤醒,所以一般在循环中执行instance.wait()方法。
4. 综上三点所述,套路如下。
synchronized (instance) {
while (判断是否可占用资源) {
instance.wait();
}
}
5. 注意Object.wait()方法与Thread.sleep()方法的区别:wait()方法暂停时会释放synchronized的资源,而sleep()方法不会。
Object.notify()方法
1. 唤醒执行了instance.wait()方法的线程。如果存在多个这样的线程,则任意唤醒一个。
2. 当前线程必须先对该对象instance加锁(即synchronized)才可调用instance.notify()方法,否则抛出异常IllegalMonitorStateException。
3. 综上两点所述,套路如下。
synchronized (instance) {
instance.notify();
}
4. Object.notifyAll()方法与Object.notify()方法类似,区别是如果存在多个等待的线程,则全部唤醒。
编写线程间通信代码的套路
由于执行instance.wait()和instance.notify()方法都必须先synchronized (instance),所以这两个方法应写在一个类中。
public class ExclusiveResource { // 互斥资源
public synchronized void holdAndUse() {
while (<check resource>) { // 检查是否可占用资源,否则等待
wait();
}
// 执行占用资源后的代码
notify();
}
}
面试题:子线程、主线程交替循环
子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程有循环100次,如此循环50次。
public class Test { public static void main(String[] args) {
System.out.println("--------"); Looper looper = new Looper();
SubThread subThread = new SubThread(looper);
subThread.start();
looper.main();
} } class SubThread extends Thread { private Looper looper; public SubThread(Looper looper) {
this.looper = looper;
} @Override
public void run() {
looper.sub();
} } class Looper { private boolean runSub = true; public void sub() {
for (int i = 0; i < 50; i++) {
synchronized (this) {
while (!runSub) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int j = 0; j < 10; j++) {
System.out.println("子线程第" + i + "次循环" + j + "。");
}
runSub = false;
notify();
}
}
} public void main() {
for (int i = 0; i < 50; i++) {
synchronized (this) {
while (runSub) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int j = 0; j < 100; j++) {
System.out.println("主线程第" + i + "次循环" + j + "。");
}
runSub = true;
notify();
}
}
} }
生产者-消费者问题
一组生产者进程和一组消费者进程共享一个初始为空、大小为 n 的缓冲区,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中取出消息,否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) {
Container container = new Container();
new Producer(container).start();
new Producer(container).start();
new Producer(container).start();
new Consumer(container).start();
new Consumer(container).start();
} } class Product { private static final AtomicInteger COUNT = new AtomicInteger(0); private int id; public Product() {
id = COUNT.getAndIncrement();
} @Override
public String toString() {
return "Product-" + id;
} } class Container { private final static int CAPACITY = 10; private Product[] products = new Product[CAPACITY]; private int size; public synchronized void push(Product product) {
while (size >= CAPACITY) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products[size] = product;
size++;
notify();
} public synchronized Product pop() {
while (size <= 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
size--;
Product product = products[size];
notify();
return product;
} } class Producer extends Thread { private static int count; private Container container; public Producer(Container container) {
super("Producer-" + count++);
this.container = container;
} @Override
public void run() {
while (true) {
Product product = new Product();
System.out.println(getName() + " produced " + product);
container.push(product); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} } class Consumer extends Thread { private static int count; private Container container; public Consumer(Container container) {
super("Consumer-" + count++);
this.container = container;
} @Override
public void run() {
while (true) {
Product product = container.pop();
System.out.println(getName() + " consumed " + product); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
哲学家进餐问题
一张圆桌上坐着 5 名哲学家,桌子上每两个哲学家之间摆了一根筷子,桌子的中间是一碗米饭。哲学家们倾注毕生精力用于思考和进餐,哲学家在思考时,并不影响他人。只有当哲学家饥饿的时候,才试图拿起左、右两根筷子(一根一根拿起)。如果筷子已在他人手上,则需等待。饥饿的哲学家只有同时拿到了两根筷子才可以开始进餐,当进餐完毕后,放下筷子继续思考。
import java.util.Random; public class Test { public static void main(String[] args) {
Table table = new Table();
new Philosopher(table).start();
new Philosopher(table).start();
new Philosopher(table).start();
new Philosopher(table).start();
new Philosopher(table).start();
} } class Chopstick { private static int count; private int id; private boolean isTakenUp; public Chopstick() {
id = count++;
} public boolean isTakenUp() {
return isTakenUp;
} public void setTakenUp(boolean isTakenUp) {
this.isTakenUp = isTakenUp;
} @Override
public String toString() {
return "Chopstick-" + id;
} } class Table { private static final int CAPACITY = 5; private Chopstick[] chopsticks = new Chopstick[CAPACITY]; public Table() {
for (int index = 0; index < chopsticks.length; index++) {
chopsticks[index] = new Chopstick();
}
} public synchronized void takeUp(int leftChopstickIndex, int rightChopstickIndex) {
Chopstick leftChopstick = chopsticks[leftChopstickIndex];
Chopstick rightChopstick = chopsticks[rightChopstickIndex];
while (leftChopstick.isTakenUp() || rightChopstick.isTakenUp()) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
leftChopstick.setTakenUp(true);
System.out.println(leftChopstick + " has been taken up.");
rightChopstick.setTakenUp(true);
System.out.println(rightChopstick + " has been taken up.");
notifyAll();
} public synchronized void putDown(int leftChopstickIndex, int rightChopstickIndex) {
Chopstick leftChopstick = chopsticks[leftChopstickIndex];
Chopstick rightChopstick = chopsticks[rightChopstickIndex];
leftChopstick.setTakenUp(false);
System.out.println(leftChopstick + " has been put down.");
rightChopstick.setTakenUp(false);
System.out.println(rightChopstick + " has been put down.");
notifyAll();
} } class Philosopher extends Thread { private static final int MAX_COUNT = 5; private static int count; private int id; private Table table; public Philosopher(Table table) {
if (count >= MAX_COUNT) {
throw new RuntimeException("Cannot create Philosopher instance more than " + MAX_COUNT + ".");
}
id = count++;
setName("Philosopher-" + id);
this.table = table;
} @Override
public void run() {
final int leftChopstickIndex = id;
final int rightChopstickIndex = (id + 1) % MAX_COUNT;
while (true) {
table.takeUp(leftChopstickIndex, rightChopstickIndex);
System.out.println(getName() + " is eating."); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println(getName() + " has eaten.");
table.putDown(leftChopstickIndex, rightChopstickIndex); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
读者-写者问题
有读者和写者两组并发线程,共享一个文件,当两个或以上的读线程同时访问共享数据时不会产生副作用,但若某个写线程和其他线程(读线程或写线程)同时访问共享数据时则可能导致数据不一致的错误。因此要求:
1. 允许多个读者可以同时对文件执行读操作;
2. 只允许一个写者往文件中写信息;
3. 任一写者在完成写操作之前不允许其他读者或写者工作;
4. 写者执行写操作前,应让已有的读者和写者全部退出。
import java.util.HashMap;
import java.util.Map;
import java.util.Random; public class Test { public static void main(String[] args) {
File file = new File();
new Reader(file).start();
new Reader(file).start();
new Reader(file).start();
new Writer(file).start();
new Writer(file).start();
} } class File { public enum Status {
idle, reading, writing,
} private Map<Long, Status> threadStatuses = new HashMap<>(); public synchronized void beginRead() {
outer: while (true) {
for (Status status : threadStatuses.values()) {
if (Status.writing.equals(status)) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
continue outer;
}
}
break;
}
threadStatuses.put(Thread.currentThread().getId(), Status.reading);
notifyAll();
} public synchronized void endRead() {
threadStatuses.put(Thread.currentThread().getId(), Status.idle);
notifyAll();
} public synchronized void beginWrite() {
outer: while (true) {
for (Status status : threadStatuses.values()) {
if (!Status.idle.equals(status)) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
continue outer;
}
}
break;
}
threadStatuses.put(Thread.currentThread().getId(), Status.writing);
notifyAll();
} public synchronized void endWrite() {
threadStatuses.put(Thread.currentThread().getId(), Status.idle);
notifyAll();
} } class Reader extends Thread { private static int count; private File file; public Reader(File file) {
super("Reader-" + count++);
this.file = file;
} @Override
public void run() {
while (true) {
file.beginRead();
System.out.println(Thread.currentThread().getName() + " is reading."); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println(Thread.currentThread().getName() + " has read.");
file.endRead(); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} } class Writer extends Thread { private static int count; private File file; public Writer(File file) {
super("Writer-" + count++);
this.file = file;
} @Override
public void run() {
while (true) {
file.beginWrite();
System.out.println(Thread.currentThread().getName() + " is writing."); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println(Thread.currentThread().getName() + " has wroten.");
file.endWrite(); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
线程内共享
1. ThreadLocal存放的值是线程内共享的,线程间互斥的。主要用于线程内共享数据,避免通过参数来传递,这样能优雅地解决一些实际问题。
2. ThreadLocal的原理是为每一个线程都提供了变量的副本,使得每个线程在某一时间访问到的并不是同一个对象。
3. 数据库连接工具类的举例。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException; public class ConnectionUtils { private static ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<Connection>(); public static Connection getCurrent() throws SQLException {
Connection connection = threadLocalConnection.get();
if (connection == null || connection.isClosed()) {
connection = DriverManager.getConnection("...");
threadLocalConnection.set(connection);
}
return connection;
} public static void close() throws SQLException {
Connection connection = threadLocalConnection.get();
if (connection != null && !connection.isClosed()) {
connection.close();
threadLocalConnection.remove();
}
} }
定时器
1. 定时器使用JDK的Timer和TimerTask;
2. 复杂的定时器可考虑使用Quartz框架。
import java.util.Timer;
import java.util.TimerTask; public class Test { public static void main(String[] args) {
new Timer().schedule(new MyTimerTask(), 3000);;
} } class MyTimerTask extends TimerTask { @Override
public void run() {
System.out.println("Hello World.");
} }
JDK5新功能
线程池
1. 线程池优点:
a) 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
b) 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
c) 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
2. 四种线程池:
a) 固定大小线程池:Executors.newFixedThreadPool(int nThreads)。
b) 缓存线程池:Executors.newCachedThreadPool(),线程池大小随提交任务数增加而增加。
c) 单一线程池:Executors.newSingleThreadExecutor()。
d) 调度线程池:Executors.newScheduledThreadPool(int corePoolSize),调度执行线程。
3. 关闭线程池:
a) 任务执行完毕后关闭:ExecutorService.shutdown()。
b) 不论任务是否完成立即关闭:ExecutorService.shutdownNow()。
4. 示例。
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// ExecutorService threadPool = Executors.newCachedThreadPool();
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
Task0 task0 = new Task0();
Task1 task1 = new Task1();
int taskCount = 20;
for (int index = 0; index < taskCount; index++) {
if (index % 2 == 0) {
threadPool.execute(task0);
} else {
threadPool.execute(task1);
}
}
System.out.println(taskCount + " tasks have been committed.");
threadPool.shutdown();
} } class Task0 implements Runnable { private AtomicInteger count = new AtomicInteger(0); @Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is executing Task0-" + count.getAndIncrement());
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} } } class Task1 implements Runnable { private AtomicInteger count = new AtomicInteger(0); @Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is executing Task1-" + count.getAndIncrement());
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
Callable和Future
1. Callable接口:异步任务。
2. Future接口:异步任务的结果。
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
Future<Integer> future0 = threadPool.submit(new Task());
Future<Integer> future1 = threadPool.submit(new Task());
System.out.println("Two tasks have been committed.");
System.out.println("future0=" + future0.get());
System.out.println("future1=" + future1.get());
threadPool.shutdown();
} } class Task implements Callable<Integer> { @Override
public Integer call() throws Exception {
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextInt();
} }
3. CompletionService接口:提交一组Callable任务,哪个先执行完则先返回结果。
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
Task task = new Task();
int taskCount = 10;
for (int index = 0; index < taskCount; index++) {
completionService.submit(task);
}
for (int index = 0; index < taskCount; index++) {
Future<Integer> future = completionService.take();
System.out.println("future" + index + "=" + future.get());
}
threadPool.shutdown();
} } class Task implements Callable<Integer> { private AtomicInteger count = new AtomicInteger(0); @Override
public Integer call() throws Exception {
int ret = count.getAndIncrement();
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
return ret;
} }
ReentrantLock
1. java.util.concurrent.locks.ReentrantLock与synchronized作用类似,只是更面向对象。
2. 注意:Lock应该与try { … } finally { … }使用,保证锁一定释放。
3. 线程不安全示例的Lock写法。
class MyNumber { private Lock lock = new ReentrantLock(); private int n = 0; public int value() {
return n;
} public void increate() {
try {
lock.lock();
n = n + 1;
} finally {
lock.unlock();
}
} }
ReadWriteLock
1. java.util.concurrent.locks.ReadWriteLock包含读锁和写锁,类似“读者-写者问题”中的读者和写者,即多个读锁不互斥,读锁与写锁互斥,写锁与写锁互斥。
2. “读者-写者问题”的读锁、写锁写法。
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test { public static void main(String[] args) {
File file = new File();
new Reader(file).start();
new Reader(file).start();
new Reader(file).start();
new Writer(file).start();
new Writer(file).start();
} } class File { private ReadWriteLock lock = new ReentrantReadWriteLock(); public void beginRead() {
lock.readLock().lock();
} public void endRead() {
lock.readLock().unlock();
} public void beginWrite() {
lock.writeLock().lock();
} public void endWrite() {
lock.writeLock().unlock();
} } class Reader extends Thread { private static int count; private File file; public Reader(File file) {
super("Reader-" + count++);
this.file = file;
} @Override
public void run() {
while (true) {
try {
file.beginRead();
System.out.println(Thread.currentThread().getName() + " is reading."); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println(Thread.currentThread().getName() + " has read.");
} finally {
file.endRead();
} try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} } class Writer extends Thread { private static int count; private File file; public Writer(File file) {
super("Writer-" + count++);
this.file = file;
} @Override
public void run() {
while (true) {
try {
file.beginWrite();
System.out.println(Thread.currentThread().getName() + " is writing."); try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println(Thread.currentThread().getName() + " has wroten.");
} finally {
file.endWrite();
} try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
3. 使用ReadWriteLock优化缓存:第1个线程写,后续线程只读(类似ReentrantReadWriteLock JDK文档中的示例)。
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; class Cache { private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Map<Object, Object> data = new HashMap<>(); public Object getValue(Object key) {
readWriteLock.readLock().lock();
Object value = null;
try {
value = data.get(key);
if (value == null) {
readWriteLock.readLock().unlock();
readWriteLock.writeLock().lock();
try {
if (value == null) {
value = new Random().nextInt(1000); // 模拟去数据库查询数据
}
} finally {
readWriteLock.writeLock().unlock();
}
readWriteLock.readLock().lock();
} } finally {
readWriteLock.readLock().unlock();
}
return value;
} }
Condition
1. java.util.concurrent.locks.Condition类似Object.wait()和Object.notify()/Object.notifyAll()方法,实现了线程间通信。
2. “子线程、主线程交替循环”面试题的Condition写法。
class Looper { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean runSub = true; public void sub() {
for (int i = 0; i < 50; i++) {
lock.lock();
try {
while (!runSub) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int j = 0; j < 10; j++) {
System.out.println("子线程第" + i + "次循环" + j + "。");
}
runSub = false;
condition.signal();
} finally {
lock.unlock();
}
}
} public void main() {
for (int i = 0; i < 50; i++) {
lock.lock();
try {
while (runSub) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int j = 0; j < 100; j++) {
System.out.println("主线程第" + i + "次循环" + j + "。");
}
runSub = true;
condition.signal();
} finally {
lock.unlock();
}
}
} }
3. 与Object.wait()和Object.notify不同的是,Condition能实现多路暂停和唤醒,具体示例参考Condition JDK文档。
Semaphore
1. java.util.concurrent.Semaphore信号量可控制同时访问资源的线程个数。
2. Semaphore可以由一个线程获得锁,而由另一个线程释放锁,这可以应用与死锁恢复的场景。
3. Semaphore适合做网站流量控制,拿到信号量的线程可以进入,否则只能等待。
4. 示例。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore; public class Test { public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
Semaphore semaphore = new Semaphore(3);
Task task = new Task(semaphore);
for (int index = 0; index < 10; index++) {
threadPool.execute(task);
}
threadPool.shutdown();
} } class Task implements Runnable { private Semaphore semaphore; public Task(Semaphore semaphore) {
this.semaphore = semaphore;
} @Override
public void run() {
try {
semaphore.acquire();
System.out.println(semaphore.availablePermits() + " permit(s) left.");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId() + " is done."); } catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
} }
CyclicBarrier
1. java.util.concurrent.CyclicBarrier阻塞一组线程,直到达到指定的阻塞线程数量后才停止阻塞。类似生活中集合的场景,10小伙伴约定地点集合,先到着等待,等全都到齐才出发。
2. 示例。
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Test { public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
Task task = new Task(cyclicBarrier);
for (int index = 0; index < 9; index++) {
threadPool.execute(task);
}
threadPool.shutdown();
} } class Task implements Runnable { private CyclicBarrier cyclicBarrier; public Task(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
} @Override
public void run() {
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行完毕。当前等待的线程个数是" + cyclicBarrier.getNumberWaiting() + "。");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
} }
CountDownLatch
1. java.util.concurrent.CountDownLatch倒数器,执行CountDownLatch.countDown()计数器减1,当计数器为0时,则所有等待的线程停止阻塞。
2. 可实现一个线程等待多个线程,也可实现多个线程等待一个线程。以3个运动员、1个裁判发令并统计结果为示例。
import java.util.Random;
import java.util.concurrent.CountDownLatch; public class Test { public static void main(String[] args) {
CountDownLatch beginCountDownLatch = new CountDownLatch(1);
CountDownLatch endCountDownLatch = new CountDownLatch(3);
new Runner(beginCountDownLatch, endCountDownLatch).start();
new Runner(beginCountDownLatch, endCountDownLatch).start();
new Runner(beginCountDownLatch, endCountDownLatch).start();
new Judge(beginCountDownLatch, endCountDownLatch).start();
} } class Runner extends Thread { private CountDownLatch beginCountDownLatch; private CountDownLatch endCountDownLatch; public Runner(CountDownLatch beginCountDownLatch, CountDownLatch endCountDownLatch) {
this.beginCountDownLatch = beginCountDownLatch;
this.endCountDownLatch = endCountDownLatch;
} @Override
public void run() {
try {
System.out.println("Runner " + getId() + " is ready.");
beginCountDownLatch.await();
Thread.sleep(new Random().nextInt(500));
System.out.println("Runner " + getId() + " has run.");
endCountDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
} } class Judge extends Thread { private CountDownLatch beginCountDownLatch; private CountDownLatch endCountDownLatch; public Judge(CountDownLatch beginCountDownLatch, CountDownLatch endCountDownLatch) {
this.beginCountDownLatch = beginCountDownLatch;
this.endCountDownLatch = endCountDownLatch;
} @Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("Judge says \"Go!\"");
beginCountDownLatch.countDown();
System.out.println("Judge is waiting for runners.");
endCountDownLatch.await();
System.out.println("Judge has waited."); } catch (InterruptedException e) {
e.printStackTrace();
}
} }
Exchanger
1. java.util.concurrent.Exchanger可实现两个线程间的数据交互。先执行Exchanger.exchange()方法的线程被阻塞,直到另一个线程也执行Exchanger.exchange()方法才停止,两条线程交换完数据后继续执行。
2. 示例。
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Exchanger; public class Test { public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<String>();
new MyThread(exchanger).start();
new MyThread(exchanger).start();
} } class MyThread extends Thread { private Exchanger<String> exchanger; public MyThread(Exchanger<String> exchanger) {
this.exchanger = exchanger;
} @Override
public void run() {
String value = UUID.randomUUID().toString();
System.out.println(Thread.currentThread().getName() + "准备用" + value + "交换。");
String newValue = null;
try {
Thread.sleep(new Random().nextInt(5000));
newValue = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "交换得到" + newValue + "。");
} }
BlockingQueue
1. java.util.concurrent.BlockingQueue为阻塞队列,类似“生产者-消费者问题”中的容器,“满值欲加”和“无值欲取”都会被阻塞。
2. 查JDK文档可知:
场景+操作 结果 |
“满值欲加” |
“无值欲取” |
检查 |
抛出异常 |
add(e) |
remove() |
element() |
返回值 |
offer(e) |
poll() |
peek() |
阻塞 |
put(e) |
take() |
不可用 |
3. 固定长度阻塞队列ArrayBlockingQueue,不固定长度阻塞队列LinkedBlockingDeque。
4. “生产者-消费者问题”的BlockingQueue写法。
class Container { private final static int CAPACITY = 10; private BlockingQueue<Product> blocingQueue = new ArrayBlockingQueue<Product>(CAPACITY); public void push(Product product) {
try {
blocingQueue.put(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
} public Product pop() {
Product product = null;
try {
product = blocingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return product;
} }
5. 两个具有1个容量的阻塞队列可实现线程间通信。“子线程、主线程交替循环问题”的BlockingQueue写法。
class Looper { private BlockingQueue<Integer> subBlockingQueue = new ArrayBlockingQueue<Integer>(1); private BlockingQueue<Integer> mainBlockingQueue = new ArrayBlockingQueue<Integer>(1); public Looper() {
subBlockingQueue.add(1);
} public void sub() {
for (int i = 0; i < 50; i++) {
try {
subBlockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 10; j++) {
System.out.println("子线程第" + i + "次循环" + j + "。");
}
mainBlockingQueue.add(1);
}
} public void main() {
for (int i = 0; i < 50; i++) {
try {
mainBlockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
System.out.println("主线程第" + i + "次循环" + j + "。");
}
subBlockingQueue.add(1);
}
} }
并发集合
1. 并发集合与传统集合、同步集合对应关系。
传统集合(非线程安全) |
同步集合(线程安全) |
并发集合(线程安全) |
ArrayList |
Vector Collections.synchronizedCollection(...) Collections.synchronizedList(...) |
CopyOnWriteArrayList |
Set |
Collections.synchronizedSet(...) |
CopyOnWriteArraySet |
TreeSet |
Collections.synchronizedNavigableSet(...) Collections.synchronizedSortedSet(...) |
ConcurrentSkipListSet |
HashMap |
Hashtable Collections.synchronizedMap(...) |
ConcurrentHashMap |
TreeMap |
Collections.synchronizedNavigableMap(...) Collections.synchronizedSortedMap(...) |
ConcurrentSkipListMap |
2. 并发集合与同步集合的区别。
a) 并发集合尽量避免synchronized,提供并发性;
b) 并发集合定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。示例如下。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList; public class Test { public static void main(String[] args) {
// List<String> list = new ArrayList<>(); // 抛出ConcurrentModificationException,或结果不正确
// List<String> list = new Vector<>(); // 抛出ConcurrentModificationException,或结果不正确
List<String> list = new CopyOnWriteArrayList<String>();
list.add("s1");
list.add("s2");
list.add("s3");
list.add("s4");
list.add("s5");
for (Iterator<String> iterator = list.iterator(); iterator.hasNext();) {
String current = iterator.next();
if ("s1".equals(current)
|| "s3".equals(current)
|| "s5".equals(current)) {
list.remove(current);
} else {
System.out.println(current);
}
}
} }
系统峰值评估
峰值参数
1. PV(Page View):网站的总访问量、页面浏览量或点击量,用户每刷新一次就会被记录一次。
2. UV(Unique Visitor):访问网站的一台电脑客户端为一个访客。一般来讲,时间上以00:00至24:00之间相同IP的客户端只记录一次。
3. QPS(Query Per Second):每秒查询数。QPS很大程度上代表了系统业务上的繁忙程度,每次查询的背后,可能对应着多次磁盘I/O,多次网络请求,多次CPU时间片等。通过QPS可以非常直观的了解当前系统业务情况,一旦当前QPS超过所设定的预警阀值,可以考虑增加机器对集群扩容,以免压力过大导致宕机。可以根据前期的压力测试得到估值,在结合后期综合运维情况,估算出阀值。
4. RT(Response Time):请求的响应时间。这个指标非常关键,直接说明前端用户的体验,任何系统设计师都想降低RT。
5. 还有设计CPU、内存、网络、磁盘等情况的参数。
评估方法
1. 一般通过开发、运维、测试以及业务等相关人员,综合出系统的一系列阀值,然后根据关键阀值(如QPS、RT等)对系统进行有效变更。
2. 进行多轮压力测试以后,可以对系统进行峰值评估,采用80/20原则,即80%的PV将在20%的时间内达到。这样计算出系统峰值QPS:
峰值QPS = ( 总PV * 80% ) / ( 60 * 60 * 24 * 20% )
总峰值QPS除以单台机器所能承受的最高QPS就是须要的机器数量:
机器数 = 总峰值QPS / 压测得出的单机极限QPS
还要考虑大型促销活动和双十一、双十二热点事件、遭受DDoS攻击等情况,系统的开发和维护人急需了解当前系统运行的状态和负载情况。
作者:netoxi
出处:http://www.cnblogs.com/netoxi
本文版权归作者和博客园共有,欢迎转载,未经同意须保留此段声明,且在文章页面明显位置给出原文连接。欢迎指正与交流。