1.Callable<V> :接口,多线程的一种实现方式,实现类重写方法,重写的call()方法有返回值或者抛出异常,需要配合着FutureTask类(实现了Runnable接口)使用:
1 public class CallableTest { 2 public static void main(String[] args) { 3 Test t=new Test(); 4 FutureTask<Integer> f=new FutureTask<>(t); //获取结果,与callable配合着使用 5 new Thread(f).start(); //FutureTask也实现了Runnable接口 6 Integer i= null; 7 try { 8 i = f.get(); //FutureTask的方法,获得个、返回的结果 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } catch (ExecutionException e) { 12 e.printStackTrace(); 13 } 14 System.out.println(i); 15 16 } 17 } 18 class Test implements Callable<Integer>{ //实现Runnable接口,重写call()方法,有返回值 19 int sum=0; 20 @Override 21 public Integer call() throws Exception { 22 for (int i = 0; i <=100; i++) { 23 sum=sum+i; 24 } 25 return sum; 26 } 27 }
2.Semaphore类:信号量,直接new对象,semaphore.acquire():获取信号量,如果获取失败就就如阻塞,semaphore.relase():释放信号量,semaphore.availablePermits():获取信号量的编号。
Semaphore s2=new Semaphore(5,true);
Semaphore s1=new Semaphore(5); //两种构造方法,(int,boolean) int:资源的访问量 boolean:是否是公平竞争 //公平竞争:等的越久越先执行
try {
s1.acquire();//请求资源,如果资源不可用该线程阻塞,直到有可用资源为止;
} catch (InterruptedException e) {
e.printStackTrace();
}
s1.release();//释放资源
一个信号量的停车场案例:
public class Parking2 implements Runnable{
Semaphore semaphore;
int id;
public Parking2(Semaphore semaphore, int id) {
this.semaphore = semaphore;
this.id = id;
}
public static void main(String[] args) {
Semaphore semaphore=new Semaphore(4); //4个车位
ExecutorService executorService= Executors.newCachedThreadPool();//一个线程池
for (int i = 1; i <= 25; i++) {
executorService.execute(new Parking2(semaphore,i));
}
executorService.shutdown();
}
@Override
public void run() {
try {
push();
Thread.sleep(1000);
pop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//停
public void push() throws InterruptedException {
if(semaphore.availablePermits()>0){
System.out.println(this.id+"可以停车!");
}else{
System.out.println(this.id+"请等待,没有车位!");
}
semaphore.acquire();
System.out.println(this.id+"停车成功!");
}
//取
public void pop(){
semaphore.release();
System.out.println(this.id+"取车成功!");
}
}
3.ReentrantLock和Condition:
ReentrantLock:可重入排他锁,直接new对象,需要手动的加和释放锁,但比较灵活。
public class ReentrantTest implements Runnable{
Test t;
int id;
public ReentrantTest(Test t, int id) {
this.t = t;
this.id = id;
}
@Override
public void run() {
try {
t.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService executorService= Executors.newCachedThreadPool();//线程池
Test t=new Test();
//保证每一个线程访问同一个线程池,和同一把锁
for (int i = 1; i <= 30; i++) {
executorService.execute(new ReentrantTest(t,i));
}
executorService.shutdown();//关闭
}
}
class Test{
ReentrantLock reentrantLock=new ReentrantLock();//new一个锁
public void get() throws InterruptedException {
reentrantLock.lock(); //上锁
System.out.println(Thread.currentThread().getName()+"_启动");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"_结束");
//释放锁
reentrantLock.unlock();
}
}
Condition:是一个接口,创建Condition对象需要ReentrantLock.newCondition();主要有await():导致当前线程等待,singal():唤醒一个等待线程。如下代码实例证明:1.await(),signal()只能在当前对象获得锁(ReentrantLock)是被调用;2.当对象调用await()方法后,会交出锁,让其他线程对象争用;3,。在调用signal()后,锁对象又回到await()调用处,继续执行。
public class ConditionTest { static ReentrantLock lock=new ReentrantLock(); static Condition condition=lock.newCondition(); //condition是一个接口,有await(),singal()方法 static int count=1; public static void main(String[] args) throws InterruptedException { Runnable r1= () -> { lock.lock(); System.out.println("r1带锁启动。。。"); try { System.out.println("r1即将被锁住"); condition.await(); //等待,并且交出锁 System.out.println("r1又获新生!"); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("r1执行完毕"); lock.unlock(); } }; Runnable r2= () -> { lock.lock(); System.out.println("r2带锁启动。。。"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("r1即将被唤醒!"); condition.signal(); //唤醒r1 System.out.println("r2执行完毕"); lock.unlock(); }; new Thread(r1).start(); //让r1先执行,但条件不足,只能先等待 new Thread(r2).start(); } }
4.BlockingQueue:是一个阻塞队列的超类接口,此接口的实现类有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 。每个类的具体使用可以参考API。以下的实例使用LinkedBlockingQueue做的。
1 //设计一个阻塞队列 2 //每个多线程对象都会往queue(为对象共享static)中放元素会涉及: 3 // --->如果队列已满就阻塞等待,--->如果队列空就阻塞等待 4 //本例中一共有三个执行: Main主线程, 往blockingqueue放元素的线程 从blockingqueue中取元素的线程 5 //三者一起执行 6 public class TestBlockingQueue implements Runnable{ 7 //属性: 8 int id; //每个对象编号 9 static BlockingQueue<String> queue=new LinkedBlockingQueue<>(3); 10 11 //构造: 12 public TestBlockingQueue(int id) { 13 this.id = id; 14 } 15 16 @Override 17 public void run() { 18 //每个线程元素都会把自己的ID放到队列中 19 try { 20 queue.put(String.valueOf(this.id)); //put():会抛异常 21 System.out.println(this.id+"已被放进队列"); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } 25 } 26 27 public static void main(String[] args) { 28 //定义一个线程池: 29 ExecutorService executorService= Executors.newCachedThreadPool(); 30 for (int i = 0; i < 15; i++) { 31 //线程池执行线程对象 32 executorService.execute(new TestBlockingQueue(i)); 33 } 34 Thread t=new Thread(()->{ 35 //run()的方法体 36 while (true){ 37 try { 38 Thread.sleep(100); 39 if(queue.isEmpty()){//静态属性可以类名直接调 40 break; 41 } 42 //拿出队列中的元素 43 String string=TestBlockingQueue.queue.take();//拿出并删除队列头的元素 44 System.out.println(string+"被拿出"); 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } 48 } 49 }); 50 51 //让线程池去执行线程对象t 52 executorService.execute(t); 53 54 //关闭线程池 55 executorService.shutdown(); 56 } 57 }
5.CompletionService:是一个接口,创建实现类对象,是将生产新的异步任务和使用已完成任务结果分离开来的服务。有实现类:ExecutorCompletionService,方法:submit(),take(),具体用法如下实例:
1 //测试: CompletionService = new ExecutorCompletionService(Executor实现类)--->接口,实现类,让线程池来执行任务 2 //方法: 1.completionService.submit(Callable<V>的实现类)-->提交执行线程对象 3 // 2.completionService.take().get()-->检索并删除下一个已完成的任务,get()获得该线程的执行结果 4 //会阻塞等待 5 public class CompletionServiceTest implements Callable<String> { 6 int id; 7 8 public CompletionServiceTest(int id) { 9 this.id = id; 10 } 11 12 @Override 13 public String call() throws Exception { 14 //每个线程打印,并返回自己的编号 15 System.out.println(this.id+"_已启动"); 16 Thread.sleep(100); 17 System.out.println(this.id+"_结束"); 18 return this.id+"_thread,call()方法的返回值"; 19 } 20 21 public static void main(String[] args) { 22 //线程池 23 ExecutorService executorService= Executors.newCachedThreadPool(); 24 //放的是executor的实现类(线程池对象) 25 CompletionService <String>completionService= new ExecutorCompletionService<String>(executorService); 26 //使用ExecutorCompletionService作为执行对象,LinkedBlockingQueue:作为完成队列 27 28 for (int i = 0; i < 15; i++) { 29 completionService.submit(new CompletionServiceTest(i)); 30 //提交执行 31 } 32 for (int i = 0; i < 15; i++) { 33 try { 34 try { 35 System.out.println(completionService.take().get()); 36 //并打印返回值 37 //take():获取已完成的任务的结果,并从完成队列中删除该任务 38 } catch (ExecutionException e) { 39 e.printStackTrace(); 40 } 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 } 45 //线程池关闭 46 executorService.shutdown(); 47 } 48 }
6.CountDonwLatch:是继承自Object的类,允许一个或一组线程等待其他线程完成的辅助类
1 //CountDownLatch是继承自Object的类 2 //1.构造方法,直接new: countdownlatck=new CountDownLatch(int) 3 //2.主要方法:countdownlatch.await():等待计数到达0,后开始线程执行,没有就阻塞等待 4 // countdownlatch.countDown():计数减一 5 //一个10一起赛跑的案例: 要求:10个线程一起开始运行,等10个线程都运行结束后输出比赛结束的信息 6 7 import java.util.concurrent.CountDownLatch; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.Executors; 10 11 public class CountDownLatchTest { 12 //比赛开始倒计时: 13 static CountDownLatch start=new CountDownLatch(1); 14 15 //比赛结束倒计时: 16 static CountDownLatch end=new CountDownLatch(10);//10名选手,每来一个就减少一个 17 18 public static void main(String[] args) { 19 //线程池 20 ExecutorService pool= Executors.newFixedThreadPool(10); 21 System.out.println("这是一场10人制比赛~~~"); 22 for (int i = 0; i < 10; i++) { //创建10个线程 23 int id=i+1; 24 Runnable runner= () -> { 25 System.out.println("运动员:"+id+"准备就绪!"); 26 try { 27 start.await(); //等待开始的指令 28 29 Thread.sleep(1000); //跑步中 30 31 System.out.println("运动员"+id+"已到达!"); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 }finally { 35 end.countDown(); //每跑完一个人就让计数减一 36 } 37 }; 38 pool.execute(runner); //执行10个线程,都阻塞在开始指令之前 39 } 40 41 start.countDown(); 42 43 try { 44 end.await(); //等待比赛结束 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } 48 System.out.println("全场比赛结束!"); 49 //关闭线程池 50 pool.shutdown(); 51 } 52 }
7.CyclicBarrier:是一个类,作用是:允许一组线程互相等待到达一个屏障点前的同步辅助,这个屏障在所有等待的线程被释放后可以重新使用,这是该屏障称为循环(CountDownLatch不同之处)。
1 //一个旅游团的例子 2 //构造方法:cyclicBarrier=new CyclicBarrier(int)-->int:要等待的线程数 3 //重要方法:await()-->当所有线程都执行了该方法,就一起继续向前执行,否则就等待 4 public class CyclicBarrierTest { 5 //属性: 三种策略到达5个城市的时间数组: 西安 北京 重庆 成都 杭州 6 static int [] bycar={1,2,3,4,5}; 7 static int [] bybus={2,3,4,5,6}; 8 static int [] onfoot={3,4,5,6,7}; 9 10 static String getNow(){ 11 //获取现在的时间 12 SimpleDateFormat sdf=new SimpleDateFormat("hh:mm:ss"); 13 return sdf.format(new Date()); 14 } 15 static class Tour implements Runnable{ 16 CyclicBarrier cyclicBarrier=new CyclicBarrier(3); //三家旅行公司,采用三种celue 17 String tourName; //旅行店的名字 18 int [] time; 19 20 public Tour(CyclicBarrier cyclicBarrier, String tourName,int [] time) { 21 this.cyclicBarrier = cyclicBarrier; 22 this.tourName = tourName; 23 this.time=time; 24 } 25 26 @Override 27 public void run() { 28 try { 29 Thread.sleep(time[0]*1000); 30 System.out.println(getNow()+":"+this.tourName+"到达-->西安"); 31 try { 32 cyclicBarrier.await(); //等待其他线程执行到此处 33 34 //继续执行: 35 Thread.sleep(time[1]*1000); 36 System.out.println(getNow()+":"+this.tourName+"到达-->北京"); 37 cyclicBarrier.await(); //等待其他两个团到达北京,循环使用 38 39 Thread.sleep(time[2]*1000); 40 System.out.println(getNow()+":"+this.tourName+"到达-->重庆"); 41 cyclicBarrier.await(); //等待其他两个团到达北京,循环使用 42 43 Thread.sleep(time[3]*1000); 44 System.out.println(getNow()+":"+this.tourName+"到达-->成都"); 45 cyclicBarrier.await(); //等待其他两个团到达北京,循环使用 46 47 Thread.sleep(time[4]*1000); 48 System.out.println(getNow()+":"+this.tourName+"到达-->杭州"); 49 //cyclicBarrier.await(); //等待其他两个团到达北京,循环使用 50 51 } catch (BrokenBarrierException e) { 52 e.printStackTrace(); 53 } 54 } catch (InterruptedException e) { 55 e.printStackTrace(); 56 } 57 } 58 } 59 public static void main(String[] args) { 60 //定义线程池 61 ExecutorService pool= Executors.newFixedThreadPool(3); 62 63 //定义CyclicBarrier 64 CyclicBarrier cyclicBarrier=new CyclicBarrier(3); 65 66 //线程执行线程,分别执行 67 pool.execute(new Tour(cyclicBarrier,"T_Walk",onfoot)); 68 pool.execute(new Tour(cyclicBarrier,"T_Car",bycar)); 69 pool.execute(new Tour(cyclicBarrier,"T_Bus",bybus)); 70 71 //关闭线程池 72 pool.shutdown(); 73 } 74 }
8.Future<V>:是一个接口,实现类:FutureTask,CompletableFuture等,get()方法将返回V类型的数据。提供方法来检查计算是否完成,等待其完成,并检索计算结果。 结果只能在计算完成后使用方法get
进行检索,如有必要,阻塞,直到准备就绪。 取消由cancel
方法执行。 提供其他方法来确定任务是否正常完成或被取消。 计算完成后,不能取消计算。 如果您想使用Future
,以便不可撤销,但不提供可用的结果,则可以声明Future<?>
表格的类型,并返回null
作为基础任务的结果。
9.ScheduledExecutorService:接口:(实现类:ScheduledExecutorThreadPool),作用可以让一个EexcutorService在给定延迟后执行,或者是定期执行。
1 //ScheduledExecutorService:接口:(实现类:ScheduledExecutorThreadPool) 2 //作用可以让一个EexcutorService在给定延迟后执行,或者是定期执行。 3 // 方法1:pool.ScheduledFuture<?> scheduleAtFixedRate(Runnable var1, long var2, long var4, TimeUnit var6); 4 //参数含义:command - 要执行的任务 5 // initialDelay - 延迟第一次执行的时间 6 // period - 连续执行之间的时期 7 // unit - initialDelay和period参数的时间单位 8 9 //方法2:ScheduledFuture<?> schedule(Runnable command, 10 // long delay, 11 // TimeUnit unit)创建并执行在给定延迟后启用的单次操作。 12 //参数 13 //command - 要执行的任务 14 //delay - 从现在开始延迟执行的时间 15 //unit - 延时参数的时间单位 16 public class ScheduledExecutorServiceTest { 17 public static void main(String[] args) { 18 //创建线程池 19 final ScheduledExecutorService pool= Executors.newScheduledThreadPool(2); 20 //创建一个可以在给定延迟后运行,或者定期执行的线程池 21 final Runnable runnable=new Runnable() { 22 int count=0; 23 @Override 24 public void run() { 25 System.out.println(new Date()+"runnable"+(++count)); 26 } 27 }; 28 29 //1秒后运行,每隔2秒运行一次 30 ScheduledFuture<?> scheduledFuture = pool.scheduleAtFixedRate(runnable,1,2,SECONDS); 31 //2秒后运行,往后每隔5秒运行一次 32 ScheduledFuture<?> scheduledFuture1 = pool.scheduleAtFixedRate(runnable, 2, 5, SECONDS); 33 //30秒后结束关闭任务,并关闭Scheduled 34 pool.schedule(new Runnable() { 35 @Override 36 public void run() { 37 //关闭操作 38 scheduledFuture.cancel(true); 39 scheduledFuture1.cancel(true); 40 //关闭线程池 41 pool.shutdown(); 42 } 43 },30,SECONDS); 44 45 } 46 }
以上内容参考来自Java ApI文档和: