并发编程(二)concurrent 工具类
一、CountDownLatch
经常用于监听某些初始化操作,等初始化执行完毕后,通知主线程继续工作。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest extends Thread {
private final static CountDownLatch countDown = new CountDownLatch(2); // (1)
@Override
public void run() {
// 唤醒线程线程
countDown.countDown(); // (2)
System.out.println(Thread.currentThread().getName() + "执行完毕...");
}
public static void main(String[] args) {
new Thread(new CountDownLatchTest()).start();
new Thread(new CountDownLatchTest()).start();
try {
Thread.sleep(1000);
countDown.await(); // (3)
System.out.println(Thread.currentThread().getName() + "继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
-
声明一个 CountDownLatch 对象,参数 2 表示被阻塞的线程需要被唤醒再次才能执行。
final CountDownLatch countDown = new CountDownLatch(2);
-
countDown() 调用两次后,主线程才会继续执行
countDown.countDown();
-
阻塞当前线程-main
countDown.await();
-
执行结果如下:
Thread-1执行完毕...
Thread-0执行完毕...
main继续执行... // Thread-0, Thread-1 执行完成才会继续执行主线程
二、CyclicBarrier
假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个没有准备了,大家都等待。
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseCyclicBarrier {
static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(name + " 准备OK.");
barrier.await(); //(1)
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(2); // (2)
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(new Thread(new Runner(barrier, "Thread-1")));
executor.submit(new Thread(new Runner(barrier, "Thread-2")));
executor.shutdown();
}
}
-
await() 阻塞当前的线程。
barrier.await();
-
声明一个 CyclicBarrier 对象,参数 2 表示 barrier 必须有两个线程都准备好了才能执行。
CyclicBarrier barrier = new CyclicBarrier(2);
-
执行结果如下:
Thread-1 准备OK.
Thread-2 准备OK.
Thread-1 Go!!
Thread-2 Go!! -
修改
CyclicBarrier barrier = new CyclicBarrier(3)
后这两个线程都会被阻塞, 执行结果如下:Thread-1 准备OK.
Thread-2 准备OK.
三、Future
四、Semaphore
Semaphore 信号量非常适合高并发访问。
public class UseSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5); // (1)
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire(); // (2)
System.out.println("Accessing: " + NO);
//模拟实际业务逻辑
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release(); // (3)
} catch (InterruptedException e) {
;
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(semp.getQueueLength());
// 退出线程池
exec.shutdown();
}
}
-
声明一个 Semaphore 对象,参数 5 表示最多有5个线程同时访问。
final Semaphore semp = new Semaphore(5);
semp.acquire()
获取 semp 对象,如果超过5个线程,那么其余的线程就会阻塞,直到有线程执行完毕。semp.release()
释放 semp 对象,这样其余的线程就可以执行了。
补充:
PV(page view) 网站的总访问量,页面浏览量或点击量,用户每刷新一次就会记录一次。
UV(unique vistor) 访问网站的一台电脑客户端为一个访客。一般来讲,时间上以00:00~24:00之内相同的客户端记录一次。
QPS(query per second) 即每秒查询数,QPS 很大程度代表了系统业务的繁忙程度。一旦当前 QPS 超过所设定的预警阀值,可以考虑对集群扩容,以免压力过大导致宕机。
RT(response time) 即请求的响应时间,这个指标非常关键,直接说明客户端的体验,因此任何系统设计师都想降低 RT 时间。
对系统进行峰值评估,采用所谓的80/20原则,即80%的请求20%的时间到达:
QRS = (PV * 80%) / (24 * 60 * 60 * 20%)
五、ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest implements Runnable {
private Lock lock = new ReentrantLock(); // (1)
public void run(){
try {
lock.lock(); // (2)
System.out.println(Thread.currentThread().getName() + "进入..");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "退出..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // (3)
}
}
public static void main(String[] args) throws InterruptedException {
final ReentrantLockTest ur = new ReentrantLockTest();
for (int i = 0; i < 10; i++) {
new Thread(ur).start();
}
}
}
-
ReentrantLock 一般用法:
private Lock lock = new ReentrantLock();
try {
lock.lock();
//do something
} finally {
lock.unlock();
} -
condition 使用方法,注意 condition 可以实例化多个:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition.await(); //阻塞线程,释放锁
condition.signal();//唤醒线程,不释放锁 -
公平锁(true)和非公平锁(false),非公平锁执行效率比公平锁高
Lock lock = new ReentrantLock(boolean isFair);
-
读写锁,实现读写分离的锁,适用于读多写少的情况下(读读共享,读写互斥)
private ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); // (1)
private ReadLock readLock = rwlock.readLock(); // (2)
private WriteLock writeLock = rwlock.writeLock(); // (3) public void read(){
try {
readLock.lock();
// do something
} finally {
readLock.unlock();
}
} public void write(){
try {
writeLock.lock();
// do something
} finally {
writeLock.unlock();
}
}
每天用心记录一点点。内容也许不重要,但习惯很重要!