1.什么是Latch
在本章中,我们将介绍Latch(门阀) 设计模式,该模式指定了一个屏障,只有所有的条件都达到满足的时候,门阀才能打开。
2.CountDownLatch程序实现
2.1 无限等待的Latch
在代码中, 首先定义了一个无限等待的抽象类Latch, 在Latch抽象类中定义了 await方法、countDown方法以及getUnarrived方法, 这些方法的用途在代码注释中都有详细介绍,当然在Latch中的limit属性至关重要,当limit降低到0时门阀将会被打开
public abstract class Latch {// 用于控制多少个线程完成任务时才能打开阀门protected int limit;// 通过构造函数传入limitpublic Latch(int limit) {this.limit = limit;}// 该方法会使得当前线程一直等待,直到所有的线程都完成工作被阻塞的线程是允许被中断的public abstract void await() throws InterruptedException;// 当任务线程完成工作之后调用该方法使得计数器减一public abstract void countDown();// 获取当前还有多少个线程没有完成任务public abstract int getUnarrived();}
子任务数量达到limit的时候,门阀才能打开,await() 方法用于等待所有的子任务完成,如果到达数量未达到limit的时候,将会无限等待下去,当子任务完成的时候调用countDown() 方法使计数器减少一个,表明我已经完成任务了,getUnarrived() 方法主要用于查询当前有多少个子任务还未结束。
1.无限等待CountDownLatch实现
public class CountDownLatch extends Latch{public CountDownLatch(int limit) {super(limit);}@Overridepublic void await() throws InterruptedException {synchronized (this) {// 当limit > 0时,当前线程进入堵塞状态while(limit > 0 ) {this.wait();}}}@Overridepublic void countDown() {synchronized (this) {if ( limit <= 0 )throw new IllegalStateException("all of task already arrived");// 使limit减一,并且通知阻塞线程limit--;this.notifyAll();}}@Overridepublic int getUnarrived() {// 返回有多少线程还未完成任务return limit;}}
在上述代码中, await() 方法不断判断limit的数量, 大于0时门阀将不能打开, 需要持续等待直到limit数量为0为止; countDown() 方法调用之后会导致limit—操作, 并且通知wait中的线程再次判断limit的值是否等于0, 当limit被减少到了0以下, 则抛出状态非法的异常; getUnarrived() 获取当前还有多少个子任务未完成, 这个返回值并不一定就是准确的, 在多线程的情况下, 某个线程在获得Unarrived任务数量并且返回之后, 有可能limit又被减少, 因此getUnarrived() 是一个评估值。
2.程序测试齐心协力打开门阀
/*线程*/public class ProgrammerTravel extends Thread{// 门阀private final Latch latch;// 程序private final String programmer;// 交通工具private final String transportation;// 构造函数public ProgrammerTravel(Latch latch, String programmer, String transportation) {this.latch = latch;this.programmer = programmer;this.transportation = transportation;}@Overridepublic void run() {try {// 花费路上的时间TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();}latch.countDown();}}
TimeUnit.SECONDS.sleep(Thread Local Random.current() .next Int(10) ) 子句在run方法中模拟每个人到达目的地所花费的时间,当他们分别到达目的地的时候,需要执行latch.countDown(),使计数器减少一个以标明自己已到达, 代码如下:
public class LatchClient {public static void main(String[] args) throws InterruptedException {// 定义Latch, limit 为 4Latch latch = new CountDownLatch(4);new ProgrammerTravel(latch, "Alex", "Bus");new ProgrammerTravel(latch, "Gavin","Walking");latch.await();System.out.println("all arrived");}}
2.2 有超时设置的Latch
1.可超时的等待
在Latch中增加可超时的抽象方法await(TimeUnit unit, long time) 的示例代码如下:
public abstract void await(TimeUnit unit, long time) throws InterruptedException;
其中TimeUnit代表wait的时间单位,而time则是指定数量的时间单位,在该方法中又增加了WaitTimeoutException用于通知当前的等待已经超时,与之相关的代码如所示。
public class WaitTImeoutException extends Exception{public WaitTImeoutException(String message) {super(message);}}
超时功能实现
@Overridepublic void await(TimeUnit unit, long time) throws InterruptedException, WaitTImeoutException {if ( time <= 0 )throw new IllegalArgumentException("the time is invalid");long remainingNanos = unit.toNanos(time);// 等待任务将在endNanos纳秒后超时final long endNaos = System.nanoTime() + remainingNanos;synchronized (this) {while( limit > 0 ) {// 如果超市则抛出WaitTimeoutException异常if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0 )throw new WaitTImeoutException("the wait time over specify time.");// 等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanosthis.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));remainingNanos = endNaos - System.nanoTime();}}}
2.收到超时通知
public class LatchClient2 {public static void main(String[] args) {Latch latch = new CountDownLatch(2);new ProgrammerTravel(latch, "Alex", "Bus").start();new ProgrammerTravel(latch, "Gavin","Walking").start();try {latch.await(TimeUnit.SECONDS, 5);System.out.println("all arrived");} catch (InterruptedException | WaitTImeoutException e) {e.printStackTrace();}}}
3.扩展功能
Latch的作用是为了等待所有子任务完成后再执行其他任务, 因此可以对Latch进行再次的扩展,增加回调接口用于运行所有子任务完成后的其他任务,增加了回调功能的CountDownLatch代码如下:
public class CountDownLatch extends Latch{public CountDownLatch(int limit) {super(limit);}private Runnable runnable;public CountDownLatch(int limit, Runnable runnable) {this(limit);this.runnable = runnable;}@Overridepublic void await() throws InterruptedException {synchronized (this) {// 当limit > 0时,当前线程进入堵塞状态while(limit > 0 ) {this.wait();}}if ( null != null ) {runnable.run();}}@Overridepublic void await(TimeUnit unit, long time) throws InterruptedException, WaitTImeoutException {if ( time <= 0 )throw new IllegalArgumentException("the time is invalid");long remainingNanos = unit.toNanos(time);// 等待任务将在endNanos纳秒后超时final long endNaos = System.nanoTime() + remainingNanos;synchronized (this) {while( limit > 0 ) {// 如果超市则抛出WaitTimeoutException异常if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0 )throw new WaitTImeoutException("the wait time over specify time.");// 等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanosthis.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));remainingNanos = endNaos - System.nanoTime();}}if ( null != runnable ) {runnable.run();}}@Overridepublic void countDown() {synchronized (this) {if ( limit <= 0 )throw new IllegalStateException("all of task already arrived");// 使limit减一,并且通知阻塞线程limit--;this.notifyAll();}}@Overridepublic int getUnarrived() {// 返回有多少线程还未完成任务return limit;}}
