一、介绍
1.1 简介
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
1.2 Semaphore API
//返回此信号量中当前可用的许可证数。public int availablePermits();//返回正在等待获取许可证的线程数。public final int getQueueLength();//是否有线程正在等待获取许可证。public final boolean hasQueuedThreads();//获取并返回所有立即可用的许可证。public int drainPermits();//减少reduction个许可证,是个protected方法。protected void reducePermits(int reduction)//返回所有等待获取许可证的线程集合protected Collection<Thread> getQueuedThreads() ;//是否公平锁public boolean isFair();
二、示例
模拟30个线程读取数据保存到数据库中,而数据库的连接数只有10个,这是需要限制获取数据库连接数最多只能10;不然获取不到数据库连接异常。
public class SemaphoreTest {private static final int THREAD_COUNT = 30;private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);private static Semaphore s = new Semaphore(10);public static void main(String[] args) {for (int i = 0; i < THREAD_COUNT; i++) {int a=i;threadPool.execute(new Runnable() {@Overridepublic void run() {try {s.acquire();System.out.println("save data" + a);TimeUnit.SECONDS.sleep(1);s.release();} catch (InterruptedException e) {}}});}threadPool.shutdown();}}
三、源码分析
3.1 UML 图示

从UML 图示可以看到,Semaphore 内部采用AQS 实现了公平锁和非公平锁
3.2 Semaphore的构造方法
//默认初始化非公平锁public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}
3.3 acquire 方法
从Semaphore中获取”许可证“,阻塞直到有一个”许可证“可用 或者 线程被打断。
获取许可证,如果有许可证可用, 线程会立即返回执行,并减少许可证的数量;
如果没有许可证可用,那么会阻塞线程的执行,直到:
- 有其他线程释放有许可证, 当前线程是下一个分配许可证
- 有其他线程打断当前线程
实际上是调用acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//判断线程是否被打断if (Thread.interrupted())throw new InterruptedException();//获取共享锁if (tryAcquireShared(arg) < 0)//失败后,进入等待队列doAcquireSharedInterruptibly(arg);}
非公平锁 获取共享锁实现nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {//自旋for (;;) {//获取”许可证“数量int available = getState();//剩余”许可证“数量int remaining = available - acquires;//如果”许可证“数量少于0, 则阻塞线程if (remaining < 0 ||//否则,cas 更新可用的”许可证“数量compareAndSetState(available, remaining))return remaining;}}
公平锁 获取共享锁实现
protected int tryAcquireShared(int acquires) {//自旋for (;;) {//当前是否有线程在等待队列中,等待获取许可证if (hasQueuedPredecessors())return -1;//获取”许可证“数量int available = getState();//剩余”许可证“数量int remaining = available - acquires;//如果”许可证“数量少于0, 则阻塞线程if (remaining < 0 ||//否则,cas 更新可用的许可证数量compareAndSetState(available, remaining))return remaining;}}
3.4 release 方法
实际上是调用releaseShared 方法
public final boolean releaseShared(int arg) {//尝试释放共享锁if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
tryReleaseShared 方法逻辑
protected final boolean tryReleaseShared(int releases) {//自旋for (;;) {//获取”许可证“数量int current = getState();//回收后的数量int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//cas 更新”许可证“数量if (compareAndSetState(current, next))return true;}}
参考
- 《Java并发编程的艺术》
