Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知 模式。
Object的监视器方法与Condition接口的对比
| 对比项 | Object Monitor Methods | Condition |
|---|---|---|
| 前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁 |
| 调用方式 | 直接调用。如object.wait() | 直接调用。如condition.await() |
| 等待队列个数 | 一个 | 多个 |
| 当前线程释放锁并进入等待状态 | 支持 | 支持 |
| 当前线程释放锁并进入等待状态,在等待状态不响应中断 | 不支持 | 支持 |
| 担心线程释放锁并进入超时等待状态 | 支持 | 支持 |
| 当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
| 唤醒等待队列中的一个线程 | 支持 | 支持 |
| 唤醒等待队列中的全部线程 | 支持 | 支持 |
使用示例
class ConditionUseCase {Lock lock = new ReentrantLock();Condition condition = lock.newCondition();public void conditionWait() throws InterruptedException {lock.lock();try {condition.await();} finally {lock.unlock();}}public void conditionSignal() throws InterruptedException {lock.lock();try {condition.signal();} finally {lock.unlock();}}}
| 方法名称 | 描述 |
|---|---|
| void await() throws InterruptedException | 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回的情况,包括: 其他线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒 - 其他线程(调用interrupt()方法)中断当前线程 |
- 如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所对应的锁
|
| void awaitUninterruptibly() | 当前线程进入等待状态直到被通知。不响应中断 |
| long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余时间,如果在nanosTimeout纳秒之前被唤醒,纳秒返回值就是(nanosTimeout-实际耗时)。如果返回值是0或者负数,纳秒就可以认定已经超时了 |
| boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断、或者到某个时间。如果没有到指定时间就被通知,方法返回true,否则表示到了指定时间,返回false |
| void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁 |
| void signalAll() | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得Condition相关联的锁 |
public class BoundedQueue<T> {private Object[] items;private int addIndex, removeIndex, count;private Lock lock = new ReentrantLock();private Condition notFull = lock.newCondition();private Condition notEmpty = lock.newCondition();public BoundedQueue(int size) {items = new Object[size];}//添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位”public void add(T t) throws InterruptedException {lock.lock();try {while(count == items.length) {notFull.await();}items[addIndex] = t;if(++addIndex == items.length) {addIndex = 0;}++count;notEmpty.signal();} finally {lock.unlock();}}//由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素public T remove() throws InterruptedException {lock.lock();try {while(count == 0) {notEmpty.await();}Object x = items[removeIndex];if(++removeIndex == items.length) {removeIndex = 0;}--count;notFull.signal();return (T) x;} finally {lock.unlock();}}}
ConditionObject是同步器AQS的内部类,每个Condition对象都包含着一个队列,该队列是Condition实现等待/通知功能的关键。
等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待线程。Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。
新增节点只需要将原有的尾节点nextWaiter指向它,并更新尾节点即可。这个过程并没有使用CAS保证,原有在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
等待
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//构造节点并加入等待队列尾节点Node node = addConditionWaiter();//释放锁int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点,在唤醒节点之前,会将节点移到同步队列中。调用该方法的前置条件是获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移到到同步队列并使用LockSupport唤醒节点中的线程。
通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移到同步队列。当节点移到到同步队列后,当前线程再使用LockSupport唤醒该节点的线程
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
