Java并发编程学习——CountDownLatch、CyclicBarrier和Semaphore

时间:2022-11-05 20:49:44

  学习并发编程,自然要学习JDK提供的并发工具类,了解他们后,我们就可以更好的去控制程序的并发,为我们的开发也是有了很大帮助呢,本文主要参考了方腾飞老师的《Java并发编程的艺术》。

  在JDK的并发包中已经提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类中提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段。

1、等待多线程完成的CountDownLatch

  CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。CountDownLatch允许一个或多个线程等待其他线程完成操作。

  假设有一个需求:需要解析一个Excel中的多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法就是使用join()方法,如下所示:

package chapter8;

/**
 * 使用join()方法,使得主线程在所有子线程执行完毕后结束
 * @author LXH
 *
 */
public class JoinCountDownLatch {
	public static void main(String[] args) throws InterruptedException {
		Thread parser1 = new Thread(new Runnable() {
			@Override
			public void run() {
				
			}
		});
		Thread parser2 = new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("parser2 finish");
			}
		});
		
		parser1.start();
		parser2.start();
		parser1.join();
		parser2.join();
		System.out.println("all parser finish");
	}
}

  join用于让当前执行线程等待join线程执行结束。其实现原理就是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里面实现的,在JDK中看不到。

  在JDK 1.5之后的并发包中提供的CountDownLatch也可以实现join的功能,并且比join的功能更多。

/**
 * 仅仅提供了一个构造方法, 参数count为计数值
 * @param count
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

  CountDownLatch中较为重要的三个方法,当我们调用countDown方法是,count值就会键1。由于countDown方法可以用在任何地方,所以这里的count,可以是count个线程,也可以是1个线程中的count个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程中。

/**
 * 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
 * @throws InterruptedException
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
/**
 * 与await()方法类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
 * @param timeout
 * @param unit
 * @return
 * @throws InterruptedException
 */
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
 * count值减一
 */
public void countDown() {
    sync.releaseShared(1);
}

  用下面一个例子来简单介绍一下CountDownLatch的使用方法

package chapter8;

import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch的简单用法
 * @author LXH
 *
 */
public class CountDownLatchTest {
	private static CountDownLatch c = new CountDownLatch(2);
	
	public static void main(String[] args) throws InterruptedException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(1);
				c.countDown();
				System.out.println(2);
				c.countDown();
			}
		}).start();
		
		c.await(); // 只有当count值为0后才继续执行
		System.out.println(3);
	}
}
  注意:计数器必须大于等于0,只是等于0的时候,计数器就是0,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值(只有在构造它的时候的一次赋值)。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

2、同步屏障CyclicBarrier

  CyclicBarrier意思是可循环使用(Cyclic)的屏障(Barrier),要做的事情是,让一组线程到达一个屏障(同步点)时被阻塞,知道最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

 2.1 CyclicBarrier简介

  CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:

/**
 * 参数barrierAction为当这些线程都达到barrier状态时会执行的内容。方便执行更复杂的业务场景。
 * @param parties
 * @param barrierAction
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

/**
 * 默认的构造方法,参数表示屏障拦截的线程数量
 * @param parties
 */
public CyclicBarrier(int parties) {
    this(parties, null);
}

  当线程到达屏障时,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。实例代码如下所示。

package chapter8;

import java.util.concurrent.CyclicBarrier;

/**
 * 因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出:1 2 或 2 1.
 * @author LXH
 *
 */
public class CyclicBarrierTest {
	private static CyclicBarrier c = new CyclicBarrier(2);

	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();
				} catch (Exception e) {
				}
				System.out.println(1);
			}
		}).start();;
		
		try {
			c.await();
		} catch (Exception e) {
		}
		System.out.println(2);
	}
}

  若将new CyclicBarrier(2)修改为new CyclicBarrier(3),则主线程和子线程将拥有等待,不会执行输出语句。因为没有第三线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

  使用CyclicBarrier的双参构造方法,用于在所有线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,示例代码如下所示。

package chapter8;

import java.util.concurrent.CyclicBarrier;

/**
 * 使用CyclicBarrier的双参构造方法,指定优先执行的方法。
 * @author LXH
 *
 */
public class CyclicBarrierTest02 {
	private static CyclicBarrier c = new CyclicBarrier(2, new A());
	
	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();
				} catch (Exception e) {
				}
				System.out.println(1);
			}
		}).start();;
		
		try {
			c.await();
		} catch (Exception e) {
		}
		System.out.println(2);
	}
	
	static class A implements Runnable {
		@Override
		public void run() {
			System.out.println(3);
		}
	}
}

2.2 CyclicBarrier的应用场景

  CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

  eg:用一个Excel保存了用户所有的银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日平均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后再用barrierAction用这些线程的计算结果,计算出真个Excel的日均银行流水。示例代码如下。

package chapter8;

import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class BankWaterService implements Runnable {
	/**
	 * 创建屏障,当有4个线程到达屏障,并处理完成之后,优先执行当前类的run方法
	 */
	private CyclicBarrier c = new CyclicBarrier(4, this);
	/**
	 * 假设只有4个sheet,所以只启动四个线程
	 */
	private Executor executor = Executors.newFixedThreadPool(4);
	/**
	 * 保存每个sheet计算出的结果
	 */
	private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
	
	private void count() {
		for(int i = 0; i < 4; i++) {
			executor.execute(new Runnable() {
				@Override
				public void run() {
					// 计算后当前sheet的值,计算代码省略
					sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
					// 通知屏障,我已到达,阻塞。
					try {
						c.await();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
		}
	}
	
	@Override
	public void run() {
		int result = 0;
		for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
			result += sheet.getValue();
		}
		sheetBankWaterCount.put("result", result);
		System.out.println(result);
	} 
	
	public static void main(String[] args) {
		BankWaterService bankWaterService = new BankWaterService();
		bankWaterService.count();
	}
}

 2.3 CyclicBarrier与CountDownLatch的区别

  CountDownLatch的计数器只能使用一次(不能中间改变count的值),而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。eg:计算发生错误,重置计数器,并让线程重新执行一遍。下面是CyclicBarrier重复利用的示例代码。
public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
 
        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("CyclicBarrier重用");
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
 
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
        }
    }
}

  CyclicBarrier的功能更加强大,提供了其他有用的方法,如getNumberWaiting方法可以获得CyclicBarrier阻塞线程的数量,isBroken()方法用来了解阻塞线程是否被中断。

package chapter8;

import java.util.concurrent.CyclicBarrier;

/**
 * @author LXH
 *
 */
public class CyclicBarrierTest3 {
	private static CyclicBarrier c = new CyclicBarrier(2);
	
	public static void main(String[] args) {
		Thread thread = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();
				} catch (Exception e) {
				}
			}
		});
		thread.start();
		/*
			中断这个线程。 
			除非当前线程中断自身,这是始终允许的,所以调用此线程的checkAccess方法,这可能会导致抛出SecurityException 。 
			如果该线程阻塞的调用wait() , wait(long) ,或wait(long, int)的方法Object类,或者在join() , join(long) , join(long, int) , sleep(long) ,或sleep(long, int) ,这个类的方法,那么它的中断状态将被清除,并且将收到一个InterruptedException 。 
		 */
		thread.interrupt();
		
		try {
			c.await();
		} catch (Exception e) {
			System.out.println(c.isBroken());        // true
		}
	}
}

3 控制并发线程数的Semaphore

  Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

 3.1 Semaphore简介

  Semaphore提供了两个构造方法,用于设置允许最多访问特定资源的线程数量以及是否公平设置。
/**
 * 接收一个整形数字,表示可用的许可证的数量,默认为非公平设置
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

/**
 * 接收一个整形数字,表示可用的许可证的数量。接收一个是否公平设置
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore还提供了一些其他方法,具体如下
/**
 * 获取一个Semaphore信号量
 * @throws InterruptedException
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/**
 * 使用方法尝试获取许可证
 * 获得许可证,如果有可用并立即返回,值为true ,将可用许可证数量减少一个。 
 * 如果没有许可证可用,那么该方法将立即返回值为false 。 
 * @return
 */
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

/**
 * 归还一个信号量
 */
public void release() {
    sync.releaseShared(1);
}

/**
 *是否有线程正在等待获取信号量
 *
 * @return 
 */
public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

/**
 *返回次信号量中当前可以使用的信号量数
 *
 * @return 
 */
public int availablePermits() {
    return sync.getPermits();
}

/**
 * 返回正在等待获取信号量的线程数
 * @return 
 */
public final int getQueueLength() {
    return sync.getQueueLength();
}

/**
 *返回所有等待获取信号量的线程集合,protected方法
 *
 * @return the collection of threads
 */
protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}

  Semaphore的用法很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以使用tryAcquire()方法尝试获取许可证。

 3.2 应用场景

  Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。

  eg:假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这是我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这是,就可以使用Semaphore来做流量控制。

package chapter8;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 虽然有30个线程在执行,但是只允许10个线程并发执行
 * @author LXH
 *
 */
public class SemaphoreTest {
	private static final int THREAD_COUNT = 30;
	
	private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
	
	private static Semaphore s = new Semaphore(10);
	
	public static void main(String[] args) {
		for(int i = 0; i < THREAD_COUNT; i++) {
			threadPool.execute(new Runnable() {
				@Override
				public void run() {
					try {
						s.acquire();
						System.out.println("获取成功,save data");
						s.release();
					} catch (Exception e) {
					}
				}
			});
		}
		threadPool.shutdown();
	}
}

4 线程间交换数据的Exchanger

  Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也只需exchange()方法,当这两个线程都达到同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

 4.1 Exchanger的应用场景

  用于遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换两个人的数据,并使用交叉规则得到两个交配结果。

  用于校对工作:对某一项工作,两个人同时进行,最后校对两人的数据是否相等。

package chapter8;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
	private static final Exchanger<String> exgr = new Exchanger<>();
	private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
	
	public static void main(String[] args) {
		threadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String A = "银行流水A";	// A录入银行流水数据
					exgr.exchange(A);
				} catch (Exception e) {
				}
			}
		});
		threadPool.execute(new Runnable() {
			@Override
			public void run() {
				try {
					String B = "银行流水A";	// B录入银行流水数据
					String A = exgr.exchange("B");
					System.out.println("A和B数据是否一致:" + A.equals(B) + ", A录入的是: " + A + ",B录入的是:" + B);
				} catch (Exception e) {
				}
			}
		});
		
		threadPool.shutdown();
	}
}

运行结果:

Java并发编程学习——CountDownLatch、CyclicBarrier和Semaphore

  如果两个线程有一个没有执行exchange()方法,则会造成一致等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时常。


  好啦,到这里,这几个小工具的使用就介绍完了,希望对大家有些帮助吧。有什么见解或问题,给小编留言哦。