前言:
一、阻塞队列 BlockingQueue接口
1、队列:数据的集合,先进先出。有固定大小的,也可以不固定。
2、阻塞与非阻塞:可以阻塞也可以不阻塞。
1)阻塞队列,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止。同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止。
2)非阻塞列队,上述情况会报错。
3、JUC 包阻塞队列还有更多实现类,用来满足各种复杂的需求:ArrayBlockingQueue, DelayQueue,LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue,具体的API差别也很小。
1)LinkedBlockingQueue的容量默认是没有上限的(在不指定时容量为Integer.MAX_VALUE),也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
2)ArrayBlockingQueue在构造时需要指定容量,并可以选择是否需要公平性(默认false),如果公平参数 被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来达到这种公平性的:即等待时间最长的线程 会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO(先进先出)原则对元素进行排序。
3)PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级 顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞队列上put时是不会受阻的,但是如果队列为空,取元素的操作take就会阻塞。另外,往入该队列中的元 素要具有比较能力。
4)DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed元素的*阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。
3、方法:只有put 和 take 才会阻塞
1)Queue 的方法
add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
element 返回但不移除队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
offer 添加一个元素并返回true 如果队列已满,则返回false,不会有异常
poll 移除并返问队列头部的元素 如果队列为空,则返回null,不会有异常
peek 返回但不移除队列头部的元素 如果队列为空,则返回null,不会有异常
2)BlockingQueue在Queue基础上添加的接口:
put 添加一个元素 如果队列满,则阻塞
take 移除并返回队列头部的元素 如果队列为空,则阻塞
offer(E e, long timeout, TimeUnit unit) 队列满时指定阻塞时间,否则立即返回false,不会有异常
poll(long timeout, TimeUnit unit) 队列空时指定阻塞时间,否则立即返回null
二、实现生产者与消费者
package com.newThread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * * BlockingQueueTest.java * * @title 阻塞队列的运用 * @description Q * 1-队列:数据的集合,先进先出。有固定大小的,也可以不固定。 * 2-阻塞与非阻塞:可以阻塞也可以不阻塞。非阻塞的,会报错。阻塞的会阻塞。 * 3、ArrayBlockingQueue实现了BlockingQueue接口。 * 1)-Insert(放):add(e)-抛异常;offer(e)-返回true/false; put(e)-阻塞; * 2)-Remove(取):remove()-抛异常; poll()-返回null; take()-阻塞; poll(time,unit) * @author SAM-SHO * @Date 2014-8-31 */ public class BlockingQueueTest { public static void main(String[] args) { // 定义一个阻塞队列,允许放三个数据 final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(3); // 两个子线程 放数据 for (int i = 0; i < 2; i++) { new Thread() { public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); System.out.println(Thread.currentThread().getName() + "准备放数据!"); queue.put(1);// put()放的时候,会阻塞 // queue.size() 取到现在队列的大小 System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } // main 主线程 取数据 new Thread() { public void run() { while (true) { try { // 将此处的睡眠时间分别改为100和1000,观察运行结果 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "准备取数据!"); queue.take();// 取数据 System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
三、实现同步
1、阻塞队列与 Semphore(灯) 有些类似,但也不同,阻塞队列是一方存放数据,另一方释放数据。
2、Semphore 通常是由同一方设置和释放信号量
package com.newThread; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * * BlockingQueueCommunication.java * * @title 阻塞队列实现同步通知 * @description 1-阻塞队列与Semphore有些类似,但也不同,阻塞队列是一方存放数据,另一方释放数据 * 2-Semphore通常是由同一方设置和释放信号量 * @author SAM-SHO * @Date 2014-8-31 */ public class BlockingQueueCommunication { /** * @param args */ public static void main(String[] args) { final Business business = new Business(); //子线程 new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 5; i++) { business.sub(i); } } }).start(); // 主线程 for (int i = 1; i <= 10; i++) {//5次循环以后,会阻塞。因为子线程已经结束 queue2 没有线程去取 business.main(i); } } // 内部类使用 static ,实现为外部类 static class Business { BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);//队列1 BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);//队列2 // 匿名构造方法,运行时机在任何构造方法之前 { // Collections.synchronizedMap(null);// map的同步类方法 try { System.out.println("xxxxxdfsdsafdsa"); queue2.put(1);// 先在队列2 queue2中放数据 } catch (InterruptedException e) { e.printStackTrace(); } } /** * * @param i */ public void sub(int i) { try { queue1.put(1);//队列1 放数据 } catch (InterruptedException e) { e.printStackTrace(); } for (int j = 1; j <= 10; j++) { System.out.println("子线程 sequece of " + j + ",loop of " + i); } try { queue2.take();//队列2 取数据 } catch (InterruptedException e) { e.printStackTrace(); } } /** * * @param i */ public void main(int i) { try { queue2.put(1);//队列2 放数据,如果先主线程进来,会阻塞 } catch (InterruptedException e1) { e1.printStackTrace(); } for (int j = 1; j <= 4; j++) { System.out.println("主线程 sequece of " + j + ",loop of " + i); } try { queue1.take();// 队列1 取数据 } catch (InterruptedException e) { e.printStackTrace(); } } } }
【输出】
xxxxxdfsdsafdsa 子线程 sequece of 1,loop of 1 子线程 sequece of 2,loop of 1 子线程 sequece of 3,loop of 1 子线程 sequece of 4,loop of 1 子线程 sequece of 5,loop of 1 子线程 sequece of 6,loop of 1 子线程 sequece of 7,loop of 1 子线程 sequece of 8,loop of 1 子线程 sequece of 9,loop of 1 子线程 sequece of 10,loop of 1 主线程 sequece of 1,loop of 1 主线程 sequece of 2,loop of 1 主线程 sequece of 3,loop of 1 主线程 sequece of 4,loop of 1 子线程 sequece of 1,loop of 2 子线程 sequece of 2,loop of 2 子线程 sequece of 3,loop of 2 子线程 sequece of 4,loop of 2 子线程 sequece of 5,loop of 2 子线程 sequece of 6,loop of 2 子线程 sequece of 7,loop of 2 子线程 sequece of 8,loop of 2 子线程 sequece of 9,loop of 2 子线程 sequece of 10,loop of 2 主线程 sequece of 1,loop of 2 主线程 sequece of 2,loop of 2 主线程 sequece of 3,loop of 2 主线程 sequece of 4,loop of 2 子线程 sequece of 1,loop of 3 子线程 sequece of 2,loop of 3 子线程 sequece of 3,loop of 3 子线程 sequece of 4,loop of 3 子线程 sequece of 5,loop of 3 子线程 sequece of 6,loop of 3 子线程 sequece of 7,loop of 3 子线程 sequece of 8,loop of 3 子线程 sequece of 9,loop of 3 子线程 sequece of 10,loop of 3 主线程 sequece of 1,loop of 3 主线程 sequece of 2,loop of 3 主线程 sequece of 3,loop of 3 主线程 sequece of 4,loop of 3 子线程 sequece of 1,loop of 4 子线程 sequece of 2,loop of 4 子线程 sequece of 3,loop of 4 子线程 sequece of 4,loop of 4 子线程 sequece of 5,loop of 4 子线程 sequece of 6,loop of 4 子线程 sequece of 7,loop of 4 子线程 sequece of 8,loop of 4 子线程 sequece of 9,loop of 4 子线程 sequece of 10,loop of 4 主线程 sequece of 1,loop of 4 主线程 sequece of 2,loop of 4 主线程 sequece of 3,loop of 4 主线程 sequece of 4,loop of 4 子线程 sequece of 1,loop of 5 子线程 sequece of 2,loop of 5 子线程 sequece of 3,loop of 5 子线程 sequece of 4,loop of 5 子线程 sequece of 5,loop of 5 子线程 sequece of 6,loop of 5 子线程 sequece of 7,loop of 5 子线程 sequece of 8,loop of 5 子线程 sequece of 9,loop of 5 子线程 sequece of 10,loop of 5 主线程 sequece of 1,loop of 5 主线程 sequece of 2,loop of 5 主线程 sequece of 3,loop of 5 主线程 sequece of 4,loop of 5