Semaphore介绍
信号量:操作系统中PV操作的原语在java的实现,AbstractQueuedSynchronizer实现
大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现
大小为n(n>0)的信号量可以实现限流的功能,只能有n个线程同时获取信号量
PV操作是操作系统一种实现进程互斥与同步的有效方法
PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思
PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性
①S减1;②若S减1后仍大于或等于0,则进程继续执行;③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。V操作的主要动作是:①S加1;②若相加后结果大于0,则进程继续执行;③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。
Semaphore 常用方法
构造器
/*** Creates a {@code Semaphore} with the given number of* permits and nonfair fairness setting.** @param permits the initial number of permits available.* This value may be negative, in which case releases* must occur before any acquires will be granted.*/public Semaphore(int permits) {sync = new NonfairSync(permits);}//fair 是否公平//permits 许可的资源数public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
public void acquire() throws InterruptedExceptionpublic boolean tryAcquire()public void release()public int availablePermits()public final int getQueueLength()public final boolean hasQueuedThreads()protected void reducePermits(int reduction)
- acquire() 表示阻塞并获取许可
- tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
- release() 表示释放许可
- int availablePermits():返回此信号量中当前可用的许可证数。
- int getQueueLength():返回正在等待获取许可证的线程数。
- boolean hasQueuedThreads():是否有线程正在等待获取许可证。
- void reducePermit(int reduction):减少 reduction 个许可证
- Collection getQueuedThreads():返回所有等待获取许可证的线程集合
Semaphore源码分析
关注点:
1. Semaphore的加锁解锁(共享锁)逻辑实现
2. 线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现
3:获取资源失败的入队
4:释放资源之后的取队列head 和唤醒后一个节点的线程 ```java public final void acquireSharedInterruptibly(int arg)
}throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)//取获取资源doAcquireSharedInterruptibly(arg);//获取不到入队操作
/**
* Acquires in shared interruptible mode.* @param arg the acquire argument*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);//获取node,没有就入队boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//获取资源失败取阻塞if (shouldParkAfterFailedAcquire(p, node) &&//设置节点的waitStatus = -1parkAndCheckInterrupt())// park线程throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
```javapublic final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//释放资源, 资源数修改doReleaseShared();//唤醒下一个线程return true;}return false;}//AQS实现的private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {//先改状态 为0if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);//唤醒下一个节点}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);//直接唤醒下一个线程}public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);//继续唤醒下个节点p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();//继续唤醒线程}}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
其他链接:
https://www.processon.com/view/link/61950f6e5653bb30803c5bd2
