Java线程(六):JUC包阻塞队列的应用

时间:2023-02-13 13:54:02

前言:

1、利用Lock 和 Condition  实现一个阻塞队列


一、阻塞队列 BlockingQueue接口

1、队列:数据的集合,先进先出。有固定大小的,也可以不固定。

2、阻塞与非阻塞:可以阻塞也可以不阻塞。

1)阻塞队列,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止。同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止。

2)非阻塞列队,上述情况会报错。


3、JUC 包阻塞队列还有更多实现类,用来满足各种复杂的需求:ArrayBlockingQueue, DelayQueue,LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue,具体的API差别也很小。

1)LinkedBlockingQueue的容量默认是没有上限的(在不指定时容量为Integer.MAX_VALUE),也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。

2)ArrayBlockingQueue在构造时需要指定容量,并可以选择是否需要公平性(默认false),如果公平参数 被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来达到这种公平性的:即等待时间最长的线程 会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO(先进先出)原则对元素进行排序。

3)PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级 顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞队列上put时是不会受阻的,但是如果队列为空,取元素的操作take就会阻塞。另外,往入该队列中的元 素要具有比较能力。

4)DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed元素的*阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。


3、方法:只有put 和 take 才会阻塞

1)Queue 的方法

add           增加一个元索                                           如果队列已满,则抛出一个IIIegaISlabEepeplian异常

remove    移除并返回队列头部的元素                    如果队列为空,则抛出一个NoSuchElementException异常

element   返回但不移除队列头部的元素                如果队列为空,则抛出一个NoSuchElementException异常

offer         添加一个元素并返回true                        如果队列已满,则返回false,不会有异常

poll           移除并返问队列头部的元素                    如果队列为空,则返回null,不会有异常

peek         返回但不移除队列头部的元素                如果队列为空,则返回null,不会有异常


2)BlockingQueue在Queue基础上添加的接口:

put           添加一个元素                                         如果队列满,则阻塞

take         移除并返回队列头部的元素                   如果队列为空,则阻塞

offer(E e, long timeout, TimeUnit unit)                队列满时指定阻塞时间,否则立即返回false,不会有异常

poll(long timeout, TimeUnit unit)                          队列空时指定阻塞时间,否则立即返回null


二、实现生产者与消费者

package com.newThread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 
 * BlockingQueueTest.java
 * 
 * @title 阻塞队列的运用
 * @description Q 
 * 				1-队列:数据的集合,先进先出。有固定大小的,也可以不固定。 
 * 				2-阻塞与非阻塞:可以阻塞也可以不阻塞。非阻塞的,会报错。阻塞的会阻塞。
 *              3、ArrayBlockingQueue实现了BlockingQueue接口。
 *              	1)-Insert(放):add(e)-抛异常;offer(e)-返回true/false; put(e)-阻塞;
 *              	2)-Remove(取):remove()-抛异常; poll()-返回null; take()-阻塞; poll(time,unit)
 * @author SAM-SHO
 * @Date 2014-8-31
 */
public class BlockingQueueTest {
	public static void main(String[] args) {
		// 定义一个阻塞队列,允许放三个数据
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(3);

		// 两个子线程 放数据
		for (int i = 0; i < 2; i++) {
			new Thread() {
				public void run() {
					while (true) {
						try {
							Thread.sleep((long) (Math.random() * 1000));
							System.out.println(Thread.currentThread().getName() + "准备放数据!");
							queue.put(1);// put()放的时候,会阻塞

							// queue.size() 取到现在队列的大小
							System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据");
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}

			}.start();
		}

		// main 主线程 取数据
		new Thread() {
			public void run() {
				while (true) {
					try {
						// 将此处的睡眠时间分别改为100和1000,观察运行结果
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName() + "准备取数据!");

						queue.take();// 取数据
						System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}

		}.start();
	}
}


三、实现同步

1、阻塞队列与 Semphore(灯) 有些类似,但也不同,阻塞队列是一方存放数据,另一方释放数据。

2、Semphore 通常是由同一方设置和释放信号量

package com.newThread;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 
 * BlockingQueueCommunication.java
 * 
 * @title 阻塞队列实现同步通知
 * @description 1-阻塞队列与Semphore有些类似,但也不同,阻塞队列是一方存放数据,另一方释放数据
 *              2-Semphore通常是由同一方设置和释放信号量
 * @author SAM-SHO
 * @Date 2014-8-31
 */
public class BlockingQueueCommunication {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		final Business business = new Business();
		
		//子线程
		new Thread(new Runnable() {

			@Override
			public void run() {

				for (int i = 1; i <= 5; i++) {
					business.sub(i);
				}

			}
		}).start();

		// 主线程
		for (int i = 1; i <= 10; i++) {//5次循环以后,会阻塞。因为子线程已经结束 queue2 没有线程去取
			business.main(i);
		}

	}

	// 内部类使用 static ,实现为外部类
	static class Business {

		BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);//队列1
		BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);//队列2

		// 匿名构造方法,运行时机在任何构造方法之前
		{
//			Collections.synchronizedMap(null);// map的同步类方法
			try {
				System.out.println("xxxxxdfsdsafdsa");
				queue2.put(1);// 先在队列2 queue2中放数据
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		/**
		 * 
		 * @param i
		 */
		public void sub(int i) {
			try {
				queue1.put(1);//队列1 放数据
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			for (int j = 1; j <= 10; j++) {
				System.out.println("子线程 sequece of " + j + ",loop of " + i);
			}
			try {
				queue2.take();//队列2 取数据
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		/**
		 * 
		 * @param i
		 */
		public void main(int i) {
			try {
				queue2.put(1);//队列2 放数据,如果先主线程进来,会阻塞
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			for (int j = 1; j <= 4; j++) {
				System.out.println("主线程  sequece of " + j + ",loop of " + i);
			}
			try {
				queue1.take();// 队列1 取数据
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

【输出】

xxxxxdfsdsafdsa
子线程 sequece of 1,loop of 1
子线程 sequece of 2,loop of 1
子线程 sequece of 3,loop of 1
子线程 sequece of 4,loop of 1
子线程 sequece of 5,loop of 1
子线程 sequece of 6,loop of 1
子线程 sequece of 7,loop of 1
子线程 sequece of 8,loop of 1
子线程 sequece of 9,loop of 1
子线程 sequece of 10,loop of 1
主线程  sequece of 1,loop of 1
主线程  sequece of 2,loop of 1
主线程  sequece of 3,loop of 1
主线程  sequece of 4,loop of 1
子线程 sequece of 1,loop of 2
子线程 sequece of 2,loop of 2
子线程 sequece of 3,loop of 2
子线程 sequece of 4,loop of 2
子线程 sequece of 5,loop of 2
子线程 sequece of 6,loop of 2
子线程 sequece of 7,loop of 2
子线程 sequece of 8,loop of 2
子线程 sequece of 9,loop of 2
子线程 sequece of 10,loop of 2
主线程  sequece of 1,loop of 2
主线程  sequece of 2,loop of 2
主线程  sequece of 3,loop of 2
主线程  sequece of 4,loop of 2
子线程 sequece of 1,loop of 3
子线程 sequece of 2,loop of 3
子线程 sequece of 3,loop of 3
子线程 sequece of 4,loop of 3
子线程 sequece of 5,loop of 3
子线程 sequece of 6,loop of 3
子线程 sequece of 7,loop of 3
子线程 sequece of 8,loop of 3
子线程 sequece of 9,loop of 3
子线程 sequece of 10,loop of 3
主线程  sequece of 1,loop of 3
主线程  sequece of 2,loop of 3
主线程  sequece of 3,loop of 3
主线程  sequece of 4,loop of 3
子线程 sequece of 1,loop of 4
子线程 sequece of 2,loop of 4
子线程 sequece of 3,loop of 4
子线程 sequece of 4,loop of 4
子线程 sequece of 5,loop of 4
子线程 sequece of 6,loop of 4
子线程 sequece of 7,loop of 4
子线程 sequece of 8,loop of 4
子线程 sequece of 9,loop of 4
子线程 sequece of 10,loop of 4
主线程  sequece of 1,loop of 4
主线程  sequece of 2,loop of 4
主线程  sequece of 3,loop of 4
主线程  sequece of 4,loop of 4
子线程 sequece of 1,loop of 5
子线程 sequece of 2,loop of 5
子线程 sequece of 3,loop of 5
子线程 sequece of 4,loop of 5
子线程 sequece of 5,loop of 5
子线程 sequece of 6,loop of 5
子线程 sequece of 7,loop of 5
子线程 sequece of 8,loop of 5
子线程 sequece of 9,loop of 5
子线程 sequece of 10,loop of 5
主线程  sequece of 1,loop of 5
主线程  sequece of 2,loop of 5
主线程  sequece of 3,loop of 5
主线程  sequece of 4,loop of 5