一、简介
Java 中实现同步除了有关键字synchronized 外,还有JDK5发布并发包下的锁。并发包下的锁其实也是采用了CAS算法和队列实现的,下面让我们了解一下具体的实现。
二、Lock 接口
2.1 介绍
锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。
2.2 Lock 接口的定义
public interface Lock {/*** 获取锁,调用该方法当前线程会获取锁*/void lock();/***可中断获取锁,即在获取锁的过程中,可以中断当前线程*/void lockInterruptibly() throws InterruptedException;/*** 尝试非阻塞的获取锁,调用该方法后立即返回,返回值为true 则获取到锁,false则没有*/boolean tryLock();/*** 超时尝试获得锁:* 有以下三种情况* 1. 当前线程在给超时时间内获取到锁* 2. 当前线程在超时时间内被中断* 3. 超时时间结束,返回false*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;/*** 释放锁*/void unlock();/***获取等待通知组件,该组件与当前锁绑定,线程只有获取到了锁后,才能调用Conditon 中的 await(), signal()等等待通知方法* 与Object 对象中的wait,notify 类似*/Condition newCondition();}
2.3 Lock 接口比关键字 synchronized 的优势
- 可以尝试非阻塞的获取锁
- 能超时获取锁
- 能被中断地获取锁
三、队列同步器(AQS, AbstractQueuedSynchronizer)
3.1 介绍
- 队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
- 同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。
3.2 基本结构图示

3.3 AQS 供子类重写的方法
// 互斥模式下使用:尝试获取锁protected boolean tryAcquire(int arg)// 互斥模式下使用:尝试释放锁protected boolean tryRelease(int arg)//共享模式下使用:尝试获取锁protected int tryAcquireShared(int arg)// 共享模式下使用:尝试释放锁protected boolean tryReleaseShared(int arg)// 如果当前线程独占着锁,返回trueprotected boolean isHeldExclusively()
3.4 AQS 中的模板方法
//独占式获取锁public final void acquire(int arg) {//1.tryAcquire(arg)尝试获取锁失败//2. 则进入队列中acquireQueued(addWaiter(Node.EXCLUSIVE), arg)//3. 没有能进入队列,则打断线程if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//可响应中断地获取锁public final void acquireInterruptibly(int arg)//在acquireInterruptibly方法的基础上加上超时限制,如果在超时时间内没有获取锁,则返回falsepublic final boolean tryAcquireNanos(int arg, long nanosTimeout)//共享式获取锁public final void acquireShared(int arg)//可打断,共享式获取锁public final void acquireSharedInterruptibly(int arg)//可打断,超时机制,共享式获取锁public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)//独占式释放锁public final boolean release(int arg)//共享式释放锁public final boolean releaseShared(int arg)
3.5 AQS 内部节点类
static final class Node {/** 标记该节点是共享模式 waiting */static final Node SHARED = new Node();/** 标识一个节点是互斥模式 waiting*/static final Node EXCLUSIVE = null;/** waitStatus 等待状态的值 1标识该线程已被取消*/static final int CANCELLED = 1;/** 标识后继节点需要唤醒*/static final int SIGNAL = -1;/** 标识线程等待在一个条件上 */static final int CONDITION = -2;/*** 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)*/static final int PROPAGATE = -3;/*** 当前节点保存的线程对应的等待状态*/volatile int waitStatus;/*** aqs同步队列节点前驱*/volatile Node prev;/*** aqs同步队列节点后继*/volatile Node next;/*** 当前节点保存的线程;*/volatile Thread thread;/*** 下一个等待在条件上的节点(Condition等待队列指针)*/Node nextWaiter;/*** 判断下一个等待在条件上的节点是否是共享模式*/final boolean isShared() {return nextWaiter == SHARED;}/*** 获取前一个节点*/final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}
3.6 AQS 操作图示
- 添加节点到AQS 的过程(没有获取到锁,加入队列)
同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

- 成功获取锁,首节点设置的过程(公平锁)
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可
- 独占式获取锁的流程图

- 独占式超时获取锁

四、LockSupport工具
4.1 方法(以下方法底层都采用sun.misc.Unsafe 类实现)
#阻塞当前线程public static void park()#超时阻塞当前线程public static void parkNanos(Object blocker, long nanos)#阻塞当前线程,直到deadlinepublic static void parkUntil(Object blocker, long deadline)#唤醒处于阻塞的线程public static void unpark(Thread thread)
4.2 简单使用
Thread thread = new Thread(() -> {long a = System.currentTimeMillis();//阻塞线程LockSupport.park();System.out.println("线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");});thread.start();Thread.sleep(1000);//唤醒线程//也可以使用打断线程来唤醒//thread.interrupt();LockSupport.unpark(thread);//输出:线程完成! 时间: 1秒
4.3 比较LockSupport.park(),Object.wait() 等方法
public class TestLockSupport {private static Object lock = new Object();public static void main(String[] args) throws InterruptedException {testObjectWait();testLockSupport();}public static void testLockSupport() throws InterruptedException {Thread thread = new Thread(() -> {long a = System.currentTimeMillis();//阻塞线程//1. 可以在任意地方调用//2. 不需要捕获异常//3.不会释放锁资源,它只是单纯阻塞线程LockSupport.park();System.out.println("testLockSupport 线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");});thread.start();Thread.sleep(1000);//主线程唤醒thread线程//1. 可以唤醒指定线程//2. 可以在任意地方调用LockSupport.unpark(thread);}public static void testObjectWait() throws InterruptedException {Thread thread = new Thread(() -> {long a = System.currentTimeMillis();//阻塞线程synchronized (lock) {try {//方法Object.wait调用前,需要先获取对象监视器,即在同步方法内或同步代码块内调用//Object.wait会抛出InterruptedException异常lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("testObjectWait 线程完成! 时间: " + ((System.currentTimeMillis() - a) / 1000) + "秒");});thread.start();Thread.sleep(1000);//主线程唤醒thread线程//1. 随机唤醒被阻塞的一个线程//2. 需要先获取对象监视器,即在同步方法内或同步代码块内调用//3. 不能在Object.wait()方法前调用,否则抛出IllegalMonitorStateException异常synchronized (lock) {lock.notify();}}}
总结:
- Object.wait()方法,需要先获取对象监视器,即在同步方法内或同步代码块内调用
- Object.wait() 会抛出InterruptedException异常
- LockSupport.park() 可以在任意地方调用
- LockSupport.park() 不需要捕获异常
- LockSupport.park() 不会释放锁资源,它只是单纯阻塞线程
- Object.notify() 随机唤醒被阻塞的一个线程
- Object.notify() 需要先获取对象监视器,即在同步方法内或同步代码块内调用
- Object.notify() 不能在Object.wait()方法前调用,否则抛出IllegalMonitorStateException异常
- LockSupport.unpark(Thread thread) 可以唤醒指定线程
- LockSupport.unpark(Thread thread) 可以在任意地方调用
五、Condition接口
5.1 接口定义
public interface Condition {/*** 阻塞,直到被唤醒或被中断*/void await() throws InterruptedException;/***阻塞,直到被唤醒,不能被中断*/void awaitUninterruptibly();/*** 超时阻塞*/long awaitNanos(long nanosTimeout) throws InterruptedException;/*** 超时阻塞*/boolean await(long time, TimeUnit unit) throws InterruptedException;/*** 阻塞*/boolean awaitUntil(Date deadline) throws InterruptedException;/***唤醒*/void signal();/*** 唤醒全部*/void signalAll();}
5.2 Condition 实现的等待队列的基本结构
5.3 同步队列与等待队列图示
Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。
5.4 利用Condition的等待通知过程图示

5.5 唤醒图示
在等待队列中的唤醒节点,需要考虑并发问题,需要采用了cas 重新入同步队列,保证是队列的尾节点
六、实例
在动手实现的锁中,我们自己实现的同步队列。 在这里我们使用AQS来实现一个公平、可重入锁。
public class MyLock {private Sync sync = new Sync();private class Sync extends AbstractQueuedSynchronizer {public Sync() {}@Overrideprotected boolean tryAcquire(int state) {final Thread current = Thread.currentThread();//获取当前状态int c = getState();//判断是否持有锁if (c == 0) {//这里实现的是公平锁//判断当前线程是否是同步队列头指针指向的线程节点//cas修改状态if (!hasQueuedPredecessors() && compareAndSetState(0, state)) {//获取到锁,标识当前线程持有锁setExclusiveOwnerThread(current);return true;}} else if (current == getExclusiveOwnerThread()) { //c>0 已持有锁,处理可重入锁操作//修改状态int nextc = c + state;if (nextc < 0)//防止超出int 值范围throw new Error("超出锁可重入次数");//设置状态setState(nextc);return true;}return false;}@Overrideprotected boolean tryRelease(int state) {//修改状态int c = getState() - state;//判断释放锁的线程是否是持有锁if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {//如果状态值为0, 则线程释放锁free = true;setExclusiveOwnerThread(null);}setState(c);return free;}}public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}}
- 测试
public class MyLockTest {static MyLock lock=new MyLock();public static void test(){lock.lock();try {System.out.println(Thread.currentThread().getName()+ " 进入test "+ System.currentTimeMillis());test2();Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {lock.unlock();System.out.println(Thread.currentThread().getName() +" 释放锁"+ System.currentTimeMillis());}}public static void test2(){lock.lock();try {System.out.println("进入test2"+ System.currentTimeMillis());Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {lock.unlock();System.out.println("test2释放锁"+ System.currentTimeMillis());}}public static void main(String[] args) {//测试是否可重入//test锁没有释放,也可以进入方法test2test();//公平锁测试//按顺序输出// new Thread(()->{test();},"Thread-B").start();// new Thread(()->{test();},"Thread-A").start();// new Thread(()->{test();},"Thread-C").start();}}
在并发包下,实现锁的基础原理知识了解完毕。以上自定义锁的实现,就是摘自ReentrantLock 锁的公平锁实现,接下来该了解一下,并发包下锁的实现细节,如公平锁和非公平锁实现区别、共享锁和非共享锁实现区别、如何加入超时机制、如何加入中断机制等等。
参考
- 《Java并发编程的艺术》
- 死磕 java同步系列之AQS起篇
