介绍:
会换栅栏,实现一组线程等待某个状态,再同时执行。CyclicBarrier可以被重用
构造方法
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。public CyclicBarrier(int parties)// 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)public CyclicBarrier(int parties, Runnable barrierAction)
重要方法
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时public int await() throws InterruptedException, BrokenBarrierExceptionpublic int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException//循环 通过reset()方法可以进行重置public void reset()/** The lock for guarding barrier entry */private final ReentrantLock lock = new ReentrantLock();/** Condition to wait on until tripped */private final Condition trip = lock.newCondition();
CyclicBarrier应用场景
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
public class CyclicBarrierTest2 {//保存每个学生的平均成绩private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();private ExecutorService threadPool= Executors.newFixedThreadPool(3);private CyclicBarrier cb=new CyclicBarrier(3,()->{int result=0;Set<String> set = map.keySet();for(String s:set){result+=map.get(s);}System.out.println("三人平均成绩为:"+(result/3)+"分");});public void count(){for(int i=0;i<3;i++){threadPool.execute(new Runnable(){@Overridepublic void run() {//获取学生平均成绩int score=(int)(Math.random()*40+60);map.put(Thread.currentThread().getName(), score);System.out.println(Thread.currentThread().getName()+"同学的平均成绩为:"+score);try {//执行完运行await(),等待所有学生平均成绩都计算完毕cb.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});}}public static void main(String[] args) {CyclicBarrierTest2 cb=new CyclicBarrierTest2();cb.count();}}
利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景
public class CyclicBarrierTest3 {public static void main(String[] args) {AtomicInteger counter = new AtomicInteger();ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 1000, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100),(r) -> new Thread(r, counter.addAndGet(1) + " 号 "),new ThreadPoolExecutor.AbortPolicy());CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> System.out.println("裁判:比赛开始~~"));for (int i = 0; i < 10; i++) {threadPoolExecutor.submit(new Runner(cyclicBarrier));}}static class Runner extends Thread{private CyclicBarrier cyclicBarrier;public Runner (CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {try {int sleepMills = ThreadLocalRandom.current().nextInt(1000);Thread.sleep(sleepMills);System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting());cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}}}}
CyclicBarrier与CountDownLatch的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
- CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
- CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
- CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
- CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
- CyclicBarrier是通过ReentrantLock的”独占锁”和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现
核心就在 doawit中队列入队
判断条件在唤醒线程,再释放锁,条件队列转到同步队列,再从同步队列中唤醒线程的设计
CyclicBarrier源码分析
// 代码模式lock.lock();try{//和兴// condition的await;,// 阻塞线程,释放锁//进入条件队列// 可以在释放锁的时候唤醒head后续节点的所有线程// 被唤醒的线程获取锁,如果失败会阻塞 ,这个是独占锁,双向队列await();}finally{lock.unLock()}private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();// 会唤醒,将条件队列转为同步队列}
死锁
优化:可以调整时间,调整顺序
public class DeadLockTest {private static String a = "a";private static String b = "b";public static void main(String[] args) {Thread threadA = new Thread(()->{synchronized (a) {log.debug("threadA进入a同步块,执行中...");try {//Thread.sleep(2000); 条件队列作用: 打破死锁的循环a.wait(5000);synchronized (b) {log.debug("threadA进入b同步块,执行中...");}} catch (InterruptedException e) {e.printStackTrace();}}},"threadA");Thread threadB = new Thread(()->{synchronized (b) {log.debug("threadB进入b同步块,执行中...");try {//b.wait(5000);Thread.sleep(2000);synchronized (a) {log.debug("threadB进入a同步块,执行中...");}} catch (InterruptedException e) {e.printStackTrace();}}},"threadB");threadA.start();threadB.start();}}
/*** @author Fox* 哲学家就餐问题*/public class PhilosopherEatTest {public static void main(String[] args) {//初始化五根筷子Chopstick c1 = new Chopstick(1);Chopstick c2 = new Chopstick(2);Chopstick c3 = new Chopstick(3);Chopstick c4 = new Chopstick(4);Chopstick c5 = new Chopstick(5);// 思考: 如何打破循环, 调整获取锁的顺序new Philosopher("苏格拉底", c1, c2).start();new Philosopher("柏拉图", c2, c3).start();new Philosopher("亚里士多德", c3, c4).start();new Philosopher("赫拉克利特", c4, c5).start();new Philosopher("阿基米德", c1,c5).start();}}

