Java 并发编程系列文章
Java 并发编程——Callable+Future+FutureTask
java并发编程——通过ReentrantLock,Condition实现银行存取款
简介
BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
阻塞队列是一个队列,而且是一个先进先出的队列(FIFO)。
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。
但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)
如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。
BlockingQueue接口介绍
放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
直到BlockingQueue里面有空间再继续。
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
运用实践
public class Producer implements Runnable{ private volatile boolean isRunning = true; private BlockingQueue queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; private String produceName; public Producer(BlockingQueue queue,String name) { this.queue = queue; this.produceName = name; } public void run() { String data = null; Random r = new Random(); System.out.println(produceName+" 启动生产者线程!"); try { while (isRunning) { System.out.println(produceName+" 正在生产数据..."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println(produceName+" 将数据:" + data + "放入队列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println(produceName+" 放入数据失败:" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println(produceName+" 退出生产者线程!"); } } public void stop() { isRunning = false; } }
public class Consumer implements Runnable { private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; private String cusumerName; public Consumer(BlockingQueue<String> queue,String name) { this.queue = queue; this.cusumerName = name; } public void run() { System.out.println(cusumerName+" 启动消费者线程!"); Random r = new Random(); boolean isRunning = true; try { while (isRunning) { System.out.println(cusumerName+" 正从队列获取数据..."); String data = queue.poll(2, TimeUnit.SECONDS); if (null != data) { System.out.println(cusumerName+" 拿到数据:" + data); System.out.println(cusumerName+" 正在消费数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); } else { // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning = false; } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally { System.out.println(cusumerName+" 退出消费者线程!"); } } }
public class BlockingQueueTest { public static void main(String[] args) { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue,"P1"); Producer producer2 = new Producer(queue,"P2"); Producer producer3 = new Producer(queue,"P2"); Consumer consumer = new Consumer(queue,"C"); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); // 执行10s try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } producer1.stop(); producer2.stop(); producer3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 退出Executor service.shutdown(); } }
上述代码的示意图如下,开辟了三个线作为生产者,一个线程作为消费者。生产者负责往队列中添加数据,消费者负责从队列中消费数据(当队列中没有数据时则处于阻塞状态)
从上面的运用实践中很容易理解阻塞队列的好处,让设计的隔离度更好,生产者只负责生产数据消费者只负责消费数据,而不用关心队列中具体有多少数据,如果满和空的特殊处理也不用关心。
可以想象一下如果没有阻塞队列,自己定义一个数组存放元素,生产者和消费者需要做很多额外的控制工作,并对边界条件做特殊处理。最重要的一点是生产者和消费者还要保证多线程操作数组数据的安全性同时兼顾效率,这应该是件很头疼的事。
这里可能有个疑惑, 3个Producer产生数据,当队列已经满时,其它Producer如何再往队列里面生产数据?
可以看到Producer中的代码,通过 offer(data, 2, TimeUnit.SECONDS) 往队列中添加数据,此时如果队列已满则阻塞等待直到Customer从队列中取走一个数据,然后再将数据放入,这里等待的时间不等。队列满时,offer()函数从开始执行到结束可能需要经历0~2000ms。从执行结果看,所有数据都成功的加入了队列没有出现超时的现象。
ArrayBlockingQueue源码分析
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
可以看出ArrayBlockingQueue不光实现了BlockingQueue接口还继承了抽象类AbstractQueue,说明可以对进行队列的操作(可以参考java容器类4:Queue深入解读)。建议先了解可重入锁和条件变量的概念:
java并发编程——通过ReentrantLock,Condition实现银行存取款
下面看一下里面的主要成员变量
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */
// 主要解决多线程访问的线程安全性问题 final ReentrantLock lock; /** Condition for waiting takes */// 添加元素时,通过notEmpty 唤醒消费线程(在等待该条件)
private final Condition notEmpty; /** Condition for waiting puts */
// 删除元素时,通过 notFull 唤醒生成线程(在等待该条件) private final Condition notFull;
通过一个数组存放队列元素,并且通过维护一个插入元素(putIndex)和移除元素(takeIndex)的位置来控制元素的添加和删除。
看一下里面比较复杂的函数,大概能了解ArrayBlockingQueue的具体工作原理了:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { Objects.requireNonNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0L) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
该函数是往阻塞队列中添加元素,如果超过设置的时间还没有添加成功(可能队列已满,且没有其它线程从中移除元素)则返回false。源码中可以看出,当执行添加时,首先获取阻塞队列的锁,如果队列未满则直接添加元素返回true即可。
当队列已满,则调用 notFull(Condition类型)的awaitNanos()方法,该方法或释放可重入锁,并且让线程进入等待状态,知道有其它线程将该线程唤醒。enqueue的源码中会调用 notEmpty.signal()方法唤醒阻塞的移除元素的线程。同理,当某个线程调用take()/remove()/poll()时会调用 notFull.signal()唤醒一个被阻塞的添加元素的线程。
LinkedBlockingQueue
构造函数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
成员变量
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
可以发现LinkedBlockingQueue和ArrayBlockingQueue成员变量还是有差别的
1.它内部是通过链表存储的,而ArrayBlockingQueue是通过数组存储的
2. 它设置了两个可重入锁,一个控制存,一个控制取。 (感觉这样并发性更好)
3. 它的计数器count采用: AtomicInteger ,而ArrayBlockingQueue采用的int。 可能原因: 在LinkedBlockingQueue中两端都可以同时进行存取操作(因为不是同一个锁,这时可能需要同时改变计数器的值,所以要保证线程安全,所有用了AtomicInteger ),而在ArrayBlockingQueue中不可能存在多个线程操作count值的情况,所以直接使用了int。
上图中画出了LinkedBlockingQueue的工作机制,通过takeLock,putLock两把锁分别控制取数据和存数据,两者可以同时进。 下面可以看一下取数据的源码,其实很简单:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count;// 获取取数据的锁
final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try {//如果没有数据,则进入挂起状态,直到“存操作”唤醒该挂起装填
while (count.get() == 0) {
notEmpty.await();
}
// 将数据弹出队列,并将计数器减一
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 如果有挂起的存线程,则将其唤醒
notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
小结
BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待与唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。
参考:
https://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html