并发工具概述
体系结构
A.并发工具处于java.util.concurrent包
B.其实包括的内容有:
-同步器
-执行器
-并发集合
-Fork/join框架
-atomic包
-locks包
各组成部分和作用
同步器:为各种特定的同步问题提供了解决方案
执行器:用来管理线程的执行
并发集合:提供了集合框架中集合的并发版本
Fork/Join框架:提供了对并发编程的支持
atomic包:提供了不需要锁即可完成并发环境变量使用的原子性操作
locks包:使用lock接口为并发编程提供了同步的另一种替代方案
同步器Semaphore 和 CountDownLatch API介绍和代码实现
同步器 Semaphore(信号量)
Semaphore - API
Semaphpre - 演示代码
package com.semaphore.demo1;
import java.util.concurrent.Semaphore;
/**
* 主函数 - 类
*
*演示目标:模拟银行窗口服务流程
*
* 条件: 1.有两个服务窗口
* 2.三名顾客在等待服务
* 代码实现
* */
public class DemoCase {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
Person p1 = new Person(semaphore, "A");
p1.start();
Person p2 = new Person(semaphore, "B");
p2.start();
Person p3 = new Person(semaphore, "C");
p3.start();
}
}
/**
* 功能 - 类
*
* */
class Person extends Thread
{
private Semaphore semaphore;
public Person(Semaphore semaphore ,String name) {
setName(name);
this.semaphore = semaphore;
}
@Override
public void run() {
System.out.println(getName()+" is waiting ...");
try {
semaphore.acquire();
System.out.println(getName()+" is Servicing...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName()+"is Done!");
semaphore.release();
}
}
Semaphpre - 运行结果
Semaphpre - 使用总结
Semaphore 同步器运行特性:
1.可以自定义同时并发的线程数量。
如:线程数量为3,但设置信号量为2,就会先并发执行两条线程,然后执行第3条线程。
2.并发执行线程的顺序是随机的,也就是说没有固定顺序。
Semaphore API应用及其运行步骤
1.功能类 书写步骤
1. - semaphore.acquire();
2. - 功能实现部分
3. - semaphore.release();
2.调用类 书写步骤
1. -Semaphore semaphore = new Semaphore(2);
2.Person p1 = new Person(semaphore, "A");
p1.start();
CountDownLatch(计数栓)
CountDownLatch - API
CountDownLatch - 演示代码
package com.countdownlatch.demo1;
import java.util.concurrent.CountDownLatch;
/**
* 主函数 - 类
*
* 演示目标:模拟赛跑比赛,裁判员倒数3,2,1之后,所有线程并发执行
*
* 演示代码
* */
public class DemoCase {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Racer(countDownLatch,"A").start();
new Racer(countDownLatch,"B").start();
new Racer(countDownLatch,"C").start();
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(1000);
System.out.println(3-i);
if(i ==2 )
{
System.out.println("开始起跑");
}
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 功能 - 类
*
* */
class Racer extends Thread
{
private CountDownLatch countDownLatch;
public Racer(CountDownLatch countDownLatch,String name) {
setName(name);
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
countDownLatch.await();
for (int i = 0; i < 3; i++) {
System.out.println(getName()+": "+i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
super.run();
}
}
CountDownLatch - 运行结果
CountDownLatch - 使用总结
CountDownLatch 同步器运行特性:
1.可以自定义线程运行的时间
2.同样线程的先后执行顺序也是无序的,随机执行。
CountDownLatch API的应用以及书写方式
功能类:完成需求功能即可。
调用类:CountDownLatch countDownLatch = new CountDownLatch(3);
countDownLatch.countDown();
同步器- CyclicBarrier、Exchanger 和 Phaser API介绍和代码实现
同步器- CyclicBarrier(关卡)
CyclicBarrier - API
CyclicBarrier - 演示代码
package com.CyclicBarrier.demo1;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class DemoCase {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Runnable() {
@Override
public void run() {
System.out.println("开始 游戏");
}
});
new Player(cyclicBarrier, "A").start();
new Player(cyclicBarrier, "B").start();
new Player(cyclicBarrier, "C").start();
}
}
class Player extends Thread
{
private CyclicBarrier cyclicBarrier ;
public Player(CyclicBarrier cyclicBarrier,String name) {
setName(name);
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(getName()+" is waiting other palyers ...");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
CyclicBarrier - 运行结果
CyclicBarrier - 使用总结
CyclicBarrier 同步器特性
1.指定线程数量、以及线程都到达后执行的操作
CyclicBarrier API的应用以及书写方式
功能类
1.在功能类书写功能代码,功能代码后添加
cyclicBarrier.await();
调用类:
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Runnable() {
@Override
public void run() {
System.out.println("开始 游戏");
}
});
同步器 - Exchanger(交换器)
Exchanger - API
Exchanger - 演示代码
package com.exchanger.demo1;
import java.util.concurrent.Exchanger;
public class DemoCase {
public static void main(String[] args) {
Exchanger<String> ex = new Exchanger<>();
new B(ex).start();
new A(ex).start();
}
}
class A extends Thread
{
private Exchanger<String> ex;
public A(Exchanger<String> ex) {
this.ex = ex;
}
@Override
public void run() {
String str = null ;
try {
str = ex.exchange("Hello?");
System.out.println("A的数据:"+str);
str = ex.exchange("A");
System.out.println("A的数据:"+str);
str = ex.exchange("B");
System.out.println("A的数据:"+str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class B extends Thread
{
private Exchanger<String> ex;
public B(Exchanger<String> ex) {
this.ex = ex;
}
@Override
public void run() {
String str = null ;
try {
str = ex.exchange("Hey?");
System.out.println("B的数据:"+str);
str = ex.exchange("1");
System.out.println("B的数据:"+str);
str = ex.exchange("2");
System.out.println("B的数据:"+str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Exchanger - 运行结果
Exchanger - 使用总结
Exchanger 同步器特性
1.两条线程 数据的相互交换
Exchanger API的应用以及书写方式
功能类
1.创建两个线程类
2.在两条线程类中完成功能,使用 ex.exchange("Hello?"); 准备要交换的数据
调用类:
1.Exchanger<String> ex = new Exchanger<>();
2.调用功能类即可。
同步器 - Phaser(相位器)
Phaser - API
Phaser - 演示代码
package com.phaser.demo1;
import java.util.concurrent.Phaser;
/**
* 演示目标:模拟餐厅服务
* 条件:
* 1,有三位客人
* 2. 有三个服务环节:服务员(接待)、厨师(做菜)、传菜(给客人上菜)
* */
public class DemoCase {
public static void main(String[] args) {
Phaser phaser = new Phaser(1);
System.out.println("准备启动。。。");
new Worker(phaser, "服务员").start();
new Worker(phaser, "厨师").start();
new Worker(phaser, "上菜").start();
for (int i = 1; i <=3; i++) {
System.out.println(i+"号订单开始处理");
phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
System.out.println("All Done!");
}
}
/**
* 功能类:
* 实现了每个岗位的工作
* */
class Worker extends Thread
{
private Phaser phaser ;
public Worker(Phaser phaser ,String name)
{
setName(name);
this.phaser = phaser;
phaser.register();
}
@Override
public void run()
{
for(int i = 1; i<=3; i++)
{
System.out.println("当前服务的是:"+getName());
if(i == 3)
{
phaser.arriveAndDeregister();
}else
{
phaser.arriveAndAwaitAdvance();
}
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}
Phaser - 运行结果
Phaser - 使用总结
Phaser 同步器特性
工作方式与CyclicBarrrier类似,但是可以定义多个阶段
Phaser 的API的使用
1,首先在类的构造方法中先注册 phaser.register();
2. 其次有两种情况
当线程没有执行完,可以等待 phaser.arriveAndAwaitAdvance();
当线程执行完,要注销自己 phaser.arriveAndDeregister();