Java中并发工具包 - 上

时间:2022-12-21 20:16:32

并发工具概述

体系结构

        A.并发工具处于java.util.concurrent包

B.其实包括的内容有:
-同步器
-执行器
-并发集合
-Fork/join框架
-atomic包
-locks包

各组成部分和作用

          同步器:为各种特定的同步问题提供了解决方案

执行器:用来管理线程的执行

并发集合:提供了集合框架中集合的并发版本

Fork/Join框架:提供了对并发编程的支持

atomic包:提供了不需要锁即可完成并发环境变量使用的原子性操作

locks包:使用lock接口为并发编程提供了同步的另一种替代方案

同步器Semaphore 和 CountDownLatch API介绍和代码实现

同步器 Semaphore(信号量)

Semaphore - API

Java中并发工具包 - 上

Semaphpre - 演示代码

package com.semaphore.demo1;

import java.util.concurrent.Semaphore;

/**
* 主函数 - 类
*
*演示目标:模拟银行窗口服务流程
*
* 条件: 1.有两个服务窗口
* 2.三名顾客在等待服务
* 代码实现
* */

public class DemoCase {

public static void main(String[] args) {
/*
* 设置两个信号量
* ps:
* 1.信号量的数量,就是我允许多少个并发线程进入到这个区域
* 2.信号量的使用
* A.先获取信号量
* B.进行实际操作。
* C.操作完成后,释放信号量
* 3.并发线程执行顺序是不固定的,也就是随机的
* */

Semaphore semaphore = new Semaphore(3);

Person p1 = new Person(semaphore, "A");
p1.start();

Person p2 = new Person(semaphore, "B");
p2.start();

Person p3 = new Person(semaphore, "C");
p3.start();
}
}

/**
* 功能 - 类
*
* */


//1.创建一个内部类,继承Thread类
class Person extends Thread
{
//2.创建全局变量 Semaphore
private Semaphore semaphore;

//3.创建类的 有参构造方法
public Person(Semaphore semaphore ,String name) {
setName(name);
this.semaphore = semaphore;
}

//4.重写Thread类的run方法
@Override
public void run() {
System.out.println(getName()+" is waiting ..."); //提示消息:等待服务

try {
semaphore.acquire(); //获取 - 信号量

System.out.println(getName()+" is Servicing...");//提示消息:服务中

Thread.sleep(1000); //为了看出效果,休眠一秒

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println(getName()+"is Done!"); //提示消息 :完成

semaphore.release(); //释放 - 信号量
}



}

Semaphpre - 运行结果

Java中并发工具包 - 上

Semaphpre - 使用总结

Semaphore 同步器运行特性:
1.可以自定义同时并发的线程数量。
如:线程数量为3,但设置信号量为2,就会先并发执行两条线程,然后执行第3条线程。
2.并发执行线程的顺序是随机的,也就是说没有固定顺序。

Semaphore API应用及其运行步骤
1.功能类 书写步骤
1. - semaphore.acquire(); //获取主函数的数量
2. - 功能实现部分
3. - semaphore.release(); //释放信号量

2.调用类 书写步骤
1. -Semaphore semaphore = new Semaphore(2);//创建 semaphore 对象实例,设置信号量的数量也就是并发执行的线程数量
2.Person p1 = new Person(semaphore, "A");
p1.start(); //创建功能类对象实例,启动线程

CountDownLatch(计数栓)

CountDownLatch - API

Java中并发工具包 - 上

CountDownLatch - 演示代码

package com.countdownlatch.demo1;

import java.util.concurrent.CountDownLatch;

/**
* 主函数 - 类
*
* 演示目标:模拟赛跑比赛,裁判员倒数3,2,1之后,所有线程并发执行
*
* 演示代码
* */

public class DemoCase {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3); //创建计数栓对象

new Racer(countDownLatch,"A").start();
new Racer(countDownLatch,"B").start();
new Racer(countDownLatch,"C").start();

for (int i = 0; i < 3; i++) {
try {
Thread.sleep(1000);
System.out.println(3-i);
if(i ==2 ) //开始起跑
{
System.out.println("开始起跑");
}
/*ps:
* 1.开始计数: 每隔一秒计数一次,总计3次也就是三秒
* 2.线程的执行顺序是不固定的,也就是随机的
* */

countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 功能 - 类
*
* */


//1.Racer(运动员)类 继承 Thread类
class Racer extends Thread
{
//2.创建共有对象
private CountDownLatch countDownLatch;

//3.创建类的有参函数
public Racer(CountDownLatch countDownLatch,String name) {
setName(name);
this.countDownLatch = countDownLatch;
}

//4.重写Thread类的run方法
@Override
public void run() {
try {

countDownLatch.await();// 等待
for (int i = 0; i < 3; i++) {
System.out.println(getName()+": "+i);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

super.run();
}
}

CountDownLatch - 运行结果

Java中并发工具包 - 上

CountDownLatch - 使用总结

CountDownLatch 同步器运行特性:
1.可以自定义线程运行的时间
2.同样线程的先后执行顺序也是无序的,随机执行。

CountDownLatch API的应用以及书写方式
功能类:完成需求功能即可。
调用类:CountDownLatch countDownLatch = new CountDownLatch(3);//创建计数栓对象,并设置计数栓的值。
countDownLatch.countDown(); //countDown()方法负责统计,每触发一次,就加1。如:计数栓设置的值是3,那么 countDown()被触发3次就会执行所有线程。

同步器- CyclicBarrier、Exchanger 和 Phaser API介绍和代码实现

同步器- CyclicBarrier(关卡)

CyclicBarrier - API

Java中并发工具包 - 上

CyclicBarrier - 演示代码

package com.CyclicBarrier.demo1;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/*
* 演示目标:模拟斗地主
* */

public class DemoCase {
public static void main(String[] args) {
/*
* 创建 CyclicBarrier对象、指定线程数量、以及线程都到达后执行的操作
* */

CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Runnable() {

@Override
public void run() {
System.out.println("开始 游戏");
}
});

new Player(cyclicBarrier, "A").start();
new Player(cyclicBarrier, "B").start();
new Player(cyclicBarrier, "C").start();
}
}

// 1.类继承 Thread类
class Player extends Thread
{
//2.创建共有对象
private CyclicBarrier cyclicBarrier ;

//3.创建类 有参构造函数
public Player(CyclicBarrier cyclicBarrier,String name) {
setName(name);
this.cyclicBarrier = cyclicBarrier;
}

//重写Thread类 run方法
@Override
public void run() {
System.out.println(getName()+" is waiting other palyers ..."); //提示消息,等待其他玩家

try {
cyclicBarrier.await();//等待线程

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

CyclicBarrier - 运行结果

Java中并发工具包 - 上

CyclicBarrier - 使用总结

CyclicBarrier 同步器特性
1.指定线程数量、以及线程都到达后执行的操作

CyclicBarrier API的应用以及书写方式
功能类
1.在功能类书写功能代码,功能代码后添加
cyclicBarrier.await();//等待线程
调用类:
//指定线程数量,以及开启一个新的线程,当所有线程都到达且运行后,共同执行的方法
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Runnable() {
@Override
public void run() {
System.out.println("开始 游戏");
}
});

同步器 - Exchanger(交换器)

Exchanger - API

Java中并发工具包 - 上

Exchanger - 演示代码

package com.exchanger.demo1;

import java.util.concurrent.Exchanger;


/**
* 演示目标 :两条线程 交换字符串
*
* */

public class DemoCase {
public static void main(String[] args) {
Exchanger<String> ex = new Exchanger<>();

new B(ex).start();
new A(ex).start();

}
}

//1.继承 Thread 类
class A extends Thread
{

//2.创建共有对象
private Exchanger<String> ex;

//3.创建类的 有参构造,传入参数
public A(Exchanger<String> ex) {
this.ex = ex;
}

//4.重写run方法
@Override
public void run() {
String str = null ;
try {
str = ex.exchange("Hello?"); //准备要交换的数据
System.out.println("A的数据:"+str);

str = ex.exchange("A"); //准备要交换的数据
System.out.println("A的数据:"+str);

str = ex.exchange("B"); //准备要交换的数据
System.out.println("A的数据:"+str);

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}
//1.继承 Thread 类
class B extends Thread
{

//2.创建共有对象
private Exchanger<String> ex;

//3.创建类的 有参构造,传入参数
public B(Exchanger<String> ex) {
this.ex = ex;
}

//4.重写run方法
@Override
public void run() {
String str = null ;
try {
str = ex.exchange("Hey?"); //准备要交换的数据
System.out.println("B的数据:"+str);

str = ex.exchange("1"); //准备要交换的数据
System.out.println("B的数据:"+str);

str = ex.exchange("2"); //准备要交换的数据
System.out.println("B的数据:"+str);

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

Exchanger - 运行结果

Java中并发工具包 - 上

Exchanger - 使用总结

Exchanger 同步器特性
1.两条线程 数据的相互交换

Exchanger API的应用以及书写方式
功能类
1.创建两个线程类
2.在两条线程类中完成功能,使用 ex.exchange("Hello?"); 准备要交换的数据
调用类:
1.Exchanger<String> ex = new Exchanger<>(); //创建 Exchanger对象实例,并指定交换数据的类型。
2.调用功能类即可。

同步器 - Phaser(相位器)

Phaser - API

Java中并发工具包 - 上

Phaser - 演示代码

package com.phaser.demo1;

import java.util.concurrent.Phaser;

/**
* 演示目标:模拟餐厅服务
* 条件:
* 1,有三位客人
* 2. 有三个服务环节:服务员(接待)、厨师(做菜)、传菜(给客人上菜)
* */

public class DemoCase {
public static void main(String[] args) {
Phaser phaser = new Phaser(1); // Phaser对象实例,管理并发线程

System.out.println("准备启动。。。");

//创建并发线程对象
new Worker(phaser, "服务员").start();
new Worker(phaser, "厨师").start();
new Worker(phaser, "上菜").start();

//控制并发线程,当有线程没执行完 等待 (循环处理三位客人就餐)
for (int i = 1; i <=3; i++) {
System.out.println(i+"号订单开始处理");
phaser.arriveAndAwaitAdvance();//等待

}

//当所有并发线程都执行完,注销
phaser.arriveAndDeregister(); //注销
System.out.println("All Done!");
}
}
/**
* 功能类:
* 实现了每个岗位的工作
* */

//1.继承 Thread类
class Worker extends Thread
{
//2,创建共有的对象
private Phaser phaser ;

//3.创建类的有参构造函数,传入参数
public Worker(Phaser phaser ,String name)
{
setName(name);
this.phaser = phaser;

phaser.register(); //注册
}

//4.重写run方法
@Override
public void run()
{
//假设有三个订单,所以每个岗位都运行了三遍
for(int i = 1; i<=3; i++)
{
System.out.println("当前服务的是:"+getName());//提示消息,

if(i == 3)//当前岗位三次工作完成 ,注销自己
{
phaser.arriveAndDeregister(); //注销
}else //如果当前岗位没有完成3次工作,等待完成。
{
// System.out.println(getName()+"等待。。。");
phaser.arriveAndAwaitAdvance(); //等待
}

//为了看出效果,做一个休眠,每一秒 当前岗位操作一次
try
{
Thread.sleep(1000);
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

Phaser - 运行结果

Java中并发工具包 - 上

Phaser - 使用总结

Phaser 同步器特性
工作方式与CyclicBarrrier类似,但是可以定义多个阶段
Phaser 的API的使用
1,首先在类的构造方法中先注册 phaser.register(); //注册
2. 其次有两种情况
当线程没有执行完,可以等待 phaser.arriveAndAwaitAdvance(); //等待
当线程执行完,要注销自己 phaser.arriveAndDeregister(); //注销