Java多线程系列

时间:2022-06-03 08:27:08

 

概要

本章介绍JUC包中的LinkedBlockingDeque。内容包括:
LinkedBlockingDeque介绍
LinkedBlockingDeque原理和数据结构
LinkedBlockingDeque函数列表
LinkedBlockingDeque源码分析(JDK1.7.0_40版本)
LinkedBlockingDeque示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3503480.html

 

LinkedBlockingDeque介绍

LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。

此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

 

LinkedBlockingDeque原理和数据结构

LinkedBlockingDeque的数据结构,如下图所示:

Java多线程系列

说明
1. LinkedBlockingDeque继承于AbstractQueue,它本质上是一个支持FIFO和FILO的双向的队列。
2. LinkedBlockingDeque实现了BlockingDeque接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
3. LinkedBlockingDeque是通过双向链表实现的。
3.1 first是双向链表的表头。
3.2 last是双向链表的表尾。
3.3 count是LinkedBlockingDeque的实际大小,即双向链表中当前节点个数。
3.4 capacity是LinkedBlockingDeque的容量,它是在创建LinkedBlockingDeque时指定的。
3.5 lock是控制对LinkedBlockingDeque的互斥锁,当多个线程竞争同时访问LinkedBlockingDeque时,某线程获取到了互斥锁lock,其它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取cpu执行权。
3.6 notEmptynotFull分别是“非空条件”和“未满条件”。通过它们能够更加细腻进行并发控制。

     -- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。
-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。

关于ReentrantLock 和 Condition等更多的内容,可以参考:
    (01) Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock
    (02) Java多线程系列--“JUC锁”03之 公平锁(一)
    (03) Java多线程系列--“JUC锁”04之 公平锁(二)
    (04) Java多线程系列--“JUC锁”05之 非公平锁
    (05) Java多线程系列--“JUC锁”06之 Condition条件

 

LinkedBlockingDeque函数列表

// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含给定 collection 的元素,以该 collection 迭代器的遍历顺序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)

// 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾。
boolean add(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 从此双端队列移除所有元素。
void clear()
// 如果此双端队列包含指定的元素,则返回 true。
boolean contains(Object o)
// 返回在此双端队列的元素上以逆向连续顺序进行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 获取但不移除此双端队列表示的队列的头部。
E element()
// 获取,但不移除此双端队列的第一个元素。
E getFirst()
// 获取,但不移除此双端队列的最后一个元素。
E getLast()
// 返回在此双端队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offer(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将在指定的等待时间内一直等待可用空间。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerFirst(E e)
// 将指定的元素插入此双端队列的开头,必要时将在指定的等待时间内等待可用空间。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerLast(E e)
// 将指定的元素插入此双端队列的末尾,必要时将在指定的等待时间内等待可用空间。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 获取但不移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E peek()
// 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E peekFirst()
// 获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E peekLast()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E poll()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。
E poll(long timeout, TimeUnit unit)
// 获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E pollFirst()
// 获取并移除此双端队列的第一个元素,必要时将在指定的等待时间等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E pollLast()
// 获取并移除此双端队列的最后一个元素,必要时将在指定的等待时间内等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 从此双端队列所表示的堆栈中弹出一个元素。
E pop()
// 将元素推入此双端队列表示的栈。
void push(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将一直等待可用空间。
void put(E e)
// 将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。
void putFirst(E e)
// 将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。
void putLast(E e)
// 返回理想情况下(没有内存和资源约束)此双端队列可不受阻塞地接受的额外元素数。
int remainingCapacity()
// 获取并移除此双端队列表示的队列的头部。
E remove()
// 从此双端队列移除第一次出现的指定元素。
boolean remove(Object o)
// 获取并移除此双端队列第一个元素。
E removeFirst()
// 从此双端队列移除第一次出现的指定元素。
boolean removeFirstOccurrence(Object o)
// 获取并移除此双端队列的最后一个元素。
E removeLast()
// 从此双端队列移除最后一次出现的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此双端队列中的元素数。
int size()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素。
E take()
// 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素。
E takeFirst()
// 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素。
E takeLast()
// 返回以恰当顺序(从第一个元素到最后一个元素)包含此双端队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此双端队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

 

LinkedBlockingDeque源码分析(JDK1.7.0_40版本)

LinkedBlockingDeque.java的完整源码如下:

Java多线程系列Java多线程系列
   1 /*
2 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
3 *
4 *
5 *
6 *
7 *
8 *
9 *
10 *
11 *
12 *
13 *
14 *
15 *
16 *
17 *
18 *
19 *
20 *
21 *
22 *
23 */
24
25 /*
26 *
27 *
28 *
29 *
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.concurrent.locks.Condition;
43 import java.util.concurrent.locks.ReentrantLock;
44
45 /**
46 * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
47 * linked nodes.
48 *
49 * <p> The optional capacity bound constructor argument serves as a
50 * way to prevent excessive expansion. The capacity, if unspecified,
51 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
52 * dynamically created upon each insertion unless this would bring the
53 * deque above capacity.
54 *
55 * <p>Most operations run in constant time (ignoring time spent
56 * blocking). Exceptions include {@link #remove(Object) remove},
57 * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
58 * #removeLastOccurrence removeLastOccurrence}, {@link #contains
59 * contains}, {@link #iterator iterator.remove()}, and the bulk
60 * operations, all of which run in linear time.
61 *
62 * <p>This class and its iterator implement all of the
63 * <em>optional</em> methods of the {@link Collection} and {@link
64 * Iterator} interfaces.
65 *
66 * <p>This class is a member of the
67 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
68 * Java Collections Framework</a>.
69 *
70 * @since 1.6
71 * @author Doug Lea
72 * @param <E> the type of elements held in this collection
73 */
74 public class LinkedBlockingDeque<E>
75 extends AbstractQueue<E>
76 implements BlockingDeque<E>, java.io.Serializable {
77
78 /*
79 * Implemented as a simple doubly-linked list protected by a
80 * single lock and using conditions to manage blocking.
81 *
82 * To implement weakly consistent iterators, it appears we need to
83 * keep all Nodes GC-reachable from a predecessor dequeued Node.
84 * That would cause two problems:
85 * - allow a rogue Iterator to cause unbounded memory retention
86 * - cause cross-generational linking of old Nodes to new Nodes if
87 * a Node was tenured while live, which generational GCs have a
88 * hard time dealing with, causing repeated major collections.
89 * However, only non-deleted Nodes need to be reachable from
90 * dequeued Nodes, and reachability does not necessarily have to
91 * be of the kind understood by the GC. We use the trick of
92 * linking a Node that has just been dequeued to itself. Such a
93 * self-link implicitly means to jump to "first" (for next links)
94 * or "last" (for prev links).
95 */
96
97 /*
98 * We have "diamond" multiple interface/abstract class inheritance
99 * here, and that introduces ambiguities. Often we want the
100 * BlockingDeque javadoc combined with the AbstractQueue
101 * implementation, so a lot of method specs are duplicated here.
102 */
103
104 private static final long serialVersionUID = -387911632671998426L;
105
106 /** Doubly-linked list node class */
107 static final class Node<E> {
108 /**
109 * The item, or null if this node has been removed.
110 */
111 E item;
112
113 /**
114 * One of:
115 * - the real predecessor Node
116 * - this Node, meaning the predecessor is tail
117 * - null, meaning there is no predecessor
118 */
119 Node<E> prev;
120
121 /**
122 * One of:
123 * - the real successor Node
124 * - this Node, meaning the successor is head
125 * - null, meaning there is no successor
126 */
127 Node<E> next;
128
129 Node(E x) {
130 item = x;
131 }
132 }
133
134 /**
135 * Pointer to first node.
136 * Invariant: (first == null && last == null) ||
137 * (first.prev == null && first.item != null)
138 */
139 transient Node<E> first;
140
141 /**
142 * Pointer to last node.
143 * Invariant: (first == null && last == null) ||
144 * (last.next == null && last.item != null)
145 */
146 transient Node<E> last;
147
148 /** Number of items in the deque */
149 private transient int count;
150
151 /** Maximum number of items in the deque */
152 private final int capacity;
153
154 /** Main lock guarding all access */
155 final ReentrantLock lock = new ReentrantLock();
156
157 /** Condition for waiting takes */
158 private final Condition notEmpty = lock.newCondition();
159
160 /** Condition for waiting puts */
161 private final Condition notFull = lock.newCondition();
162
163 /**
164 * Creates a {@code LinkedBlockingDeque} with a capacity of
165 * {@link Integer#MAX_VALUE}.
166 */
167 public LinkedBlockingDeque() {
168 this(Integer.MAX_VALUE);
169 }
170
171 /**
172 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
173 *
174 * @param capacity the capacity of this deque
175 * @throws IllegalArgumentException if {@code capacity} is less than 1
176 */
177 public LinkedBlockingDeque(int capacity) {
178 if (capacity <= 0) throw new IllegalArgumentException();
179 this.capacity = capacity;
180 }
181
182 /**
183 * Creates a {@code LinkedBlockingDeque} with a capacity of
184 * {@link Integer#MAX_VALUE}, initially containing the elements of
185 * the given collection, added in traversal order of the
186 * collection's iterator.
187 *
188 * @param c the collection of elements to initially contain
189 * @throws NullPointerException if the specified collection or any
190 * of its elements are null
191 */
192 public LinkedBlockingDeque(Collection<? extends E> c) {
193 this(Integer.MAX_VALUE);
194 final ReentrantLock lock = this.lock;
195 lock.lock(); // Never contended, but necessary for visibility
196 try {
197 for (E e : c) {
198 if (e == null)
199 throw new NullPointerException();
200 if (!linkLast(new Node<E>(e)))
201 throw new IllegalStateException("Deque full");
202 }
203 } finally {
204 lock.unlock();
205 }
206 }
207
208
209 // Basic linking and unlinking operations, called only while holding lock
210
211 /**
212 * Links node as first element, or returns false if full.
213 */
214 private boolean linkFirst(Node<E> node) {
215 // assert lock.isHeldByCurrentThread();
216 if (count >= capacity)
217 return false;
218 Node<E> f = first;
219 node.next = f;
220 first = node;
221 if (last == null)
222 last = node;
223 else
224 f.prev = node;
225 ++count;
226 notEmpty.signal();
227 return true;
228 }
229
230 /**
231 * Links node as last element, or returns false if full.
232 */
233 private boolean linkLast(Node<E> node) {
234 // assert lock.isHeldByCurrentThread();
235 if (count >= capacity)
236 return false;
237 Node<E> l = last;
238 node.prev = l;
239 last = node;
240 if (first == null)
241 first = node;
242 else
243 l.next = node;
244 ++count;
245 notEmpty.signal();
246 return true;
247 }
248
249 /**
250 * Removes and returns first element, or null if empty.
251 */
252 private E unlinkFirst() {
253 // assert lock.isHeldByCurrentThread();
254 Node<E> f = first;
255 if (f == null)
256 return null;
257 Node<E> n = f.next;
258 E item = f.item;
259 f.item = null;
260 f.next = f; // help GC
261 first = n;
262 if (n == null)
263 last = null;
264 else
265 n.prev = null;
266 --count;
267 notFull.signal();
268 return item;
269 }
270
271 /**
272 * Removes and returns last element, or null if empty.
273 */
274 private E unlinkLast() {
275 // assert lock.isHeldByCurrentThread();
276 Node<E> l = last;
277 if (l == null)
278 return null;
279 Node<E> p = l.prev;
280 E item = l.item;
281 l.item = null;
282 l.prev = l; // help GC
283 last = p;
284 if (p == null)
285 first = null;
286 else
287 p.next = null;
288 --count;
289 notFull.signal();
290 return item;
291 }
292
293 /**
294 * Unlinks x.
295 */
296 void unlink(Node<E> x) {
297 // assert lock.isHeldByCurrentThread();
298 Node<E> p = x.prev;
299 Node<E> n = x.next;
300 if (p == null) {
301 unlinkFirst();
302 } else if (n == null) {
303 unlinkLast();
304 } else {
305 p.next = n;
306 n.prev = p;
307 x.item = null;
308 // Don't mess with x's links. They may still be in use by
309 // an iterator.
310 --count;
311 notFull.signal();
312 }
313 }
314
315 // BlockingDeque methods
316
317 /**
318 * @throws IllegalStateException {@inheritDoc}
319 * @throws NullPointerException {@inheritDoc}
320 */
321 public void addFirst(E e) {
322 if (!offerFirst(e))
323 throw new IllegalStateException("Deque full");
324 }
325
326 /**
327 * @throws IllegalStateException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
329 */
330 public void addLast(E e) {
331 if (!offerLast(e))
332 throw new IllegalStateException("Deque full");
333 }
334
335 /**
336 * @throws NullPointerException {@inheritDoc}
337 */
338 public boolean offerFirst(E e) {
339 if (e == null) throw new NullPointerException();
340 Node<E> node = new Node<E>(e);
341 final ReentrantLock lock = this.lock;
342 lock.lock();
343 try {
344 return linkFirst(node);
345 } finally {
346 lock.unlock();
347 }
348 }
349
350 /**
351 * @throws NullPointerException {@inheritDoc}
352 */
353 public boolean offerLast(E e) {
354 if (e == null) throw new NullPointerException();
355 Node<E> node = new Node<E>(e);
356 final ReentrantLock lock = this.lock;
357 lock.lock();
358 try {
359 return linkLast(node);
360 } finally {
361 lock.unlock();
362 }
363 }
364
365 /**
366 * @throws NullPointerException {@inheritDoc}
367 * @throws InterruptedException {@inheritDoc}
368 */
369 public void putFirst(E e) throws InterruptedException {
370 if (e == null) throw new NullPointerException();
371 Node<E> node = new Node<E>(e);
372 final ReentrantLock lock = this.lock;
373 lock.lock();
374 try {
375 while (!linkFirst(node))
376 notFull.await();
377 } finally {
378 lock.unlock();
379 }
380 }
381
382 /**
383 * @throws NullPointerException {@inheritDoc}
384 * @throws InterruptedException {@inheritDoc}
385 */
386 public void putLast(E e) throws InterruptedException {
387 if (e == null) throw new NullPointerException();
388 Node<E> node = new Node<E>(e);
389 final ReentrantLock lock = this.lock;
390 lock.lock();
391 try {
392 while (!linkLast(node))
393 notFull.await();
394 } finally {
395 lock.unlock();
396 }
397 }
398
399 /**
400 * @throws NullPointerException {@inheritDoc}
401 * @throws InterruptedException {@inheritDoc}
402 */
403 public boolean offerFirst(E e, long timeout, TimeUnit unit)
404 throws InterruptedException {
405 if (e == null) throw new NullPointerException();
406 Node<E> node = new Node<E>(e);
407 long nanos = unit.toNanos(timeout);
408 final ReentrantLock lock = this.lock;
409 lock.lockInterruptibly();
410 try {
411 while (!linkFirst(node)) {
412 if (nanos <= 0)
413 return false;
414 nanos = notFull.awaitNanos(nanos);
415 }
416 return true;
417 } finally {
418 lock.unlock();
419 }
420 }
421
422 /**
423 * @throws NullPointerException {@inheritDoc}
424 * @throws InterruptedException {@inheritDoc}
425 */
426 public boolean offerLast(E e, long timeout, TimeUnit unit)
427 throws InterruptedException {
428 if (e == null) throw new NullPointerException();
429 Node<E> node = new Node<E>(e);
430 long nanos = unit.toNanos(timeout);
431 final ReentrantLock lock = this.lock;
432 lock.lockInterruptibly();
433 try {
434 while (!linkLast(node)) {
435 if (nanos <= 0)
436 return false;
437 nanos = notFull.awaitNanos(nanos);
438 }
439 return true;
440 } finally {
441 lock.unlock();
442 }
443 }
444
445 /**
446 * @throws NoSuchElementException {@inheritDoc}
447 */
448 public E removeFirst() {
449 E x = pollFirst();
450 if (x == null) throw new NoSuchElementException();
451 return x;
452 }
453
454 /**
455 * @throws NoSuchElementException {@inheritDoc}
456 */
457 public E removeLast() {
458 E x = pollLast();
459 if (x == null) throw new NoSuchElementException();
460 return x;
461 }
462
463 public E pollFirst() {
464 final ReentrantLock lock = this.lock;
465 lock.lock();
466 try {
467 return unlinkFirst();
468 } finally {
469 lock.unlock();
470 }
471 }
472
473 public E pollLast() {
474 final ReentrantLock lock = this.lock;
475 lock.lock();
476 try {
477 return unlinkLast();
478 } finally {
479 lock.unlock();
480 }
481 }
482
483 public E takeFirst() throws InterruptedException {
484 final ReentrantLock lock = this.lock;
485 lock.lock();
486 try {
487 E x;
488 while ( (x = unlinkFirst()) == null)
489 notEmpty.await();
490 return x;
491 } finally {
492 lock.unlock();
493 }
494 }
495
496 public E takeLast() throws InterruptedException {
497 final ReentrantLock lock = this.lock;
498 lock.lock();
499 try {
500 E x;
501 while ( (x = unlinkLast()) == null)
502 notEmpty.await();
503 return x;
504 } finally {
505 lock.unlock();
506 }
507 }
508
509 public E pollFirst(long timeout, TimeUnit unit)
510 throws InterruptedException {
511 long nanos = unit.toNanos(timeout);
512 final ReentrantLock lock = this.lock;
513 lock.lockInterruptibly();
514 try {
515 E x;
516 while ( (x = unlinkFirst()) == null) {
517 if (nanos <= 0)
518 return null;
519 nanos = notEmpty.awaitNanos(nanos);
520 }
521 return x;
522 } finally {
523 lock.unlock();
524 }
525 }
526
527 public E pollLast(long timeout, TimeUnit unit)
528 throws InterruptedException {
529 long nanos = unit.toNanos(timeout);
530 final ReentrantLock lock = this.lock;
531 lock.lockInterruptibly();
532 try {
533 E x;
534 while ( (x = unlinkLast()) == null) {
535 if (nanos <= 0)
536 return null;
537 nanos = notEmpty.awaitNanos(nanos);
538 }
539 return x;
540 } finally {
541 lock.unlock();
542 }
543 }
544
545 /**
546 * @throws NoSuchElementException {@inheritDoc}
547 */
548 public E getFirst() {
549 E x = peekFirst();
550 if (x == null) throw new NoSuchElementException();
551 return x;
552 }
553
554 /**
555 * @throws NoSuchElementException {@inheritDoc}
556 */
557 public E getLast() {
558 E x = peekLast();
559 if (x == null) throw new NoSuchElementException();
560 return x;
561 }
562
563 public E peekFirst() {
564 final ReentrantLock lock = this.lock;
565 lock.lock();
566 try {
567 return (first == null) ? null : first.item;
568 } finally {
569 lock.unlock();
570 }
571 }
572
573 public E peekLast() {
574 final ReentrantLock lock = this.lock;
575 lock.lock();
576 try {
577 return (last == null) ? null : last.item;
578 } finally {
579 lock.unlock();
580 }
581 }
582
583 public boolean removeFirstOccurrence(Object o) {
584 if (o == null) return false;
585 final ReentrantLock lock = this.lock;
586 lock.lock();
587 try {
588 for (Node<E> p = first; p != null; p = p.next) {
589 if (o.equals(p.item)) {
590 unlink(p);
591 return true;
592 }
593 }
594 return false;
595 } finally {
596 lock.unlock();
597 }
598 }
599
600 public boolean removeLastOccurrence(Object o) {
601 if (o == null) return false;
602 final ReentrantLock lock = this.lock;
603 lock.lock();
604 try {
605 for (Node<E> p = last; p != null; p = p.prev) {
606 if (o.equals(p.item)) {
607 unlink(p);
608 return true;
609 }
610 }
611 return false;
612 } finally {
613 lock.unlock();
614 }
615 }
616
617 // BlockingQueue methods
618
619 /**
620 * Inserts the specified element at the end of this deque unless it would
621 * violate capacity restrictions. When using a capacity-restricted deque,
622 * it is generally preferable to use method {@link #offer(Object) offer}.
623 *
624 * <p>This method is equivalent to {@link #addLast}.
625 *
626 * @throws IllegalStateException if the element cannot be added at this
627 * time due to capacity restrictions
628 * @throws NullPointerException if the specified element is null
629 */
630 public boolean add(E e) {
631 addLast(e);
632 return true;
633 }
634
635 /**
636 * @throws NullPointerException if the specified element is null
637 */
638 public boolean offer(E e) {
639 return offerLast(e);
640 }
641
642 /**
643 * @throws NullPointerException {@inheritDoc}
644 * @throws InterruptedException {@inheritDoc}
645 */
646 public void put(E e) throws InterruptedException {
647 putLast(e);
648 }
649
650 /**
651 * @throws NullPointerException {@inheritDoc}
652 * @throws InterruptedException {@inheritDoc}
653 */
654 public boolean offer(E e, long timeout, TimeUnit unit)
655 throws InterruptedException {
656 return offerLast(e, timeout, unit);
657 }
658
659 /**
660 * Retrieves and removes the head of the queue represented by this deque.
661 * This method differs from {@link #poll poll} only in that it throws an
662 * exception if this deque is empty.
663 *
664 * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
665 *
666 * @return the head of the queue represented by this deque
667 * @throws NoSuchElementException if this deque is empty
668 */
669 public E remove() {
670 return removeFirst();
671 }
672
673 public E poll() {
674 return pollFirst();
675 }
676
677 public E take() throws InterruptedException {
678 return takeFirst();
679 }
680
681 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
682 return pollFirst(timeout, unit);
683 }
684
685 /**
686 * Retrieves, but does not remove, the head of the queue represented by
687 * this deque. This method differs from {@link #peek peek} only in that
688 * it throws an exception if this deque is empty.
689 *
690 * <p>This method is equivalent to {@link #getFirst() getFirst}.
691 *
692 * @return the head of the queue represented by this deque
693 * @throws NoSuchElementException if this deque is empty
694 */
695 public E element() {
696 return getFirst();
697 }
698
699 public E peek() {
700 return peekFirst();
701 }
702
703 /**
704 * Returns the number of additional elements that this deque can ideally
705 * (in the absence of memory or resource constraints) accept without
706 * blocking. This is always equal to the initial capacity of this deque
707 * less the current {@code size} of this deque.
708 *
709 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
710 * an element will succeed by inspecting {@code remainingCapacity}
711 * because it may be the case that another thread is about to
712 * insert or remove an element.
713 */
714 public int remainingCapacity() {
715 final ReentrantLock lock = this.lock;
716 lock.lock();
717 try {
718 return capacity - count;
719 } finally {
720 lock.unlock();
721 }
722 }
723
724 /**
725 * @throws UnsupportedOperationException {@inheritDoc}
726 * @throws ClassCastException {@inheritDoc}
727 * @throws NullPointerException {@inheritDoc}
728 * @throws IllegalArgumentException {@inheritDoc}
729 */
730 public int drainTo(Collection<? super E> c) {
731 return drainTo(c, Integer.MAX_VALUE);
732 }
733
734 /**
735 * @throws UnsupportedOperationException {@inheritDoc}
736 * @throws ClassCastException {@inheritDoc}
737 * @throws NullPointerException {@inheritDoc}
738 * @throws IllegalArgumentException {@inheritDoc}
739 */
740 public int drainTo(Collection<? super E> c, int maxElements) {
741 if (c == null)
742 throw new NullPointerException();
743 if (c == this)
744 throw new IllegalArgumentException();
745 final ReentrantLock lock = this.lock;
746 lock.lock();
747 try {
748 int n = Math.min(maxElements, count);
749 for (int i = 0; i < n; i++) {
750 c.add(first.item); // In this order, in case add() throws.
751 unlinkFirst();
752 }
753 return n;
754 } finally {
755 lock.unlock();
756 }
757 }
758
759 // Stack methods
760
761 /**
762 * @throws IllegalStateException {@inheritDoc}
763 * @throws NullPointerException {@inheritDoc}
764 */
765 public void push(E e) {
766 addFirst(e);
767 }
768
769 /**
770 * @throws NoSuchElementException {@inheritDoc}
771 */
772 public E pop() {
773 return removeFirst();
774 }
775
776 // Collection methods
777
778 /**
779 * Removes the first occurrence of the specified element from this deque.
780 * If the deque does not contain the element, it is unchanged.
781 * More formally, removes the first element {@code e} such that
782 * {@code o.equals(e)} (if such an element exists).
783 * Returns {@code true} if this deque contained the specified element
784 * (or equivalently, if this deque changed as a result of the call).
785 *
786 * <p>This method is equivalent to
787 * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
788 *
789 * @param o element to be removed from this deque, if present
790 * @return {@code true} if this deque changed as a result of the call
791 */
792 public boolean remove(Object o) {
793 return removeFirstOccurrence(o);
794 }
795
796 /**
797 * Returns the number of elements in this deque.
798 *
799 * @return the number of elements in this deque
800 */
801 public int size() {
802 final ReentrantLock lock = this.lock;
803 lock.lock();
804 try {
805 return count;
806 } finally {
807 lock.unlock();
808 }
809 }
810
811 /**
812 * Returns {@code true} if this deque contains the specified element.
813 * More formally, returns {@code true} if and only if this deque contains
814 * at least one element {@code e} such that {@code o.equals(e)}.
815 *
816 * @param o object to be checked for containment in this deque
817 * @return {@code true} if this deque contains the specified element
818 */
819 public boolean contains(Object o) {
820 if (o == null) return false;
821 final ReentrantLock lock = this.lock;
822 lock.lock();
823 try {
824 for (Node<E> p = first; p != null; p = p.next)
825 if (o.equals(p.item))
826 return true;
827 return false;
828 } finally {
829 lock.unlock();
830 }
831 }
832
833 /*
834 * TODO: Add support for more efficient bulk operations.
835 *
836 * We don't want to acquire the lock for every iteration, but we
837 * also want other threads a chance to interact with the
838 * collection, especially when count is close to capacity.
839 */
840
841 // /**
842 // * Adds all of the elements in the specified collection to this
843 // * queue. Attempts to addAll of a queue to itself result in
844 // * {@code IllegalArgumentException}. Further, the behavior of
845 // * this operation is undefined if the specified collection is
846 // * modified while the operation is in progress.
847 // *
848 // * @param c collection containing elements to be added to this queue
849 // * @return {@code true} if this queue changed as a result of the call
850 // * @throws ClassCastException {@inheritDoc}
851 // * @throws NullPointerException {@inheritDoc}
852 // * @throws IllegalArgumentException {@inheritDoc}
853 // * @throws IllegalStateException {@inheritDoc}
854 // * @see #add(Object)
855 // */
856 // public boolean addAll(Collection<? extends E> c) {
857 // if (c == null)
858 // throw new NullPointerException();
859 // if (c == this)
860 // throw new IllegalArgumentException();
861 // final ReentrantLock lock = this.lock;
862 // lock.lock();
863 // try {
864 // boolean modified = false;
865 // for (E e : c)
866 // if (linkLast(e))
867 // modified = true;
868 // return modified;
869 // } finally {
870 // lock.unlock();
871 // }
872 // }
873
874 /**
875 * Returns an array containing all of the elements in this deque, in
876 * proper sequence (from first to last element).
877 *
878 * <p>The returned array will be "safe" in that no references to it are
879 * maintained by this deque. (In other words, this method must allocate
880 * a new array). The caller is thus free to modify the returned array.
881 *
882 * <p>This method acts as bridge between array-based and collection-based
883 * APIs.
884 *
885 * @return an array containing all of the elements in this deque
886 */
887 @SuppressWarnings("unchecked")
888 public Object[] toArray() {
889 final ReentrantLock lock = this.lock;
890 lock.lock();
891 try {
892 Object[] a = new Object[count];
893 int k = 0;
894 for (Node<E> p = first; p != null; p = p.next)
895 a[k++] = p.item;
896 return a;
897 } finally {
898 lock.unlock();
899 }
900 }
901
902 /**
903 * Returns an array containing all of the elements in this deque, in
904 * proper sequence; the runtime type of the returned array is that of
905 * the specified array. If the deque fits in the specified array, it
906 * is returned therein. Otherwise, a new array is allocated with the
907 * runtime type of the specified array and the size of this deque.
908 *
909 * <p>If this deque fits in the specified array with room to spare
910 * (i.e., the array has more elements than this deque), the element in
911 * the array immediately following the end of the deque is set to
912 * {@code null}.
913 *
914 * <p>Like the {@link #toArray()} method, this method acts as bridge between
915 * array-based and collection-based APIs. Further, this method allows
916 * precise control over the runtime type of the output array, and may,
917 * under certain circumstances, be used to save allocation costs.
918 *
919 * <p>Suppose {@code x} is a deque known to contain only strings.
920 * The following code can be used to dump the deque into a newly
921 * allocated array of {@code String}:
922 *
923 * <pre>
924 * String[] y = x.toArray(new String[0]);</pre>
925 *
926 * Note that {@code toArray(new Object[0])} is identical in function to
927 * {@code toArray()}.
928 *
929 * @param a the array into which the elements of the deque are to
930 * be stored, if it is big enough; otherwise, a new array of the
931 * same runtime type is allocated for this purpose
932 * @return an array containing all of the elements in this deque
933 * @throws ArrayStoreException if the runtime type of the specified array
934 * is not a supertype of the runtime type of every element in
935 * this deque
936 * @throws NullPointerException if the specified array is null
937 */
938 @SuppressWarnings("unchecked")
939 public <T> T[] toArray(T[] a) {
940 final ReentrantLock lock = this.lock;
941 lock.lock();
942 try {
943 if (a.length < count)
944 a = (T[])java.lang.reflect.Array.newInstance
945 (a.getClass().getComponentType(), count);
946
947 int k = 0;
948 for (Node<E> p = first; p != null; p = p.next)
949 a[k++] = (T)p.item;
950 if (a.length > k)
951 a[k] = null;
952 return a;
953 } finally {
954 lock.unlock();
955 }
956 }
957
958 public String toString() {
959 final ReentrantLock lock = this.lock;
960 lock.lock();
961 try {
962 Node<E> p = first;
963 if (p == null)
964 return "[]";
965
966 StringBuilder sb = new StringBuilder();
967 sb.append('[');
968 for (;;) {
969 E e = p.item;
970 sb.append(e == this ? "(this Collection)" : e);
971 p = p.next;
972 if (p == null)
973 return sb.append(']').toString();
974 sb.append(',').append(' ');
975 }
976 } finally {
977 lock.unlock();
978 }
979 }
980
981 /**
982 * Atomically removes all of the elements from this deque.
983 * The deque will be empty after this call returns.
984 */
985 public void clear() {
986 final ReentrantLock lock = this.lock;
987 lock.lock();
988 try {
989 for (Node<E> f = first; f != null; ) {
990 f.item = null;
991 Node<E> n = f.next;
992 f.prev = null;
993 f.next = null;
994 f = n;
995 }
996 first = last = null;
997 count = 0;
998 notFull.signalAll();
999 } finally {
1000 lock.unlock();
1001 }
1002 }
1003
1004 /**
1005 * Returns an iterator over the elements in this deque in proper sequence.
1006 * The elements will be returned in order from first (head) to last (tail).
1007 *
1008 * <p>The returned iterator is a "weakly consistent" iterator that
1009 * will never throw {@link java.util.ConcurrentModificationException
1010 * ConcurrentModificationException}, and guarantees to traverse
1011 * elements as they existed upon construction of the iterator, and
1012 * may (but is not guaranteed to) reflect any modifications
1013 * subsequent to construction.
1014 *
1015 * @return an iterator over the elements in this deque in proper sequence
1016 */
1017 public Iterator<E> iterator() {
1018 return new Itr();
1019 }
1020
1021 /**
1022 * Returns an iterator over the elements in this deque in reverse
1023 * sequential order. The elements will be returned in order from
1024 * last (tail) to first (head).
1025 *
1026 * <p>The returned iterator is a "weakly consistent" iterator that
1027 * will never throw {@link java.util.ConcurrentModificationException
1028 * ConcurrentModificationException}, and guarantees to traverse
1029 * elements as they existed upon construction of the iterator, and
1030 * may (but is not guaranteed to) reflect any modifications
1031 * subsequent to construction.
1032 *
1033 * @return an iterator over the elements in this deque in reverse order
1034 */
1035 public Iterator<E> descendingIterator() {
1036 return new DescendingItr();
1037 }
1038
1039 /**
1040 * Base class for Iterators for LinkedBlockingDeque
1041 */
1042 private abstract class AbstractItr implements Iterator<E> {
1043 /**
1044 * The next node to return in next()
1045 */
1046 Node<E> next;
1047
1048 /**
1049 * nextItem holds on to item fields because once we claim that
1050 * an element exists in hasNext(), we must return item read
1051 * under lock (in advance()) even if it was in the process of
1052 * being removed when hasNext() was called.
1053 */
1054 E nextItem;
1055
1056 /**
1057 * Node returned by most recent call to next. Needed by remove.
1058 * Reset to null if this element is deleted by a call to remove.
1059 */
1060 private Node<E> lastRet;
1061
1062 abstract Node<E> firstNode();
1063 abstract Node<E> nextNode(Node<E> n);
1064
1065 AbstractItr() {
1066 // set to initial position
1067 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1068 lock.lock();
1069 try {
1070 next = firstNode();
1071 nextItem = (next == null) ? null : next.item;
1072 } finally {
1073 lock.unlock();
1074 }
1075 }
1076
1077 /**
1078 * Returns the successor node of the given non-null, but
1079 * possibly previously deleted, node.
1080 */
1081 private Node<E> succ(Node<E> n) {
1082 // Chains of deleted nodes ending in null or self-links
1083 // are possible if multiple interior nodes are removed.
1084 for (;;) {
1085 Node<E> s = nextNode(n);
1086 if (s == null)
1087 return null;
1088 else if (s.item != null)
1089 return s;
1090 else if (s == n)
1091 return firstNode();
1092 else
1093 n = s;
1094 }
1095 }
1096
1097 /**
1098 * Advances next.
1099 */
1100 void advance() {
1101 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1102 lock.lock();
1103 try {
1104 // assert next != null;
1105 next = succ(next);
1106 nextItem = (next == null) ? null : next.item;
1107 } finally {
1108 lock.unlock();
1109 }
1110 }
1111
1112 public boolean hasNext() {
1113 return next != null;
1114 }
1115
1116 public E next() {
1117 if (next == null)
1118 throw new NoSuchElementException();
1119 lastRet = next;
1120 E x = nextItem;
1121 advance();
1122 return x;
1123 }
1124
1125 public void remove() {
1126 Node<E> n = lastRet;
1127 if (n == null)
1128 throw new IllegalStateException();
1129 lastRet = null;
1130 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1131 lock.lock();
1132 try {
1133 if (n.item != null)
1134 unlink(n);
1135 } finally {
1136 lock.unlock();
1137 }
1138 }
1139 }
1140
1141 /** Forward iterator */
1142 private class Itr extends AbstractItr {
1143 Node<E> firstNode() { return first; }
1144 Node<E> nextNode(Node<E> n) { return n.next; }
1145 }
1146
1147 /** Descending iterator */
1148 private class DescendingItr extends AbstractItr {
1149 Node<E> firstNode() { return last; }
1150 Node<E> nextNode(Node<E> n) { return n.prev; }
1151 }
1152
1153 /**
1154 * Save the state of this deque to a stream (that is, serialize it).
1155 *
1156 * @serialData The capacity (int), followed by elements (each an
1157 * {@code Object}) in the proper order, followed by a null
1158 * @param s the stream
1159 */
1160 private void writeObject(java.io.ObjectOutputStream s)
1161 throws java.io.IOException {
1162 final ReentrantLock lock = this.lock;
1163 lock.lock();
1164 try {
1165 // Write out capacity and any hidden stuff
1166 s.defaultWriteObject();
1167 // Write out all elements in the proper order.
1168 for (Node<E> p = first; p != null; p = p.next)
1169 s.writeObject(p.item);
1170 // Use trailing null as sentinel
1171 s.writeObject(null);
1172 } finally {
1173 lock.unlock();
1174 }
1175 }
1176
1177 /**
1178 * Reconstitute this deque from a stream (that is,
1179 * deserialize it).
1180 * @param s the stream
1181 */
1182 private void readObject(java.io.ObjectInputStream s)
1183 throws java.io.IOException, ClassNotFoundException {
1184 s.defaultReadObject();
1185 count = 0;
1186 first = null;
1187 last = null;
1188 // Read in all elements and place in queue
1189 for (;;) {
1190 @SuppressWarnings("unchecked")
1191 E item = (E)s.readObject();
1192 if (item == null)
1193 break;
1194 add(item);
1195 }
1196 }
1197
1198 }
View Code

 

下面从ArrayBlockingQueue的创建,添加,取出,遍历这几个方面对LinkedBlockingDeque进行分析

1. 创建

下面以LinkedBlockingDeque(int capacity)来进行说明。

public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}

说明:capacity是“链式阻塞队列”的容量。


LinkedBlockingDeque中相关的数据结果定义如下:

// “双向队列”的表头
transient Node<E> first;
// “双向队列”的表尾
transient Node<E> last;
// 节点数量
private transient int count;
// 容量
private final int capacity;
// 互斥锁 , 互斥锁对应的“非空条件notEmpty”, 互斥锁对应的“未满条件notFull”
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

说明:lock是互斥锁,用于控制多线程对LinkedBlockingDeque中元素的互斥访问;而notEmpty和notFull是与lock绑定的条件,它们用于实现对多线程更精确的控制。

双向链表的节点Node的定义如下:

static final class Node<E> {
E item;
// 数据
Node<E> prev; // 前一节点
Node<E> next; // 后一节点

Node(E x) { item
= x; }
}

 

2. 添加

下面以offer(E e)为例,对LinkedBlockingDeque的添加方法进行说明。

public boolean offer(E e) {
return offerLast(e);
}

offer()实际上是调用offerLast()将元素添加到队列的末尾。

offerLast()的源码如下:

public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
// 新建节点
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 将“新节点”添加到双向链表的末尾
return linkLast(node);
}
finally {
// 释放锁
lock.unlock();
}
}

说明:offerLast()的作用,是新建节点并将该节点插入到双向链表的末尾。它在插入节点前,会获取锁;操作完毕,再释放锁。

linkLast()的源码如下:

private boolean linkLast(Node<E> node) {
// 如果“双向链表的节点数量” > “容量”,则返回false,表示插入失败。
if (count >= capacity)
return false;
// 将“node添加到链表末尾”,并设置node为新的尾节点
Node<E> l = last;
node.prev
= l;
last
= node;
if (first == null)
first
= node;
else
l.next
= node;
// 将“节点数量”+1
++count;
// 插入节点之后,唤醒notEmpty上的等待线程。
notEmpty.signal();
return true;
}

说明:linkLast()的作用,是将节点插入到双向队列的末尾;插入节点之后,唤醒notEmpty上的等待线程。


3. 删除

下面以take()为例,对LinkedBlockingDeque的取出方法进行说明。

public E take() throws InterruptedException {
return takeFirst();
}

take()实际上是调用takeFirst()队列的第一个元素。

takeFirst()的源码如下:

public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
E x;
// 若“队列为空”,则一直等待。否则,通过unlinkFirst()删除第一个节点。
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
}
finally {
// 释放锁
lock.unlock();
}
}

说明:takeFirst()的作用,是删除双向链表的第一个节点,并返回节点对应的值。它在插入节点前,会获取锁;操作完毕,再释放锁。

unlinkFirst()的源码如下:

private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
// 删除并更新“第一个节点”
Node<E> n = f.next;
E item
= f.item;
f.item
= null;
f.next
= f; // help GC
first = n;
if (n == null)
last
= null;
else
n.prev
= null;
// 将“节点数量”-1
--count;
// 删除节点之后,唤醒notFull上的等待线程。
notFull.signal();
return item;
}

说明:unlinkFirst()的作用,是将双向队列的第一个节点删除;删除节点之后,唤醒notFull上的等待线程。

 

4. 遍历

下面对LinkedBlockingDeque的遍历方法进行说明。

public Iterator<E> iterator() {
return new Itr();
}

iterator()实际上是返回一个Iter对象。

Itr类的定义如下:

private class Itr extends AbstractItr {
// “双向队列”的表头
Node<E> firstNode() { return first; }
// 获取“节点n的下一个节点”
Node<E> nextNode(Node<E> n) { return n.next; }
}

Itr继承于AbstractItr,而AbstractItr的定义如下:

 

private abstract class AbstractItr implements Iterator<E> {
// next是下一次调用next()会返回的节点。
Node<E> next;
// nextItem是next()返回节点对应的数据。
E nextItem;
// 上一次next()返回的节点。
private Node<E> lastRet;
// 返回第一个节点
abstract Node<E> firstNode();
// 返回下一个节点
abstract Node<E> nextNode(Node<E> n);

AbstractItr() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
// 获取“LinkedBlockingDeque的互斥锁”
lock.lock();
try {
// 获取“双向队列”的表头
next = firstNode();
// 获取表头对应的数据
nextItem = (next == null) ? null : next.item;
}
finally {
// 释放“LinkedBlockingDeque的互斥锁”
lock.unlock();
}
}

// 获取n的后继节点
private Node<E> succ(Node<E> n) {
// Chains of deleted nodes ending in null or self-links
// are possible if multiple interior nodes are removed.
for (;;) {
Node
<E> s = nextNode(n);
if (s == null)
return null;
else if (s.item != null)
return s;
else if (s == n)
return firstNode();
else
n
= s;
}
}

// 更新next和nextItem。
void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
// assert next != null;
next = succ(next);
nextItem
= (next == null) ? null : next.item;
}
finally {
lock.unlock();
}
}

// 返回“下一个节点是否为null”
public boolean hasNext() {
return next != null;
}

// 返回下一个节点
public E next() {
if (next == null)
throw new NoSuchElementException();
lastRet
= next;
E x
= nextItem;
advance();
return x;
}

// 删除下一个节点
public void remove() {
Node
<E> n = lastRet;
if (n == null)
throw new IllegalStateException();
lastRet
= null;
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
if (n.item != null)
unlink(n);
}
finally {
lock.unlock();
}
}
}

 

LinkedBlockingDeque示例

 1 import java.util.*;
2 import java.util.concurrent.*;
3
4 /*
5 * LinkedBlockingDeque是“线程安全”的队列,而LinkedList是非线程安全的。
6 *
7 * 下面是“多个线程同时操作并且遍历queue”的示例
8 * (01) 当queue是LinkedBlockingDeque对象时,程序能正常运行。
9 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
10 *
11 * @author skywang
12 */
13 public class LinkedBlockingDequeDemo1 {
14
15 // TODO: queue是LinkedList对象时,程序会出错。
16 //private static Queue<String> queue = new LinkedList<String>();
17 private static Queue<String> queue = new LinkedBlockingDeque<String>();
18 public static void main(String[] args) {
19
20 // 同时启动两个线程对queue进行操作!
21 new MyThread("ta").start();
22 new MyThread("tb").start();
23 }
24
25 private static void printAll() {
26 String value;
27 Iterator iter = queue.iterator();
28 while(iter.hasNext()) {
29 value = (String)iter.next();
30 System.out.print(value+", ");
31 }
32 System.out.println();
33 }
34
35 private static class MyThread extends Thread {
36 MyThread(String name) {
37 super(name);
38 }
39 @Override
40 public void run() {
41 int i = 0;
42 while (i++ < 6) {
43 // “线程名” + "-" + "序号"
44 String val = Thread.currentThread().getName()+i;
45 queue.add(val);
46 // 通过“Iterator”遍历queue。
47 printAll();
48 }
49 }
50 }
51 }

(某一次)运行结果

ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, tb2, tb2, ta2,
ta2,
ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, tb3, tb3, ta3,
ta3, ta1,
tb1, ta1, tb2, tb1, ta2, tb2, tb3, ta2, ta3, tb3, tb4, ta3, ta4,
tb4, ta1, ta4, tb1, tb5,
tb2, ta1, ta2, tb1, tb3, tb2, ta3, ta2, tb4, tb3, ta4, ta3, tb5, tb4, ta5,
ta4, ta1, tb5, tb1, ta5, tb2, tb6,
ta2, ta1, tb3, tb1, ta3, tb2, tb4, ta2, ta4, tb3, tb5, ta3, ta5, tb4, tb6, ta4, ta6,
tb5, ta5, tb6, ta6,

结果说明示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingDeque进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingDeque中;接着,遍历并输出LinkedBlockingDeque中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。
当queue是LinkedBlockingDeque对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。

 


更多内容

1. Java多线程系列--“JUC集合”01之 框架

2. Java多线程系列目录(共xx篇)