6.1 锁与队列的关系
- CLH锁的内部队列

- 分布式锁的内部队列

- AQS的内部队列
6.2 AQS的核心成员
6.2.1 状态标志位
//同步状态,使用 volatile保证线程可见private volatile int state;
state因为使用volatile保证了操作的可见性,所以任何线程通过getState()获得状态都可以得到最新值。AQS提供了getState()、setState()来获取和设置同步状态,具体如下:
// 获取同步的状态protected final int getState() {return state;}// 设置同步的状态protected final void setState(int newState) {state = newState;}// 通过CAS设置同步的状态protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个基类只有一个变量叫exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应的get()和set()方法,具体如下:
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {//表示当前占用该锁的线程private transient Thread exclusiveOwnerThread;// 省略get/set方法}
6.2.2 队列节点类
AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员如下:
static final class Node {/**节点等待状态值1:取消状态*/static final int CANCELLED = 1;/**节点等待状态值-1:标识后继线程处于等待状态*/static final int SIGNAL = -1;/**节点等待状态值-2:标识当前线程正在进行条件等待*/static final int CONDITION = -2;/**节点等待状态值-3:标识下一次共享锁的acquireShared操作需要无条件传播*/static final int PROPAGATE = -3;//节点状态:值为SIGNAL、CANCELLED、CONDITION、PROPAGATE、0//普通的同步节点的初始值为0,条件等待节点的初始值为CONDITION(-2)volatile int waitStatus;//节点所对应的线程,为抢锁线程或者条件等待线程volatile Thread thread;//前驱节点,当前节点会在前驱节点上自旋,循环检查前驱节点的waitStatus状态volatile Node prev;//后继节点volatile Node next;//若当前Node不是普通节点而是条件等待节点,则节点处于某个条件的等待队列上//此属性指向下一个条件等待节点,即其条件队列上的后继节点Node nextWaiter;...}
6.2.3 FIFO双向同步队列
AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。
/*首节点的引用*/private transient volatile Node head;/*尾节点的引用*/private transient volatile Node tail;
6.2.4 JUC显式锁与AQS的关系
6.2.5 ReentrantLock与AQS的组合关系
- ReentrantLock与AQS的组合关系
ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer:
static abstract class Sync extends AbstractQueuedSynchronizer {...}
ReentrantLock为了支持公平锁和非公平锁两种模式,为Sync又定义了两个子类,具体如下:
final static class NonfairSync extends Sync {...}final static class FairSync extends Sync {...}
ReentrantLock提供了两个构造器,具体如下:
public ReentrantLock() { //默认的构造器sync = new NonfairSync(); //内部使用非公平同步器}public ReentrantLock(boolean fair) { //true 为公平锁,否则为非公平锁sync = fair ? new FairSync() : new NonfairSync();}
由ReentrantLock的lock()和unlock()的源码可以看到,它们只是分别调用了sync对象的lock()和release()方法。
public void lock() { //抢占显式锁sync.lock();}public void unlock() { //释放显式锁sync.release(1);}
-
6.3 AQS中的模板模式
6.3.1 模板模式
6.3.2 一个模板模式的参考实现
模板模式的参考实现代码 ```java package com.crazymakercircle.demo.lock; import com.crazymakercircle.util.Print; public class TemplateDemo {
static abstract class AbstractAction{/*** 模板方法:算法骨架*/public void tempMethod(){Print.cfo("模板方法的算法骨架被执行");beforeAction(); // 执行前的公共操作action(); // 调用钩子方法afterAction(); // 执行后的公共操作}/*** 执行前*/protected void beforeAction(){Print.cfo("准备执行钩子方法");}/*** 钩子方法:这里定义为一个抽象方法*/public abstract void action();/*** 执行后*/private void afterAction(){Print.cfo("钩子方法执行完成");}}//子类A:提供了钩子方法实现static class ActionA extends AbstractAction{/*** 钩子方法的实现*/@Overridepublic void action(){Print.cfo("钩子方法的实现 ActionA.action() 被执行");}}//子类B:提供了钩子方法实现static class ActionB extends AbstractAction{/*** 钩子方法的实现*/@Overridepublic void action(){Print.cfo("钩子方法的实现 ActionB.action() 被执行");}}public static void main(String[] args){AbstractAction action = null;//创建一个 ActionA 实例action= new ActionA();//执行基类的模板方法action.tempMethod();//创建一个 ActionB 实例action= new ActionB();//执行基类的模板方法action.tempMethod();}
}
<a name="jHzxv"></a>## 6.3.3 AQS的模板流程- Exclusive(独享锁):只有一个线程能占有锁资源,如ReentrantLock。独享锁又可分为公平锁和非公平锁。- Share(共享锁):多个线程可同时占有锁资源,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock的Read锁。<a name="MhyL3"></a>## 6.3.4 AQS中的钩子方法自定义同步器时,AQS中需要重写的钩子方法大致如下:- tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true,若失败则返回false- tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true,若失败则返回false- tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。- tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true,若失败则返回false。- isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。只有用到condition条件队列时才需要去实现它。1. tryAcquire独占式获取锁```javaprotected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
- tryRelease独占式释放锁
```java
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();}
3. tryAcquireShared共享式获取```javaprotected long tryAcquireShared(long arg) {throw new UnsupportedOperationException();}
- tryReleaseShared共享式释放
```java
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();}
5. 查询是否处于独占模式```javaprotected boolean isHeldExclusively() {throw new UnsupportedOperationException();}
6.4 通过AQS实现一把简单的独占锁
6.4.1 简单的独占锁的UML类图
6.4.2 简单的独占锁的实现
package com.crazymakercircle.demo.lock.custom;// 省略importpublic class SimpleMockLock implements Lock{//同步器实例private final Sync sync = new Sync();// 自定义的内部类:同步器// 直接使用 AbstractQueuedSynchronizer.state 值表示锁的状态// AbstractQueuedSynchronizer.state=1 表示锁没有被占用// AbstractQueuedSynchronizer.state=0 表示锁没已经被占用private static class Sync extends AbstractQueuedSynchronizer{//钩子方法protected boolean tryAcquire(int arg){//CAS更新状态值为1if (compareAndSetState(0, 1)){setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//钩子方法protected boolean tryRelease(int arg){//如果当前线程不是占用锁的线程if (Thread.currentThread() != getExclusiveOwnerThread()){//抛出非法状态的异常throw new IllegalMonitorStateException();}//如果锁的状态为没有占用if (getState() == 0){//抛出非法状态的异常throw new IllegalMonitorStateException();}//接下来不需要使用CAS操作,因为下面的操作不存在并发场景setExclusiveOwnerThread(null);//设置状态setState(0);return true;}}//显式锁的抢占方法@Overridepublic void lock(){//委托给同步器的acquire()抢占方法sync.acquire(1);}//显式锁的释放方法@Overridepublic void unlock(){//委托给同步器的release()释放方法sync.release(1);}// 省略其他未实现的方法}}
6.4.3 SimpleMockLock测试用例
package com.crazymakercircle.demo.lock;// 省略importpublic class LockTest{@org.junit.Testpublic void testMockLock(){// 每个线程的执行轮数final int TURNS = 1000;// 线程数final int THREADS = 10;//线程池,用于多线程模拟测试ExecutorService pool = Executors.newFixedThreadPool(THREADS);//自定义的独占锁Lock lock = new SimpleMockLock();// 倒数闩CountDownLatch countDownLatch = new CountDownLatch(THREADS);long start = System.currentTimeMillis();//10个线程并发执行for (int i = 0; i < THREADS; i++){pool.submit(() ->{try{//累加 1000 次for (int j = 0; j < TURNS; j++){//传入锁,执行一次累加IncrementData.lockAndFastIncrease(lock);}Print.tco("本线程累加完成");} catch (Exception e){e.printStackTrace();}//线程执行完成,倒数闩减少一次countDownLatch.countDown();});}// 省略等待并发执行完成、结果输出的代码}}
6.5 AQS锁抢占的原理
6.5.1 显式锁抢占的总体流程
6.5.2 AQS模板方法:acquire(arg)
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
6.5.3 钩子实现:tryAcquire(arg)
private static class Sync extends AbstractQueuedSynchronizer{//钩子方法protected boolean tryAcquire(int arg){//CAS更新状态值为1if (compareAndSetState(0, 1)){setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}
6.5.4 直接入队:addWaiter
private Node addWaiter(Node mode) {//创建新节点Node node = new Node(Thread.currentThread(), mode);// 加入队列尾部,将目前的队列tail作为自己的前驱节点predNode pred = tail;// 队列不为空的时候if (pred != null) {node.prev = pred;// 先尝试通过AQS方式修改尾节点为最新的节点// 如果修改成功,将节点加入队列的尾部if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋enq(node);return node;}
static final class Node {/** 常量标识:标识当前的队列节点类型为共享型抢占 */static final Node SHARED = new Node();/** 常量标识:标识当前的队列节点类型为独占型抢占 */static final Node EXCLUSIVE = null;// 省略其他代码}
6.5.5 自旋入队:enq
/*** 这里进行了循环,如果此时存在tail,就执行添加新队尾的操作* 如果依然不存在,就把当前线程作为head节点* 插入节点后,调用acquireQueued()进行阻塞*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {//队列为空,初始化尾节点和头节点为新节点if (compareAndSetHead(new Node()))tail = head;} else {// 队列不为空,将新节点插入队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}/*** CAS 操作head指针,仅仅被enq()调用*/private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}/*** CAS 操作tail指针,仅仅被enq()调用*/private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}
6.5.6 自旋抢占:acquireQueued()
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// 自旋检查当前节点的前驱节点是否为头节点,才能获取锁for (;;) {// 获取节点的前驱节点final Node p = node.predecessor();// 节点中的线程循环地检查自己的前驱节点是否为 head 节点// 前驱节点是head时,进一步调用子类的tryAcquire(…)实现if (p == head && tryAcquire(arg)) {// tryAcquire()成功后,将当前节点设置为头节点,移除之前的头节点setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起// 如果需要挂起// 调用parkAndCheckInterrupt()方法挂起当前线程,直到被唤醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true; // 若两个操作都是true,则置true}} finally {//如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了)//那么取消节点在队列中的等待if (failed)//取消请求,将当前节点从队列中移除cancelAcquire(node);}}
6.5.7 挂起预判:shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; // 获得前驱节点的状态if (ws == Node.SIGNAL) //如果前驱节点状态为SIGNAL(值为-1)就直接返回return true;if (ws > 0) { // 前驱节点以及取消CANCELLED(1)do {// 不断地循环,找到有效前驱节点,即非CANCELLED(值为1)类型节点// 将pred记录前驱的前驱pred = pred.prev;// 调整当前节点的prev指针,保持为前驱的前驱node.prev = pred;} while (pred.waitStatus > 0);// 调整前驱节点的next指针pred.next = node;} else {// 如果前驱状态不是CANCELLED,也不是SIGNAL,就设置为SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 设置前驱状态之后,此方法返回值还是为false,表示线程不可用,被阻塞}return false;}
6.5.8 线程挂起:parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 调用park()使线程进入waiting状态return Thread.interrupted(); // 如果被唤醒,查看自己是否已经被中断}
6.6 AQS的两个关键点:节点的入队和出队
6.6.1 节点的自旋入队
private Node enq(final Node node) {for (;;) { //自旋入队Node t = tail;if (t == null) {//队列为空,初始化尾节点和头节点为新节点if (compareAndSetHead(new Node()))tail = head;} else {//如果队列不为空,将新节点插入队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
6.6.2 节点的出队
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;// 在前驱节点上自旋for (;;) {// 获取节点的前驱节点final Node p = node.predecessor();// (1)前驱节点是头节点// (2)通过子类的tryAcquire()钩子实现抢占成功if (p == head && tryAcquire(arg)) {// 将当前节点设置为头节点,之前的头节点出队setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 省略park(无限期阻塞)线程的代码}} finally {// 省略其他}}
AQS释放锁时是如何唤醒后继线程的呢?AQS释放锁的核心代码如下:
public final boolean release(long arg) {if (tryRelease(arg)) { //释放锁的钩子方法的实现Node h = head; //队列头节点if (h != null && h.waitStatus != 0)unparkSuccessor(h); //唤醒后继线程return true;}return false;}
unparkSuccessor的核心代码如下:
private void unparkSuccessor(Node node) {// 省略不相关代码Node s = node.next; //后继节点// 省略不相关代码if (s != null)LockSupport.unpark(s.thread); //唤醒后继节点的线程}
6.7 AQS锁释放的原理
6.7.1 SimpleMockLock独占锁的释放流程
6.7.2 AQS模板方法:release()
public final boolean release(long arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
6.7.3 钩子实现:tryRelease()
//钩子方法protected boolean tryRelease(int arg){//如果当前线程不是占用锁的线程if (Thread.currentThread() != getExclusiveOwnerThread()){//抛出非法状态的异常throw new IllegalMonitorStateException();}//如果锁的状态为没有占用if (getState() == 0){//抛出非法状态的异常throw new IllegalMonitorStateException();}//接下来不需要使用CAS操作,因为下面的操作不存在并发场景setExclusiveOwnerThread(null);//设置状态setState(0);return true;}}
6.7.4 唤醒后继:unparkSuccessor()
private void unparkSuccessor(Node node) {int ws = node.waitStatus; // 获得节点状态,释放锁的节点,也就是头节点//CANCELLED(1)、SIGNAL(-1)、CONDITION (-2)、PROPAGATE(-3)//若头节点状态小于0,则将其置为0,表示初始状态if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next; // 找到后头的一个节点if (s == null || s.waitStatus > 0) {// 如果新节点已经被取消CANCELLED(1)s = null;// 从队列尾部开始,往前去找最前面的一个 waitStatus 小于0的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0) s = t;}//唤醒后继节点对应的线程if (s != null)LockSupport.unpark(s.thread);}
6.8 ReentrantLock的抢锁流程
6.8.1 ReentrantLock非公平锁的抢占流程
6.8.2 非公平锁的同步器子类
ReentrantLock为非公平锁实现了一个内部的同步器——NonfairSync,其显式锁获取方法lock()的源码如下:
static final class NonfairSync extends Sync {//非公平锁抢占final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}// 省略其他}
6.8.3 非公平抢占的钩子方法:tryAcquire(arg)
static final class NonfairSync extends Sync {//非公平锁抢占的钩子方法protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}// 省略其他}abstract static class Sync extends AbstractQueuedSynchronizer {final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();// 先直接获得锁的状态int c = getState();if (c == 0) {// 如果内部队列首节点的线程执行完了,它会将锁的state设置为0// 当前抢锁线程的下一步就是直接进行抢占,不管不顾// 发现state是空的,就直接拿来加锁使用,根本不考虑后面继承者的存在if (compareAndSetState(0, acquires)) {// 1. 利用CAS自旋方式判断当前state确实为0,然后设置成acquire(1)// 这是原子性的操作,可以保证线程安全setExclusiveOwnerThread(current);// 设置当前执行的线程,直接返回truereturn true;}}else if (current == getExclusiveOwnerThread()) {// 2.当前的线程和执行中的线程是同一个,也就意味着可重入操作int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);// 表示当前锁被1个线程重复获取了nextc次return true;}// 否则就返回false,表示没有成功获取当前锁,进入排队过程return false;}// 省略其他}
6.8.4 ReentrantLock公平锁的抢占流程
6.8.5 公平锁的同步器子类
static final class FairSync extends Sync {//公平锁抢占的钩子方法final void lock() {acquire(1);}// 省略其他}
6.8.6 公平抢占的钩子方法:tryAcquire(arg)
static final class FairSync extends Sync {//公平抢占的钩子方法protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState(); //锁状态if (c == 0) {if (!hasQueuedPredecessors() && //有后继节点就返回,足够讲义气compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
6.8.7 是否有后继节点的判断
6.9 AQS条件队列
6.9.1 Condition基本原理
public class ConditionObject implements Condition, java.io.Serializable {//记录该队列的头节点private transient Node firstWaiter;//记录该队列的尾节点private transient Node lastWaiter;}

在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java内置锁上只有唯一的一个等待队列。比如,我们可以调用newCondition()创建两个等待队列,具体如下:
private Lock lock = new ReentrantLock();//创建第一个等待队列private Condition firstCond = lock.newCondition();//创建第二个等待队列private Condition secondCond = lock.newCondition();
6.9.2 await()等待方法原理

在await()方法将当前线程挪动到Condition等待队列后,还会唤醒AQS同步队列中head节点的下一个节点。await()方法的核心代码如下:
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter(); // step 1int savedState = fullyRelease(node); // step 2int interruptMode = 0;while (!isOnSyncQueue(node)) { // step 3LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) // step 4&& interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) //step 5unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
创建一个新节点并放入Condition队列尾部的工作由addConditionWaiter()方法完成,该方法具体如下:
private Node addConditionWaiter() {Node t = lastWaiter;// 如果尾节点取消,重新定位尾节点if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 创建一个新Node,作为等待节点Node node = new Node(Thread.currentThread(), Node.CONDITION);// 将新Node加入等待队列if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
6.9.3 signal()唤醒方法原理

//唤醒public final void signal() {//如果当前线程不是持有该锁的线程,就抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first); //唤醒头节点}//执行唤醒private void doSignal(Node first) {do {//出队的代码写的很巧妙,要看仔细//first出队,firstWaiter 头部指向下一个节点,自己的nextWaiterif ((firstWaiter = first.nextWaiter) == null)lastWaiter = null; //如果第二节点为空,则尾部也为空//将原来头部first的后继置空,help for GCfirst.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}//将被唤醒的节点转移到同步队列final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node); // step 1int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread); // step 2:唤醒线程return true;}
