《java高并发程序设计》读书笔记(3)

时间:2021-10-04 23:51:30

多线程的团队协作:同步控制

  • 重入锁:这种锁可以反复进入,可以响应中断(lockInterruptibly()方法),可以申请等待时限(tryLock(),tryLock(long timeout, TimeUnit unit)),可以产生公平锁防止饥饿(new ReentrantLock(true))。案例代码:
import java.util.concurrent.locks.ReentrantLock;

/**
* ClassName:ReenterLock
* Function:
* date:2017/10/12
*
* @since jdk 1.8
*/

public class ReenterLock implements Runnable {
ReentrantLock reentrantLock=new ReentrantLock();
public static int i=0;
@Override
public void run() {
for(int j=0;j<100;j++){
reentrantLock.lock();
try{
i++;
}finally {
reentrantLock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
ReenterLock reenterLock=new ReenterLock();
Thread thread1=new Thread(reenterLock);
Thread thread2=new Thread(reenterLock);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(i);
}
}


  • Condition:重入锁的好搭档,与wati(),notify()的方法大致相同,不同的是他与重入锁关联,重入锁可以通过lock.newCondition()获得与之绑定的condition对象。这样就可以让线程在合适的时候等待,在某个特定的时刻得到通知,继续执行。该接口提供的方法为:

void await() throws InterruptedException;//线程持有相关的重入锁,调用该方法后释放锁
void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();//方法调用后系统会从当前Condition对象的等待队列中唤醒一个线程,一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取就可以继续执行了,因此在signal()方法调用后,一般需要释放相关的锁。谦让给被唤醒的线程让他可以继续执行。案例代码如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* ClassName:ReenterLockCondition
* Function:
* date:2017/10/12
* @since jdk 1.8
*/

public class ReenterLockCondition implements Runnable {
public static ReentrantLock lock=new ReentrantLock();
public static Condition condition=lock.newCondition();
@Override
public void run() {
try{
lock.lock();
condition.await();
System.out.println("线程正在执行");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {

}
}
public static void main(String[] args) throws InterruptedException {
ReenterLockCondition reenterLockCondition=new ReenterLockCondition();
Thread t1=new Thread(reenterLockCondition);
t1.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();
}
}

void signalAll();//唤醒所有的线程

  • 信号量(Semaphore):无论是synchronized还是重入锁,一次都只允许一个线程访问一个资源,而信号量可以允许多个线程同时访问一个资源,信号量主要提供了以下构造函数。
    public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {//第二个参数为是否公平
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
主要的逻辑方法有:
    //或得一个准入许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//获得一个许可,但不响应中断
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//尝试获得一个许可,成功则返回ture,失败返回false
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//释放许可
public void release() {
sync.releaseShared(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
案例代码:
public class SemapDemo implements Runnable {
Semaphore semaphore=new Semaphore(5);
@Override
public void run() {
try{
semaphore.acquire();//获得许可
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId()+"done!");
semaphore.release();//释放许可
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args){
ExecutorService executorService= Executors.newFixedThreadPool(20);
final SemapDemo semapDemo=new SemapDemo();
for(int i=0;i<20;i++){
executorService.submit(semapDemo);
}
}
}
  • 读写锁:读写分离锁,可以有效的帮助减少锁竞争,以提升系统性能。读写锁的访问约束情况
非阻塞
阻塞

案例代码如下:

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* ClassName:ReadWriteLockDemo
* Function:
* date:2017/10/12
* @since jdk 1.8
*/

public class ReadWriteLockDemo {
public static ReentrantLock reentrantLock=new ReentrantLock();
public static ReentrantReadWriteLock reentrantReadWriteLock=new ReentrantReadWriteLock();
public static Lock readLock=reentrantReadWriteLock.readLock();
public static Lock writelock=reentrantReadWriteLock.writeLock();
private int value;
public Object handleRead(Lock lock)throws InterruptedException{
try{
lock.lock();
Thread.sleep(1000);//模拟读操作
return value;
}finally {
lock.unlock();
}
}
public void handleWrite(Lock lock,int index)throws InterruptedException{
try{
lock.lock();
Thread.sleep(1000);//模拟写操作
value=index;
}finally {
lock.unlock();
}
}
public static void main(String[] args){
final ReadWriteLockDemo readWriteLockDemo=new ReadWriteLockDemo();
Runnable readRunnable=new Runnable() {
@Override
public void run() {
try{
readWriteLockDemo.handleRead(readLock);
readWriteLockDemo.handleRead(reentrantLock);
}catch (Exception e){
e.printStackTrace();
}
}
};
Runnable writeRunnale=()->{
try{
readWriteLockDemo.handleWrite(writelock,new Random().nextInt());
readWriteLockDemo.handleWrite(reentrantLock,new Random().nextInt());
}catch (Exception e){

}
};
for(int i=0;i<18;i++){
new Thread(readRunnable).start();
}
for (int i=18;i<20;i++){
new Thread(writeRunnale).start();
}
}
}


  • 倒计时器:CountDownLatch

这是一个非常实用的多线程控制工具类,通常用来控制线程等待,它可以让一个线程等待直到倒计时结束再开始执行。其构造函数为

public CountDownLatch(int count) {//计数个数
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
案例代码:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* ClassName:CountDownLatchDemo
* Function:
* date:2017/10/12
* @since jdk 1.8
*/

public class CountDownLatchDemo implements Runnable {
static final CountDownLatch countDownLatch=new CountDownLatch(10);
static final CountDownLatchDemo countDownLatchDemo=new CountDownLatchDemo();
@Override
public void run() {
try{
Thread.sleep(new Random().nextInt(10)*1000);

> countDownLatch.countDown();//代表自己的任务已经完成,倒计时可以减一了

System.out.println(Thread.currentThread().getId()+"执行完毕");
}catch (Exception e){
e.printStackTrace();
}

}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService= Executors.newFixedThreadPool(10);
for(int i=0;i<10;i++){
executorService.submit(countDownLatchDemo);
}

> countDownLatch.await();

System.out.println("线程都已经执行完毕");
executorService.shutdown();
}
}


  • 循环栅栏:CyclicBarrier

和countdownLatch非常类似,它可以实现线程间的计数等待,比conuntDownLatch更加复杂且功能强大。其构造方法为:

//计数个数,一次计数完成后要做的动作
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
案例代码:
import java.util.Random;
import java.util.concurrent.CyclicBarrier;

/**
* ClassName:CyclicBarrierDemo
* Function:
* date:2017/10/12
* @since jdk 1.8
*/

public class CyclicBarrierDemo {
public static class Soldier implements Runnable{
private String soldier;
private final CyclicBarrier cyclicBarrier;

public Soldier(String soldier, CyclicBarrier cyclicBarrier) {
this.soldier = soldier;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
try{
cyclicBarrier.await();
doWork();
cyclicBarrier.await();
}catch (Exception e){
e.printStackTrace();
}
}

private void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier+"任务完成");
}
public static class BarrierRun implements Runnable{
boolean flag;
int N;

public BarrierRun(boolean flag, int n) {
this.flag = flag;
N = n;
}

@Override
public void run() {
if(flag){
System.out.println("司令:[士兵"+N+"个,任务完成!]");
}else{
System.out.println("司令:[士兵"+N+"个,集合完毕!]");
}

}
}

}
public static void main(String args[]){
final int N=10;
Thread[] allSoldier=new Thread[N];
boolean flag=false;
CyclicBarrier cyclicBarrier=new CyclicBarrier(N,new Soldier.BarrierRun(flag,N));
System.out.println("集合队伍!");
for(int i=0;i<N;++i){
System.out.println("士兵"+i+"报道!");
allSoldier[i]=new Thread(new Soldier("士兵"+i,cyclicBarrier));
allSoldier[i].start();
}
}
}


  • 线程阻塞工具类:LockSupport

是一个非常方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞,和Thread.supend()相比,它弥补了由于resume在前发生,导致线程无法继续执行的情况。与await相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。案例代码如下:

package suspend_resume;

import java.util.concurrent.locks.LockSupport;

/**
* ClassName:LockSupportTest
* Function:LockSupportTest
* date:2017/10/12
* @since jdk 1.8
*/

public class LockSupportTest {
public static Object object=new Object();
static changeObjectThread t1=new changeObjectThread("t1");
static changeObjectThread t2=new changeObjectThread("t2");
public static class changeObjectThread extends Thread{
changeObjectThread(String name){
super.setName(name);
}
@Override
public void run() {
synchronized (object){
System.out.println(getName()+"线程已经被挂起");
LockSupport.park();
}
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}

上述代码自始至终都可以正常的结束,不会因为park方法而导致线程永久性的挂起。这是因为LockSupport使用类似信号量的机制,它为每个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可,如果许可不可用,就会阻塞,而unpark()则使得一个许可变为可用,但是和信号量不同的是,许可不可能累加,你不可能拥有超过一个的许可,它永远是一个。

线程复用:线程池


  • 线程池:为了避免系统频繁的创建和销毁线程,可以让创建的线程复用,于是出现了类似数据库中连接池的概念,线程池中总有几个活跃的线程存在,当你需要使用线程时,可以从池中随便拿一个空闲线程,并不急着关闭线程,而是将它退回线程池以方便其他人使用。
  • jdk对线程池的支持

在jdk中,为了能够更好的控制多线程,jdk提供了一套executor框架帮助开发人员有效地进行线程池控制。ThreadPoolExecutor表示一个线程池,Executors则扮演者线程池工厂的角色,该工厂提供了一下各种类型的线程池。

    /**
该方法返回一个固定线程数量的线程池,该线程池中的线程数量始终保持不变,当有一个新任务提交时,线程池中若有空闲线程,则立即执行,若没有,则新的任务会被暂时存在一个任务队列中待有线程空闲时就处理在任务队列中的任务。
*/

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
该方法返回一个只有一个线程的线程池,若多余一个任务被提交到该线程池中,任务会被保存到队列中,待有空闲线程的时候再处理
*/

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
该方法返回一个可根据实际情况调整的线程池数量的线程池,线程池中线程的数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程,若所有线程都在工作,又有新的任务提交,则会创建新的线程处理任务,所有线程在当前的任务执行完毕后,将返回线程池进行复用。
*/

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
该方法返回一个ExecutorService实例对象,线程池数量为1,ExecutorService接口在ExecutorService接口之上拓展了在给定时间执行某任务的功能,如在某个固定延时之后执行,或者周期性执行某个任务,
*/

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
也返回一个ExecutorService实例对象,线程池数量可以指定。
*/

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
案例代码:
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* ClassName:ThreadPoolDemo
* Function:
* date:2017/10/13
* @since jdk 1.8
*/

public class ThreadPoolDemo {
@Test
public void test(){
ExecutorService executorService=Executors.newFixedThreadPool(5);
for (int i = 0; i <10; i++) {
System.out.println("执行任务"+i);
executorService.submit(()->{//java8新语法提交一个线程任务
System.out.println(System.currentTimeMillis()+"Thread ID"+Thread.currentThread().getId());
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
});
}
}
}
计划任务:ScheduledExecutorService接口中的方法为:
//任务执行的时长
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,

//每隔多久执行一次 TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);


  • 核心线程池的内部实现

所有线程的核心都是ThreadPoolExecutor类的封装,下面是该类的构造方法

public ThreadPoolExecutor(int corePoolSize,//线程池中的线程数量
int maximumPoolSize,//指定线程池中的最大线程数量
long keepAliveTime,//当前线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。
TimeUnit unit,//keepAliveTime的单位
BlockingQueue<Runnable> workQueue//任务队列,被提交但尚未被执行的任务
) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), //线程工厂,用于创建线程
defaultHandler//拒绝策略,当任务太多来不及处理时如何拒绝
);
}

《java高并发程序设计》读书笔记(3)

  • 拒绝策略

    JDK内置的拒绝策略如下:

    -AbortPolicy策略:任务超负载直接抛出异常,阻止系统正常工作
    -CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,这样不会丢弃任务,但是任务提交的性能可能会急剧下降。
    -DiscardOledestPolicy策略:丢弃最老的请求,也就是即将被执行的任务,并尝试再次提交当前任务。
    -DiscardPolicy策略:默默地丢弃无法处理的任务,不予任何处理,如果允许任务丢失,这应该是最好的一种方案。

  • 自定义线程创建:ThreadFactory

为什么要自定义线程池?

我们可以按照需要定义自己想要跟踪线程池究竟在何时创建了多少线程,也可以自定义线程的名称,组,以及优先级等信息,甚至可以任性的将所有的线程设置为守护线程,总之,使用自定义线程池可以让我们更加*地设置池子中所有线程的状态。
import java.util.concurrent.*;

/**
* ClassName:MyTheadPool
* Function:
* date:2017/10/13
* @since jdk 1.8
*/

public class MyTheadPool {
public void testMyThreadPool() throws InterruptedException {
ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread();
t.setDaemon(true);
System.out.println("create"+t);
return t;
}
});
for (int i = 0; i < 5; i++) {
executorService.submit(()->{
System.out.println("任务开始执行");
});
}
Thread.sleep(2000);
}
}


  • 拓展线程池

虽然JDK已经帮我们实现了一个稳定的高性能线程池,但如果我们要对这个线程池做一些拓展,比如想监控每个任务的开始和结束时间,或者其他一些自定义的增强功能,我们可以利用ThreadPoolExecutor提供的beforeExecute(),afterExecute(),terminated()三个接口对线程池进行控制。

import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* ClassName:ExThreadPool
* Function:
* date:2017/10/13
* @since jdk 1.8
*/

public class ExThreadPool {
int index=0;
@Test
public void test() throws InterruptedException {
ExecutorService es=new ThreadPoolExecutor(5,5,0L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行");
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成");
}

@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for (int i = 0; i <5 ; i++) {
es.execute(()->{
System.out.println("TASK-GEYM"+index);
index++;
});
Thread.sleep(10);
}
es.shutdown();
}
}


  • 优化线程池线程数量

估算线程池大小的经验公式
Ncpu=cpu的数量
Ucpu=目标cpu的使用率,0<=Ucpu<=1
W/C=等待时间与计算时间的比率
为保持处理器达到期望的使用率,最优的线程池的大小等于:

Nthreads= Ncpu*Ucpu*(1+w/c)

java中可以通过Runtime.getRuntime().availableProcessors()获取cpu的数量。


  • 在线程池中寻找堆栈
    使用线程池时使用execute()方法可以获取到异常堆栈,用submit()则获取不到。
  • Fork/Join框架

“分而治之”顾名思义就是将大任务分成小任务处理,处理完毕之后合并处理结果为最终结果。下面演示简单案例:


import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
* ClassName:CountTask
* Function:
* date:2017/10/13

* @since jdk 1.8
*/

public class CountTask extends RecursiveTask<Long>{
private static final int THRESHOLD=10000;
private long start;
private long end;

public CountTask(long start, long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long sum=0;
boolean canCompute=(end-start)<THRESHOLD;
if(canCompute){
for (long i=start;i<=end;i++){
sum+=i;
}
}else {
//分成100个小任务
long step=(start+end)/100;
ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
long pos=start;
for (int i = 0; i <100 ; i++) {
long lastOne=pos+step;
if (lastOne>end) lastOne=end;
CountTask subTask=new CountTask(pos,lastOne);
pos+=step+1;
subTasks.add(subTask);
subTask.fork();
}
for (CountTask t:subTasks){
sum+=t.join();
}
}

return sum;
}

public static void main(String[] args){
ForkJoinPool forkJoinPool=new ForkJoinPool();
CountTask task=new CountTask(0,100L);
ForkJoinTask<Long> result=forkJoinPool.submit(task);
try{
long res=result.get();
System.out.println("sum="+res);
}catch (Exception e){
e.printStackTrace();
}
}
}

jdk的并发容器


  • 并发集合

ConcurrentHashMap:这是一个高效并发HashMap,是一个线程安全的HashMap。

CopyOnWriteArrayList:这是一个List,从名字上看就是一个和ArrayList一族的,在读多写少的场合这个List性能特别好,远远高于vector。
ConcurrentLinkedQueue:高效并发队列,使用链表实现,可以看做一个线程安全的LinkedList。

BlockingQueue:这是一个接口,JDK内部通过链表,数组等方式实现了这个接口,表示阻塞队列,非常适合用于作为参数共享的通道。

ConcurrentSkipListMap:跳表的实现,这是一个Map,使用跳表的数据结构进行快速查找。

  • 线程安全的HashMap

    ①:利用Collections.synchronizedMap(new HashMap);可以获得一个线程安全的HashMap。但是它在多线程环境中的表现并不是太好,读取或者写入都需要获取mutex的锁,这会导致所有对map的操作全部进入等待状态,直到锁可用。

    ②:一个更加专业的并发HashMap是ConcurrentHashMap,它专门为并发进行性能优化,更加适合多线程的场合。

  • 有关list的线程安全
    JDK中线程安全的list可以利用Collection.synchronizedList(new LinkedList());得到,也可以利用vector,vector也是线程安全的list;
  • 高效读写队列:ConcurrentLinkedQueue
    队列也是常用的数据结构之一,JDK提供了一个ConcurrentLinkedQueue来实现高并发的队列。这个队列用链表作为数据结构,它的性能算是高并发环境中性能最好的队列。它使用CAS操作保证线程安全,并处理一些可能存在的不一致的问题,使得该队列的性能得到飞速提升。具体细节代码详见《java高并发程序设计》P128.
  • 高效读取:CopyOnWriteArrayList
    实质:在写入操作时进行一次自我复制,简言之,当这个list需要修改时,并不修改原来的内容,二是修改原内容的副本,写完之后,再讲修改的副本替换原来的数据,这样就可以保证写操作不会影响读操作。
  • 数据共享通道:BlockingQueue
    这是一个接口,并非具体实现。
  •     ArrayBlockingQueue (java.util.concurrent) 
    BlockingDeque (java.util.concurrent)
    DelayQueue (java.util.concurrent)
    DelayedWorkQueue in ScheduledThreadPoolExecutor(java.util.concurrent)
    LinkedBlockingDeque (java.util.concurrent)
    LinkedBlockingQueue (java.util.concurrent)
    LinkedTransferQueue (java.util.concurrent)
    PriorityBlockingQueue (java.util.concurrent)
    SynchronousQueue (java.util.concurrent)
    TransferQueue (java.util.concurrent)
    1. ArrayBlockingQueue:基于数组实现,适合做有界队列,队列中可容纳的最大元素队列创建时指定。
    2.LinkedBlockingQueue:基于链表实现,适合做*队列,或者边界值非常大的队列,因为其内部元素可以动态增加,不会因为初值容量大而吃掉大半内存。

    队列中的一些方法:

    offer()//将元素压入队列末尾,如果当前队列已经满了,立即返回false
    put()//将元素压入队列末尾,如果队列满了会一直等待,直到队列中空闲位置。
    poll()//从队列头部获取一个元素,如果队列为空,该方法直接返回null
    take()//从队列头部获取一个元素,如果队列为空,该方法会等待,直到队列有数据为止。