Latch翻译为中文有门闩的意思,也就是门锁,当锁没有打开时,相当于一个屏障把人挡在门外,CountDownLatch的作用是当没有放行时,可以让一组线程不能继续向下执行,等到某个节点时可以让线程“”抱团”继续执行。
CountDownLatch也可以称之为闭锁,是Java中线程的一种辅助工具类,其内部维护了一个不可为零值的初始化计数器,每当执行一次countDown()方法,计数器就会减一,当计数器为零值时,就会开始进行”抱团”开始的操作。
CountDownLatch在JDK1.5由Doug Lea引入。
适用场景
实际生活中也有很多类似的场景,比如一场运动会,等所有运动员到场比赛开始;一场会议,等所有参会人员到场会议开始。
总结起来可以概括为,等待一组线程在某个时机进行开就行后续行为。
实际开发中,比如需要对复杂数据进行组装,这个最终结果由多部分组成又互不依赖,为了节省时间就可以开启多线程进行异步加载,最终等所有数据加载完成,对数据进行封装返回。
使用案例
使用代码模拟运动会等待所有运动员准备完成,开始进行比赛的场景。
package juc.countdown;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/*** CountDownLatch 测试类** @author starsray* @date 2021/12/14*/public class Referee {public static void main(String[] args) {Referee referee = new Referee();try {referee.test();} catch (InterruptedException e) {e.printStackTrace();}}public void test() throws InterruptedException {CountDownLatch start = new CountDownLatch(1);int n = 10;CountDownLatch done = new CountDownLatch(n);for (int i = 0; i < n; i++) {new Thread(new Runner(start, done)).start();}Thread.sleep(1000);start.countDown();done.await();System.out.println("all runner complete!");}/*** 工作线程 执行单元*/public static class Runner implements Runnable {private final CountDownLatch start;private final CountDownLatch done;public Runner(CountDownLatch start, CountDownLatch done) {this.start = start;this.done = done;}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() + " : prepare complete!");start.await();running();done.countDown();} catch (InterruptedException e) {e.printStackTrace();}}private void running() throws InterruptedException {System.out.println(Thread.currentThread().getName() + " : runner time : " + time());}public int time() {int time = (int) (1 + Math.random() * (10 - 1 + 1));try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}return time;}}}
输出结果:
Thread-1 : prepare complete!Thread-3 : prepare complete!Thread-2 : prepare complete!Thread-0 : prepare complete!Thread-4 : prepare complete!Thread-5 : prepare complete!Thread-6 : prepare complete!Thread-7 : prepare complete!Thread-8 : prepare complete!Thread-9 : prepare complete!Thread-1 : run time : 2Thread-9 : run time : 2Thread-0 : run time : 3Thread-2 : run time : 3Thread-7 : run time : 4Thread-6 : run time : 4Thread-3 : run time : 7Thread-5 : run time : 9Thread-8 : run time : 9Thread-4 : run time : 10all runner complete!
源码简析
对CountDownLatch的源码进行简单分析,查看构造方法
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}
前面说到不为零值,如果传入0会抛出IllegalArgumentException异常。而且里面维护了一个静态内部类Sync,继承了AbstractQueuedSynchronizer,调用CountDownLatch的构造方法其实也就是调用Sync的构造方法,然后设置了AQS的state的值。
CountDownLatch中关键的两个方法就是await()和countDown(),其中await阻塞调用的主线程继续执行,countDown使计数器的值每次通过CAS原子操作减一。
- await方法通过Sync调用了AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly方法会根据Sync中子类实现tryAcquireShared方法的结果进行判断,当AQS中state的值为0时tryAcquireShared返回1,抛出InterruptedException异常,当返回不为0时调用doAcquireSharedInterruptibly方法。
// CountDownLatch awaitpublic void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}// CountDownLatch 中Sync AQS方法tryAcquireShared实现protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 调用AQS中doAcquireSharedInterruptibly方法public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
- countDown方法通过Sync调用了AQS的releaseShared方法,releaseShared方法会根据子类实现Sync的tryReleaseShared执行结果判断是否继续调用AQS的doReleaseShared方法,进而返回true/false结果。
- tryReleaseShared方法可以看作是一个自选操作,每次获取AQS中当前state的值,如果为0,直接返回flase,否则就进行减一的操作,如果失败就继续重试,当最后state的值为0时,就要释放所有阻塞线程,调用AQS中的doReleaseShared方法。
// CountDownLatch countDownpublic void countDown() {sync.releaseShared(1);}// CountDownLatch 中Sync AQS方法tryReleaseShared实现protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
如果对于阻塞等待时间有要求可以使用await的重载方法,如果超时就会抛出InterruptedException异常。
public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}
CountDownLatch在实际开发中是使用率比较高的线程辅助工具类,灵活可用性强,源码实现基于AQS的基本功能,源码本身比较简单,需要深入理解AQS相关的内容。
