Java多线程基础知识例子

时间:2022-04-07 04:59:33

一、管理

1、创建线程

Thread
public class Main {

	public static void main(String[] args) {

		MyThread myThread = new MyThread();
myThread.start();
}
} /**
* 继承Thread来创建线程
* 缺点是不能再继承其他的类了
*/
public class MyThread extends Thread { @Override
public void run() {
System.out.println("继承Thread");
}
}
Runnable
public class Main {

	public static void main(String[] args) {

		Thread myThread = new Thread(new MyThread());
myThread.start();
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
System.out.println("实现Runnable");
}
}
ThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { //newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
//newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
//newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
//newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
ExecutorService executorService = Executors.newFixedThreadPool(5); MyThread thread = new MyThread();
executorService.execute(thread); System.out.println("线程池创建thread");
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
System.out.println("实现Runnable");
}
}
Callable
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask; public class Main { public static void main(String[] args) { MyThread myThread = new MyThread();
FutureTask<Integer> result = new FutureTask<Integer>(myThread);
Thread thread = new Thread(result);
thread.start(); try {
System.out.println(result.get());
System.out.println("end");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
} import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; /**
* 实现Callable来创建线程并返回结果
* 优点是可以返回结果,可以捕获异常
*/
public class MyThread implements Callable<Integer> { @Override
public Integer call() {
System.out.println("实现Callable");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 233;
}
}

2、状态切换

import java.util.concurrent.TimeUnit;

/**
* 线程状态:NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED
*/
public class Main { public static void main(String[] args) { try { Object lock = new Object();
System.out.println("线程运行:创建线程");
Thread myThread = new Thread(new MyThread(lock));
TimeUnit.SECONDS.sleep(1); //NEW
System.out.println("线程状态:" + myThread.getState());
TimeUnit.SECONDS.sleep(1); //启动线程然后等待lock
System.out.println("线程运行:启动线程");
myThread.start();
TimeUnit.SECONDS.sleep(2); //WAITING
System.out.println("线程状态:" + myThread.getState());
TimeUnit.SECONDS.sleep(1); //启动解锁线程唤醒lock
//TIME_WAITING
Thread unLockThread = new Thread(new UnLockThread(lock));
unLockThread.start();
TimeUnit.SECONDS.sleep(2);
System.out.println("线程状态:" + myThread.getState()); TimeUnit.SECONDS.sleep(1);
Thread lockThread = new Thread(new LockThread(lock));
lockThread.start(); //BLOCKED
TimeUnit.SECONDS.sleep(2);
System.out.println("线程状态:" + myThread.getState()); //RUNNABLE
TimeUnit.SECONDS.sleep(4);
System.out.println("线程状态:" + myThread.getState()); TimeUnit.SECONDS.sleep(1);
System.out.println("线程运行:中断线程");
myThread.interrupt(); //TERMINATED
TimeUnit.SECONDS.sleep(2);
System.out.println("线程状态:" + myThread.getState()); } catch (InterruptedException e) {
e.printStackTrace();
}
}
} import java.util.concurrent.TimeUnit; /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { private Object lock; public MyThread(Object lock) {
this.lock = lock;
} @Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
synchronized (lock) {
System.out.println("线程运行:进入等待lock");
lock.wait();
} TimeUnit.SECONDS.sleep(1);
System.out.println("线程运行:进入超时等待"); TimeUnit.SECONDS.sleep(3);
System.out.println("线程运行:进入阻塞lock");
synchronized (lock) {
TimeUnit.SECONDS.sleep(1);
System.out.println("线程运行:获得锁");
}
TimeUnit.SECONDS.sleep(1);
System.out.println("线程运行:无限运行");
boolean flag = true;
do {
if(Thread.interrupted()) {
flag = false;
//System.out.println("线程运行:线程中断");
}
} while(flag);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程运行:结束执行");
}
} import java.util.concurrent.TimeUnit; /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class LockThread implements Runnable { private Object lock; public LockThread(Object lock) {
this.lock = lock;
} @Override
public void run() {
System.out.println("线程运行:LockThread占有锁3秒");
synchronized (lock) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程运行:LockThread释放锁");
}
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class UnLockThread implements Runnable { private Object lock; public UnLockThread(Object lock) {
this.lock = lock;
} @Override
public void run() {
System.out.println("线程运行:UnlockThread唤醒lock");
synchronized (lock) {
lock.notify();
}
}
}

3、守护线程

import java.util.concurrent.TimeUnit;

/**
* 守护线程会在所有线程结束后结束
*/
public class Main { public static void main(String[] args) { Thread daemonThread = new Thread(new DaemonThread());
daemonThread.setDaemon(true);
daemonThread.start(); Thread myThread = new Thread(new MyThread());
myThread.start(); try {
//等待线程结束
myThread.join();
} catch (InterruptedException e1) {
e1.printStackTrace();
} try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程便当");
}
} import java.util.concurrent.TimeUnit; /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程存活确认");
}
System.out.println("子线程便当");
}
} import java.util.concurrent.TimeUnit; /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class DaemonThread implements Runnable { @Override
public void run() {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("守护线程存活确认");
}
}
}

4、异常捕获

UncaughtExceptionHandler
import org.sy.lab.Java7并发编程实战.一线程管理.八线程中不可控异常的处理.ExceptionHandler;

//可捕获的异常分为非运行时异常和运行时异常
//非运行时异常必须在方法的生命体里throw或捕获
//运行时异常发生时如果没有捕获,只会在控制台输出堆栈信息,所以一般需要给线程设置未捕获异常处理器去捕获和处理异常
//顺序:线程的未捕获异常处理器 -> 线程组的未捕获异常处理器 -> 全局线程的未捕获异常处理器
public class Main { public static void main(String[] args) { Thread myThread = new Thread(new MyThread());
myThread.setUncaughtExceptionHandler(new ExceptionHandler());
myThread.start();
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
int a = Integer.parseInt("TTT");
}
} import java.lang.Thread.UncaughtExceptionHandler; public class ExceptionHandler implements UncaughtExceptionHandler { @Override
public void uncaughtException(Thread t, Throwable e) {
System.out.printf("An exception has been captured\n");
System.out.printf("Thread: %s\n", t.getId());
System.out.printf("Exception: %s: %s\n", e.getClass().getName(), e.getMessage());
System.out.printf("Stack Trace: \n");
e.printStackTrace();
System.out.printf("Thread status: %s\n", t.getState());
}
}
线程组uncaughtException
import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) {

		MyThread myThread = new MyThread();
MyThreadGroup group = new MyThreadGroup("xs");
for(int i=0; i<5; i++) {
Thread thread = new Thread(group, myThread);
thread.setUncaughtExceptionHandler(new ExceptionHandler());
thread.start();
} try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println("线程数量:" + group.activeCount()); group.list(); Thread[] threads = new Thread[group.activeCount()];
group.enumerate(threads);
for (int i = 0; i < threads.length; i++) {
System.out.println("线程" + threads[i].getName() + "状态:" + threads[i].getState() + " id:" + threads[i].getId());
}
}
} import java.util.concurrent.TimeUnit; /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
System.out.println("实现Runnable");
try {
TimeUnit.SECONDS.sleep(2);
Integer a = Integer.valueOf("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} public class MyThreadGroup extends ThreadGroup { public MyThreadGroup(String name) {
super(name);
} @Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("捕获到线程组的异常,异常线程:" + t.getName());
e.printStackTrace();
interrupt();
}
} import java.lang.Thread.UncaughtExceptionHandler; public class ExceptionHandler implements UncaughtExceptionHandler { @Override
public void uncaughtException(Thread t, Throwable e) {
System.out.printf("An exception has been captured\n");
System.out.printf("Thread: %s\n", t.getId());
System.out.printf("Exception: %s: %s\n", e.getClass().getName(), e.getMessage());
System.out.printf("Stack Trace: \n");
e.printStackTrace();
System.out.printf("Thread status: %s\n", t.getState());
}
}
Future
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { MyThread myThread = new MyThread();
FutureTask<Integer> result = new FutureTask<Integer>(myThread);
Thread thread = new Thread(result);
thread.start(); try {
TimeUnit.SECONDS.sleep(3);
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
System.out.println("捕获到异常");
e.printStackTrace();
}
}
} import java.util.concurrent.Callable; /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Callable<Integer> { @Override
public Integer call() throws Exception {
int a = Integer.parseInt("TTT");
return a;
}
}

5、局部变量

import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) {

		MyThread unsafeThread = new MyThread();
for (int i = 0; i < 5; i++) {
Thread myThread = new Thread(unsafeThread);
myThread.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { //在当前的线程中保存变量的副本
//ThreadLocal.get: 获取ThreadLocal中当前线程共享变量的值
//ThreadLocal.set: 设置ThreadLocal中当前线程共享变量的值
//ThreadLocal.remove: 移除ThreadLocal中当前线程共享变量的值
//ThreadLocal.initialValue: ThreadLocal没有被当前线程赋值时或当前线程刚调用remove方法后调用get方法,返回此方法值
ThreadLocal<Integer> num = new ThreadLocal<Integer>() {
protected Integer initialValue() {
return 0;
};
}; @Override
public void run() {
for (int i = 0; i < 10; i++) {
num.set(num.get() + 1);
}
System.out.println(num.get());
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class UnsafeThread implements Runnable { private Integer num = 0; @Override
public void run() {
for (int i = 0; i < 10; i++) {
num++;
}
System.out.println(num);
}
}

6、线程组

import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) {

		MyThread myThread = new MyThread();
ThreadGroup group = new ThreadGroup("xs");
for(int i=0; i<5; i++) {
Thread thread = new Thread(group, myThread);
thread.start();
} try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} System.out.println("线程数量:" + group.activeCount()); group.list(); Thread[] threads = new Thread[group.activeCount()];
group.enumerate(threads);
for (int i = 0; i < threads.length; i++) {
System.out.println("线程" + threads[i].getName() + "状态:" + threads[i].getState() + " id:" + threads[i].getId());
}
}
} import java.util.concurrent.TimeUnit; /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
System.out.println("实现Runnable");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

7、

public class Main {

	public static void main(String[] args) {

		MyThread myThread = new MyThread();
MyThreadFactory factory = new MyThreadFactory();
Thread thread = factory.newThread(myThread);
thread.start();
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
System.out.println("实现Runnable");
}
} import java.util.concurrent.ThreadFactory; public class MyThreadFactory implements ThreadFactory { @Override
public Thread newThread(Runnable r) { System.out.println("工厂创建线程");
Thread thread = new Thread(r);
return thread;
}
}

二、同步

1、synchronized

锁的对象
import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) {

		//UnsafeTick tick = new UnsafeTick();

		//锁住该对象
//public synchronized void method { } 方法锁
//synchronized(this) { } 代码锁
//Tick1 tick = new Tick1(); //锁住该类
//public synchronized static void method { } 方法锁
//synchronized(Test.class) { } 代码锁
//Tick2 tick = new Tick2(); //锁住非依赖属性
//synchronized(o) {} 代码锁
Tick3 tick = new Tick3(); MyThread myThread = new MyThread(tick); for (int i = 0; i < 10; i++) {
Thread thread = new Thread(myThread);
thread.start();
} try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(tick.getNum());
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { private ITick tick; public MyThread(ITick tick) {
this.tick = tick;
} @Override
public void run() { for (int i = 0; i < 10; i++) {
tick.subtract();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} public interface ITick { public void add(); public void subtract(); public int getNum();
} public class Tick1 implements ITick { private int num = 100; @Override
public synchronized void add() {
num++;
} @Override
public synchronized void subtract() {
num--;
} @Override
public int getNum() {
return num;
}
} public class Tick2 implements ITick { private int num = 100; @Override
public void add() {
synchronized(ITick.class) {
num++;
}
} @Override
public void subtract() {
synchronized(ITick.class) {
num--;
}
} @Override
public int getNum() {
return num;
}
} public class Tick3 implements ITick { private int num = 100;
private Object obj = new Object(); @Override
public void add() {
synchronized(obj) {
num++;
}
} @Override
public void subtract() {
synchronized(obj) {
num--;
}
} @Override
public int getNum() {
return num;
}
} public class UnsafeTick implements ITick { private int num = 100; @Override
public void add() {
num++;
} @Override
public void subtract() {
num--;
} @Override
public int getNum() {
return num;
}
}
使用条件
public class Main {

	public static void main(String[] args) {

		Storage storage = new Storage();

		Producer producer1 = new Producer(storage, 10);
Producer producer2 = new Producer(storage, 10);
Producer producer3 = new Producer(storage, 10);
Producer producer4 = new Producer(storage, 50);
Producer producer5 = new Producer(storage, 10);
Producer producer6 = new Producer(storage, 10);
Producer producer7 = new Producer(storage, 50);
Producer producer8 = new Producer(storage, 10);
Producer producer9 = new Producer(storage, 10);
Producer producer10 = new Producer(storage, 10); Consumer consumer1 = new Consumer(storage, 50);
Consumer consumer2 = new Consumer(storage, 20);
Consumer consumer3 = new Consumer(storage, 30); Thread thread1 = new Thread(producer1, "生产者1");
Thread thread2 = new Thread(producer2, "生产者2");
Thread thread3 = new Thread(producer3, "生产者3");
Thread thread4 = new Thread(producer4, "生产者4");
Thread thread5 = new Thread(producer5, "生产者5");
Thread thread6 = new Thread(producer6, "生产者6");
Thread thread7 = new Thread(producer7, "生产者7");
Thread thread8 = new Thread(producer8, "生产者8");
Thread thread9 = new Thread(producer9, "生产者9");
Thread thread10 = new Thread(producer10, "生产者10");
Thread thread11 = new Thread(consumer1, "消费者1");
Thread thread12 = new Thread(consumer2, "消费者2");
Thread thread13 = new Thread(consumer3, "消费者3"); thread11.start();
thread12.start();
thread13.start();
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();
thread6.start();
thread7.start();
thread8.start();
thread9.start();
thread10.start();
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { @Override
public void run() {
System.out.println("实现Runnable");
}
} public class Consumer implements Runnable { private Storage storage;
private int num; public Consumer(Storage storage, int num) {
this.storage = storage;
this.num = num;
} @Override
public void run() {
storage.subtract(num);
}
} public class Producer implements Runnable { private Storage storage;
private int num; public Producer(Storage storage, int num) {
this.storage = storage;
this.num = num;
} @Override
public void run() {
storage.add(num);
}
} /**
* 仓库
*/
public class Storage { private int max = 100;
private int num = 0; public synchronized void add(int n) { while(num + n > max) {
try {
System.out.println(Thread.currentThread().getName() + " 要增加:" + n + " 仓库存量:" + num + " 无法加入,进入等待");
wait();
System.out.println(Thread.currentThread().getName() + " 被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num += n;
System.out.println(Thread.currentThread().getName() + " 已增加:" + n + " 现仓库存量:" + num);
notifyAll();
} public synchronized void subtract(int n) { while(num - n < 0) {
try {
System.out.println(Thread.currentThread().getName() + " 要取出:" + n + " 仓库存量:" + num + " 无法取出,进入等待");
wait();
System.out.println(Thread.currentThread().getName() + " 被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num -= n;
System.out.println(Thread.currentThread().getName() + " 已取出:" + n + " 现仓库存量:" + num);
notifyAll();
} public int getNum() {
return num;
}
}

2、ReentrantLock

创建锁
import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) {

		Tick tick = new Tick();

		MyThread myThread = new MyThread(tick);

		for (int i = 0; i < 10; i++) {
Thread thread = new Thread(myThread);
thread.start();
} try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(tick.getNum());
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { private Tick tick; public MyThread(Tick tick) {
this.tick = tick;
} @Override
public void run() { for (int i = 0; i < 10; i++) {
tick.subtract();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; public class Tick { private int num = 100;
private Lock lock = new ReentrantLock(); public void add() {
lock.lock();
num++;
lock.unlock();
} public void subtract() {
lock.lock();
num--;
lock.unlock();
} public int getNum() {
return num;
}
}
读写锁
public class Main {

	public static void main(String[] args) {

		Counter counter = new Counter();
Read read = new Read(counter);
Write write = new Write(counter); Thread thread1 = new Thread(read);
thread1.start(); Thread thread2 = new Thread(write);
thread2.start();
}
} import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock; public class Counter { private int num = 10;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public void add() {
lock.writeLock().lock();
System.out.println("--开始写入");
try {
for (int i = 0; i < 5; i++) {
num++;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("--结束写入");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
} public int read() {
lock.readLock().lock();
try {
System.out.println("读取:" + num);
return num;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
return 0;
} public int unsafeRead() {
System.out.println("读取:" + num);
return num;
}
} import java.util.concurrent.TimeUnit; public class Read implements Runnable { private Counter counter; public Read(Counter counter) {
this.counter = counter;
} @Override
public void run() { for (int i = 0; i < 10; i++) {
counter.read();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} import java.util.concurrent.TimeUnit; public class Write implements Runnable { private Counter counter; public Write(Counter counter) {
this.counter = counter;
} @Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} counter.add();
}
}
公平锁
public class Main {

	public static void main(String[] args) {

		Tick tick = new Tick();

		MyThread myThread = new MyThread(tick);

		for (int i = 0; i < 5; i++) {
Thread thread = new Thread(myThread);
thread.start();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} /**
* 实现Runnable来创建线程
* 优点是可以继承其他的类,可以作为多个Thread的target实现资源共享
*/
public class MyThread implements Runnable { private Tick tick; public MyThread(Tick tick) {
this.tick = tick;
} @Override
public void run() {
tick.add();
}
} import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; public class Tick { private Lock lock = new ReentrantLock(false); public void add() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "进入锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(Thread.currentThread().getName() + "离开锁");
lock.unlock(); lock.lock();
System.out.println(Thread.currentThread().getName() + "进入锁2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(Thread.currentThread().getName() + "离开锁2");
lock.unlock();
}
}
使用条件
public class Main {

	//酒吧里有100瓶老酒,吧台有3个位置,1次只能喝1瓶
//1个酒保把老酒拿到吧台
//5个酒鬼在吧台抢酒喝
public static void main(String[] args) { Bar bar = new Bar();
Counter counter = new Counter();
Bartender bartender = new Bartender(bar, counter);
Drunkard drunkard = new Drunkard(counter); Thread thread = new Thread(bartender, "酒保");
thread.start(); for (int i = 1; i <= 5; i++) {
Thread thread1 = new Thread(drunkard, "酒鬼" + i);
thread1.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} public class Bar { private int beer = 100; public void getBeer() {
beer--;
System.out.println(Thread.currentThread().getName() + "取走1瓶酒,酒窖剩余:" + beer);
} public boolean hasBeer() {
return beer>0 ? true:false;
}
} public class Bartender implements Runnable { private Bar bar;
private Counter counter; public Bartender(Bar bar, Counter counter) {
this.bar = bar;
this.counter = counter;
} @Override
public void run() {
while (bar.hasBeer()) {
bar.getBeer();
counter.setBeer();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
counter.setOpen(false);
}
} import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; public class Counter { private int beer = 0;
private int max = 3;
private Lock lock = new ReentrantLock(false);
private Condition set = lock.newCondition();
private Condition get = lock.newCondition();
private boolean open = true; public void setBeer() {
lock.lock();
try {
while(beer == max) {
set.await();
}
beer++;
get.signalAll();
System.out.println(Thread.currentThread().getName() + "拿出1瓶酒,吧台老酒数量:" + beer);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} public void getBeer() {
lock.lock();
try {
while(beer == 0) {
get.await();
}
beer--;
set.signalAll();
System.out.println("--" + Thread.currentThread().getName() + "喝掉1瓶酒,吧台老酒数量:" + beer);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} public boolean hasBeer() {
return beer>0 ? true:false;
} public boolean isOpen() {
return open;
} public void setOpen(boolean open) {
this.open = open;
}
} public class Drunkard implements Runnable { private Counter counter; public Drunkard(Counter counter) {
this.counter = counter;
} @Override
public void run() {
while (counter.isOpen() || counter.hasBeer()) {
counter.getBeer();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

三、同步辅助类

1、Semaphore信号量

//Semaphore有限流的效果
//10个文件,3台打印机,同时最多有3个文件处于正在打印的状态
public class Main { public static void main(String[] args) { Queue queue = new Queue();
MyThread myThread = new MyThread(queue); for (int i = 0; i < 10; i++) {
Thread thread = new Thread(myThread, "打印机" + i);
thread.start();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} public class MyThread implements Runnable { private Queue queue; public MyThread(Queue queue) {
this.queue = queue;
} @Override
public void run() {
queue.print(Thread.currentThread().getName());
}
} import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; public class Queue { private Semaphore semaphore = new Semaphore(3); public void print(String name) { try {
semaphore.acquire();
System.out.println(name + "开始打印");
TimeUnit.SECONDS.sleep(1);
System.out.println("--" + name + "打印结束");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}

2、CountDownLatch门阀

import java.util.concurrent.TimeUnit;

//CountDownLatch可以提供类似join的功能
//await等待其他线程,在计数减为0的时候继续执行
//例子里10个人参与众筹,在6个人参加后众筹成功
public class Main { public static void main(String[] args) { Crowdfunding crowdfunding = new Crowdfunding(6);
Thread thread = new Thread(crowdfunding);
thread.start(); MyThread myThread = new MyThread(crowdfunding);
for (int i = 1; i <= 10; i++) {
Thread thread2 = new Thread(myThread, "客户" + i);
thread2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} public class MyThread implements Runnable { private Crowdfunding crowdfunding; public MyThread(Crowdfunding crowdfunding) {
this.crowdfunding = crowdfunding;
} @Override
public void run() {
crowdfunding.join(Thread.currentThread().getName());
}
} import java.util.concurrent.CountDownLatch; public class Crowdfunding implements Runnable { private CountDownLatch countDownLatch; public Crowdfunding(int num) {
countDownLatch = new CountDownLatch(num);
} public void join(String name) { countDownLatch.countDown();
System.out.println(name + "参与众筹,还需要数量:" + countDownLatch.getCount());
} @Override
public void run() {
System.out.println("众筹开始,需求数量:" + countDownLatch.getCount());
try {
countDownLatch.await();
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("众筹成功");
}
}

3、CyclicBarrier同步屏障

import java.util.concurrent.TimeUnit;

//CyclicBarrier允许两个或者多个线程在某个点上进行同步
//可以传入另一个Runnable对象作为初始化参数,当所有线程都到达集合点后,CyclicBarrier类将这个Runnable对象作为线程优先去执行
//比CountDownLatch功能强大,支持用reset()方法重置,getNumberWaiting()可以获得阻塞的线程数量,isBroken()判断是否被中断
//例子里5个人参与众筹,在5个人参加后众筹成功,生产完产品,5个人结束众筹
public class Main { public static void main(String[] args) { Product product = new Product();
Crowdfunding crowdfunding = new Crowdfunding(5, product); MyThread myThread = new MyThread(crowdfunding);
for (int i = 1; i <= 5; i++) {
Thread thread = new Thread(myThread, "客户" + i);
thread.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} public class MyThread implements Runnable { private Crowdfunding crowdfunding; public MyThread(Crowdfunding crowdfunding) {
this.crowdfunding = crowdfunding;
} @Override
public void run() {
crowdfunding.join(Thread.currentThread().getName());
}
} import java.util.concurrent.TimeUnit; public class Product implements Runnable { @Override
public void run() { try {
System.out.println("众筹成功,生产产品..");
TimeUnit.SECONDS.sleep(3);
System.out.println("产品发货");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; public class Crowdfunding { private CyclicBarrier cyclicBarrier; public Crowdfunding(int num, Runnable runnable) {
cyclicBarrier = new CyclicBarrier(num, runnable);
} public void join(String name) { System.out.println("当前参与人数:" + cyclicBarrier.getNumberWaiting());
System.out.println(name + "参与众筹");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + "完成众筹");
}
}

4、Phaser阶段器

//Phaser是一种可重用的同步屏障,比CountDownLatch和CyclicBarrier更灵活
//Phaser默认不会进行异常处理,休眠的线程不会响应中断事件
//arriveAndAwaitAdvance()等待参与者到达指定的数量
//arriveAndDeregister注销当前线程,可以模拟CountDownLatch的功能
//继承Phaser可以实现更多的触发行为,其中onAdvance()返回true表示deregister所有线程并解除阻塞
//例子中模拟火箭分3个阶段升空的过程,有3个模块在每个阶段都要同时就绪,燃油要在前两个阶段就绪,然后在第2个阶段结束后回收
public class Main { public static void main(String[] args) { Rocket rocket = new Rocket(); Module module1 = new Module(rocket);
Module module2 = new Module(rocket);
Module module3 = new Module(rocket);
Fuel fuel = new Fuel(rocket); Thread thread1 = new Thread(module1, "模块1");
Thread thread2 = new Thread(module2, "模块2");
Thread thread3 = new Thread(module3, "模块3");
Thread thread4 = new Thread(fuel, "燃料"); thread1.start();
thread2.start();
thread3.start();
thread4.start();
}
} import java.util.concurrent.TimeUnit; public class Fuel implements Runnable { private Rocket rocket = new Rocket(); public Fuel(Rocket rocket) {
this.rocket = rocket;
} @Override
public void run() { try { TimeUnit.SECONDS.sleep((int)(Math.random()*5));
rocket.first(Thread.currentThread().getName()); TimeUnit.SECONDS.sleep((int)(Math.random()*5));
rocket.secondRemove(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} import java.util.concurrent.TimeUnit; public class Module implements Runnable { private Rocket rocket = new Rocket(); public Module(Rocket rocket) {
this.rocket = rocket;
} @Override
public void run() { try { TimeUnit.SECONDS.sleep((int)(Math.random()*5));
rocket.first(Thread.currentThread().getName()); TimeUnit.SECONDS.sleep((int)(Math.random()*5));
rocket.second(Thread.currentThread().getName()); TimeUnit.SECONDS.sleep((int)(Math.random()*5));
rocket.third(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} import java.util.concurrent.Phaser; public class MyPhaser extends Phaser { public MyPhaser(int parties) {
super(null, parties);
} @Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("火箭第1阶段就绪,就绪部件数量:" + registeredParties + "\n");
return false;
case 1:
System.out.println("火箭第2阶段就绪,就绪部件数量:" + registeredParties + "\n");
return false;
case 2:
System.out.println("火箭第3阶段就绪,就绪部件数量:" + registeredParties + "\n");
System.out.println("火箭升空\n");
return false;
default:
return true;
}
}
} public class Rocket { private MyPhaser phaser = new MyPhaser(4); public void first(String name) {
System.out.println(name + "在阶段1就绪");
phaser.arriveAndAwaitAdvance();
} public void second(String name) {
System.out.println("--" + name + "在阶段2就绪");
phaser.arriveAndAwaitAdvance();
} public void third(String name) {
System.out.println("----" + name + "在阶段3就绪");
phaser.arriveAndAwaitAdvance();
} public void secondRemove(String name) {
System.out.println("--" + name + "在阶段2就绪");
phaser.arriveAndAwaitAdvance(); System.out.println("--" + name + "在阶段2结束后移除");
phaser.arriveAndDeregister();
}
}

5、Exchanger交换者

import java.util.concurrent.Exchanger;

//Exchanger用于两个线程在到达同步点时交换数据
//可以用于遗传算法和校对工作
//例子中对产生后的2个银行流水进行比较
public class Main { public static void main(String[] args) { Exchanger<Integer> exchanger = new Exchanger<Integer>(); Account1 account1 = new Account1(exchanger);
Account2 account2 = new Account2(exchanger); Thread thread1 = new Thread(account1, "银行账单1");
Thread thread2 = new Thread(account2, "银行账单2"); thread1.start();
thread2.start();
}
} import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit; public class Account1 implements Runnable { private Exchanger<Integer> exchanger; public Account1(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
} @Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "已出账"); Integer account = exchanger.exchange(233);
System.out.println(Thread.currentThread().getName() + "对方金额:" + account);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit; public class Account2 implements Runnable { private Exchanger<Integer> exchanger; public Account2(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
} @Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "已出账"); Integer account = exchanger.exchange(666);
System.out.println(Thread.currentThread().getName() + "对方金额:" + account);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

四、执行器

1、创建线程池

newFixedThreadPool
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor; //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
} for (int i = 5; i < 10; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
} for (int i = 10; i < 15; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
}
executor.shutdown();
}
} import java.util.concurrent.TimeUnit; public class MyThread implements Runnable { private String name; public MyThread(String name) {
this.name = name;
} @Override
public void run() { System.out.println(Thread.currentThread().getName() + "开始执行" + name);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}
newCachedThreadPool
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor; import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread; //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
} for (int i = 5; i < 10; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
} for (int i = 10; i < 15; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
}
executor.shutdown();
}
} import java.util.concurrent.TimeUnit; public class MyThread implements Runnable { private String name; public MyThread(String name) {
this.name = name;
} @Override
public void run() { System.out.println(Thread.currentThread().getName() + "开始执行" + name);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}
newSingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread; //创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
public class Main { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
} for (int i = 5; i < 10; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
} for (int i = 10; i < 15; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
}
executor.shutdown();
}
} import java.util.concurrent.TimeUnit; public class MyThread implements Runnable { private String name; public MyThread(String name) {
this.name = name;
} @Override
public void run() { System.out.println(Thread.currentThread().getName() + "开始执行" + name);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}
newScheduledThreadPool
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread; //创建一个定长线程池,支持定时及周期性任务执行
public class Main { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(3); for (int i = 0; i < 5; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.schedule(myThread, 10, TimeUnit.SECONDS);
} for (int i = 5; i < 10; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
} for (int i = 10; i < 15; i++) {
MyThread myThread = new MyThread("任务" + (i+1));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(myThread);
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} import java.util.concurrent.TimeUnit; public class MyThread implements Runnable { private String name; public MyThread(String name) {
this.name = name;
} @Override
public void run() { System.out.println(Thread.currentThread().getName() + "开始执行" + name);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}
周期性任务
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import org.sy.lab.多线程基础._4_执行器._1_创建线程池._1_newFixedThreadPool.MyThread; //创建一个定长线程池,支持定时及周期性任务执行
//scheduleAtFixedRate 第3个参数表示前一个任务的开始时间和后一个任务的开始时间间隔
//scheduleWithFixedDelay 第3个参数表示前一个任务的结束时间和后一个任务的开始时间间隔
public class Main { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); MyThread myThread = new MyThread("任务");
executor.scheduleAtFixedRate(myThread, 0, 5, TimeUnit.SECONDS);
//executor.scheduleWithFixedDelay(myThread, 0, 5, TimeUnit.SECONDS); executor.shutdown();
}
} import java.util.concurrent.TimeUnit; public class MyThread implements Runnable { private String name; public MyThread(String name) {
this.name = name;
} @Override
public void run() { System.out.println(Thread.currentThread().getName() + "开始执行" + name);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行结束");
}
}

2、Future线程控制

返回结果
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor; public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); MyCallable myCallable = new MyCallable(); Future<String> result = executor.submit(myCallable);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
} import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override
public String call() throws Exception { return "这是callable返回的结果";
}
}
处理第一个结果
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor; import org.sy.lab.多线程基础._4_执行器._2_Future线程控制._3_处理所有结果.MyCallable; //invokeAny 返回第一个完成没有抛异常的任务结果
public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); MyCallable myCallable = new MyCallable(5, "任务1");
MyCallable myCallable2 = new MyCallable(2, "任务2"); List<MyCallable> list =new ArrayList<MyCallable>();
list.add(myCallable);
list.add(myCallable2); try {
System.out.println(executor.invokeAny(list));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
} import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; public class MyCallable implements Callable<String> { private int time;
private String name; public MyCallable(int time, String name) {
this.time = time;
this.name = name;
} @Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(time);
return "这是" + name + "返回的结果";
}
}
处理所有结果
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor; //invokeAll 接收一个任务列表,等待所有任务完成
public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); MyCallable myCallable = new MyCallable(5, "任务1");
MyCallable myCallable2 = new MyCallable(2, "任务2"); List<MyCallable> list =new ArrayList<MyCallable>();
list.add(myCallable);
list.add(myCallable2); List<Future<String>> result = new ArrayList<Future<String>>();
try {
result = executor.invokeAll(list);
for (Iterator<Future<String>> iterator = result.iterator(); iterator.hasNext();) {
Future<String> future = (Future<String>) iterator.next();
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
} import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; public class MyCallable implements Callable<String> { private int time;
private String name; public MyCallable(int time, String name) {
this.time = time;
this.name = name;
} @Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(time);
return "这是" + name + "返回的结果";
}
}
取消任务
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); MyCallable myCallable = new MyCallable(); Future<String> result = executor.submit(myCallable);
System.out.println("isCancelled:" + result.isCancelled());
System.out.println("isDone:" + result.isDone());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("取消任务");
result.cancel(true);
System.out.println("isCancelled:" + result.isCancelled());
System.out.println("isDone:" + result.isDone());
executor.shutdown();
}
} import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override
public String call() throws Exception {
while(true) {
System.out.printf("Task: Test\n");
Thread.sleep(100);
}
}
}
控制任务的完成
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import org.sy.lab.多线程基础._4_执行器._2_Future线程控制._4_取消任务.MyCallable; public class Main { public static void main(String[] args) { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); MyCallable myCallable = new MyCallable();
MyFuture myFuture = new MyFuture(myCallable); executor.submit(myFuture);
executor.shutdown();
System.out.println("isCancelled:" + myFuture.isCancelled());
System.out.println("isDone:" + myFuture.isDone());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("取消任务");
myFuture.cancel(true);
System.out.println("isCancelled:" + myFuture.isCancelled());
System.out.println("isDone:" + myFuture.isDone());
}
} import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override
public String call() throws Exception {
while(true) {
System.out.printf("Task: Test\n");
Thread.sleep(100);
}
}
} import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask; public class MyFuture extends FutureTask<String> { public MyFuture(Callable<String> callable) {
super(callable);
} @Override
protected void done() {
if(isCancelled()) {
System.out.println("cancel");
} else {
System.out.println("done");
}
}
}

3、分离任务的启动和结束

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
CompletionService<String> service = new ExecutorCompletionService<String>(executorService); Producer producer1 = new Producer("生产者1", service);
Producer producer2 = new Producer("生产者2", service);
Consumer consumer = new Consumer("消费者", service); Thread thread1 = new Thread(producer1);
Thread thread2 = new Thread(producer2);
Thread thread3 = new Thread(consumer); thread1.start();
thread2.start();
thread3.start(); try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
consumer.setEnd();
}
} import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; public class Consumer implements Runnable { private String name;
private boolean end = false;
private CompletionService<String> service; public Consumer(String name, CompletionService<String> service) {
this.name = name;
this.service = service;
} @Override
public void run() {
while(!end) {
Future<String> result = service.poll();
try {
if(result != null) {
System.out.println(name + "消费 " + result.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
} public void setEnd() {
end = true;
}
} import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; public class Processor implements Callable<String> { private String name; public Processor(String name) {
this.name = name;
} @Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep((long) (Math.random() * 3));
System.out.println(name + "生产产品完成");
return name + "生产的产品";
}
} import java.util.concurrent.CompletionService; public class Producer implements Runnable { private String name;
private CompletionService<String> service; public Producer(String name, CompletionService<String> service) {
this.name = name;
this.service = service;
} @Override
public void run() {
Processor processor = new Processor(name);
service.submit(processor);
}
}

4、处理被拒绝的任务

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor; //ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
//ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
//ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
//ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
//自定义处理继承RejectedExecutionHandler类
public class Main {
public static void main(String[] args) {
RejectedTaskController controller = new RejectedTaskController();
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
executor.setRejectedExecutionHandler(controller);
System.out.printf("Main: Starting.\n");
for(int i=0; i<3; i++) {
Task task = new Task("Task" + i);
executor.submit(task);
}
System.out.printf("Main: Shutting down the Executor.\n");
executor.shutdown();
System.out.printf("Main: Sending another Task.\n");
Task task = new Task("RejectedTask");
executor.submit(task);
System.out.printf("Main: End.\n");
}
} import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; public class RejectedTaskController implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.printf("RejectedTaskController: The task %s has been rejected\n", r.toString());
System.out.printf("RejectedTaskController: %s\n", executor.toString());
System.out.printf("RejectedTaskController: Terminating: %s\n", executor.isTerminating());
System.out.printf("RejectedTaksController: Terminated: %s\n", executor.isTerminated());
}
} import java.util.concurrent.TimeUnit; public class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.printf("Task " + name + ": Starting\n");
try {
long duration = (long) (Math.random() * 10);
System.out.printf("Task %s: ReportGenerator: Generating a report during %d seconds\n", name, duration);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Task %s: Ending\n", name);
}
public String toString() {
return name;
}
}

五、ForkJoin

六、集合

七、定制

八、监控

九、代码下载

代码下载:gitee