- 1. 线程池的 ctl
- 2. 线程池生命周期状态常量
- 3. 打包函数与拆包函数
- 4. 运行状态的判断
- 5. 修改 ctl 中 workCount 的大小
- 6. 修改 ctl 中 runState 的大小
- 7. 仍存在的疑问
- 8. threadPoolExecutor.execute
- 9. 添加工作线程-addWorker
- 2:当工作线程数 < 核心线程数的时候,通过
addWorker(command, true)添加核心线程执行 command 任务。 - 3:double check 的时候,如果发现线程池处于正常运行状态但是里面没有工作线程,则添加个空任务和一个普通线程,这样一个 task 为空的 worker 在线程执行的时候会去阻塞任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务。
- 4:队列已满的情况下,通过添加普通线程(非核心线程)去执行当前任务,如果失败了则执行拒绝策略。
https://www.cnblogs.com/jajian/p/11442929.html
1. 线程池的 ctl
ctl是线程池源码中常常用到的一个变量。- 它的主要作用是记录线程池的生命周期状态和当前工作的线程数。
- 作者通过巧妙的设计,将一个整型变量按二进制位分成两部分,分别表示两个信息。
1.1 声明与初始化
源码:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
分析一波:
- ctl (线程池控制状态)是原子整型的,这意味这对它进行的操作具有原子性。
- 因此作为 ctl 组成部分的
runState(线程池生命周期状态) 和workerCount(工作线程数) 也将同时具有原子性。 - ThreadPoolExecutor 使用
ctlOf方法来将runState和workerCount两个变量(都是整型) 打包成一个ctl变量。稍后将解读这个方法的实现。
1.2. 两个工具人常量 COUNT_BITS 和 CAPACITY
源码:
private static final int COUNT_BITS = Integer.SIZE - 3; // 29private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111
分析一波:
COUNT_BITS常量的值为Integer.SIZE - 3(29) ,其中Integer.SIZE为整型最大位数,取其为 32 。- 如此
COUNT_BITS实际的值其实就是 29 。(因为线程池的状态有 5 个,需要 3 位去表示) - CAPACITY 常量的值为
(1 << COUNT_BITS) - 1,其中<<为左移运算符,这么说可能不太直观,我以二进制直接写出这个数将有助于理解:
00000000 00000000 00000000 00000001 << 29 - 1 → 00011111 11111111 11111111 11111111
- 因此在接下来的代码中,
**COUNT_BITS**(即 29) 就用来表示分隔**runState**和**workerCount**的位数; - 而
**CAPACITY**则作为取这两个变量 (**runState**和**workerCount**) 的工具(具体是怎么使用的请看下文)
2. 线程池生命周期状态常量
源码:
private static final int RUNNING = -1 <<COUNT_BITS;private static final int SHUTDOWN = 0 <<COUNT_BITS;private static final int STOP = 1 <<COUNT_BITS;private static final int TIDYING = 2 <<COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING: 111 00000000000000000000000000000SHUTDOWN: 000 00000000000000000000000000000STOP: 001 00000000000000000000000000000TIDYING: 010 00000000000000000000000000000TERMINATED: 011 00000000000000000000000000000
分析一波:
- 这里解答了上边关于 COUNT_BITS 变量为什么要减 3 的问题:因为线程池的生命周期有 5 个状态,为了表达这 5 个状态,我们需要 3 个二进制位。
- 注意到这里标注状态使用的并不是 -1 ~ 3 ,而是这 5 个数字分别左移 COUNT_BITS 位,这样做的好处将在接下来的代码中得到体现。
3. 打包函数与拆包函数
源码:
// capacity:00011111 11111111 11111111 11111111//拆包函数private static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }//打包函数private static int ctlOf(int rs, int wc) { return rs | wc; }
分析一波:
- 此处我们解答了
**CAPACITY = 00011111 11111111 11111111 11111111**常量的作用,他是一个后 29 位均为 1 ,前 3 位为 0 的整数,因此我们可以通过:- 对 CAPACITY 和 ctl 进行
**&**(按位与)操作就能取到 ctl 的后 29 位,即**workerCount**。 - 对 CAPACITY 进行
**~**(按位取反)操作后,再和 ctl 进行**&**操作就能取到**runState**。它的高 3 位是 ctl 的高 3 位,低 29 位为 0。这也解释了为什么之前提到的生命周期常量要在 -1 ~ 3 的基础上再左移 29 位,因为不在常量初始化处左移的话就要在拆包的时候右移来保证取到的是正确的数值。然而拆包操作是要经常进行的,而常量的初始化只有一次。两下对比,明显在初始化时左移是效率更高的选择。
- 对 CAPACITY 和 ctl 进行
- 除了拆包时的效率,常量初始化时左移也提高了打包函数的效率:此处打包函数可以直接对 runState 和 workerCount 进行
|(按位或) 操作来得到 ctl 变量,就是因为 runState 的高 3 位为有效信息,而 workerCount 的低 29 位为有效信息,合起来正好得到一个含 32 位有效信息的整型变量。 - 说到这里可能仍有些让人疑惑,我将再以二进制的形式表示出所有涉及到的变量/常量:
//下文中a和b分别代表runState和workerCount的有效信息//CAPACITY0001 1111 1111 1111//ctlaaab bbbb bbbb bbbb//runStateaaa0 0000 0000 0000//workerCount000b bbbb bbbb bbbb
4. 运行状态的判断
源码:
private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}
分析一波:
- 注意这里传入的s是用了之前定义的生命周期常量。
- 这里判断状态的大小时,直接将 c 和 s 进行了比较,这是因为代表状态的信息占据了两个变量的高 3 位,而比较高位的大小时,低位是没有影响的。
5. 修改 ctl 中 workCount 的大小
/*** 尝试通过 cas 方式增加 ctl 中的 工作线程数*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** 尝试通过 cas 方式减少 ctl 中的 工作线程数*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/*** 递减 ctl 的 workerCount 字段. 仅在调用线程突然中止时(see processWorkerExit)*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}
注意到这里的修改都使用了原子整型的 CAS 方法。
6. 修改 ctl 中 runState 的大小
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
注意到修改 runState 并没有再提供专门的方法,而是直接使用了原子整型的 CAS 方法来替换原来的 ctl 。
7. 仍存在的疑问
- Q1:如果经过递增 compareAndIncrementWorkerCount ,使得 workerCount 的大小超过29位,会发生什么?会有安全检查吗?
- A1:有安全检查,在ThreadPoolExecutor 类的 addWorker 方法中有这样一行代码:
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;
8. threadPoolExecutor.execute
public void execute(Runnable command) {// 1. 进行任务的非空校验if (command == null)throw new NullPointerException();// 2. 添加核心线程执行任务int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 3. 任务入队列 (会把逻辑判断和逻辑返回 true 后的执行(workQueue.offer(command))一并放到 if 里面)// 如果任务队列满了,workQueue.offer(command) 返回 false,否则插入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 二次校验(如果没有在运行中,就从队列删除,然后执行拒绝策略)if (! isRunning(recheck) && remove(command))reject(command);// 如果线程池中没有线程了,开启一个新的线程执行任务(上边已经把任务放入队列中了,所以 commend 为 null)else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 4. 添加普通线程执行任务,如果失败则执行拒绝策略else if (!addWorker(command, false))reject(command);}
- 任务的非空校验。
- 获取当前
RUNNING的线程数,如果小于核心线程数,则创建核心线程去执行任务,否则走 #3。 - 如果当前线程池处于
RUNNING状态,那么就将任务放入队列中。这时还会再做个双重校验,因为可能存在有些线程在我们上次检查后死了,或者从我们进入这个方法后 pool 被关闭了,所以我们需要再次检查 state。如果线程池停止了就需要回滚刚才的添加任务到队列中的操作并通过拒绝策略拒绝该任务,或者如果池中没有线程了,则新开启一个线程执行任务。 - 如果队列满了之后无法在将任务加入队列,则创建新的线程去执行任务,如果也失败了,那么就可能是线程池关闭了或者线程池饱和了,这时执行拒绝策略不再接受任务。
双重校验中有以下两个点需要注意:
- 为什么需要 double check 线程池的状态?
在多线程环境下,线程池的状态时刻在变化,而 ctl.get() 是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command 加入 workque 是线程池之前的状态。倘若没有 double check,万一线程池处于非 running 状态(在多线程环境下很有可能发生),那么 command 永远不会执行。
- 为什么 addWorker(null, false) 的任务为 null?
addWorker(null, false),这个方法执行时只是创建了一个新的线程,但是没有传入任务,这是因为前面已经将任务添加到队列中了
代码流程图:
以上的源码其实只有 10 几行,看起来很简单,主要是它的封装性比较好,其中主要有两个点需要重点解释,分别是:线程池的状态和 addWorker() 添加工作的方法,这两个点弄明白了这段线程池的源码差不多也就理解了。
线程池运行状态-runState
线程有状态,线程池也有它的运行状态,这些状态提供了主生命周期控制,伴随着线程池的运行,由内部来维护,从源码中我们可以发现线程池共有5个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
各状态值所代表的的含义和该状态值下可执行的操作,具体信息如下:
| 运行状态 | 状态描述 |
|---|---|
| RUNNING | 接收新任务,并且也能处理阻塞队列中的任务。 |
| SHUTDOWN | 不接收新任务,但是却可以继续处理阻塞队列中的任务。 |
| STOP | 不接收新任务,同时也不处理队列任务,并且中断正在进行的任务。 |
| TIDYING | 所有任务都已终止,workercount (有效线程数) 为0,线程转向 TIDYING 状态将会运行 terminated() 钩子方法。 |
| TERMINATED | terminated() 方法调用完成后变成此状态。 |
生命周期状态流转如下图所示:
很多时候我们表示状态都是通过简单的 int 值来表示,例如数据库数据的删除标志 delete_flag 其中 0 表示有效,1 表示删除。而在线程池的源码里我们可以看到它是通过如下方式来进行表示的,
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)何做到的呢?将十进制 int 值转换为二进制的值,共32位,其中高 3 位代表运行状态(runState ),而低 29 位代表工作线程数(workerCount)。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
//获取线程池状态private static int runStateOf(int c) { return c & ~CAPACITY; }//获取线程数量private static int workerCountOf(int c) { return c & CAPACITY; }// Packing and unpacking ctl → 将 runState 和 workerCount 两个变量(都是整型)打包成一个 ctl 变量private static int ctlOf(int rs, int wc) { return rs | wc; }
通过巧妙的位运算可以分别获取高 3 位的运行状态值和低 29 位的线程数量值
9. 添加工作线程-addWorker
添加线程是通过 addWorker() 方法来实现的,这个方法有两个入参,Runnable firstTask 和 boolean core。
private boolean addWorker(Runnable firstTask, boolean core){...}
Runnable firstTask即是当前添加的线程需要执行的首个任务.boolean core用来标记当前执行的线程是否是核心线程还是普通线程.
返回前面的线程池的 execute() 方法的代码中,可以发现这个addWorker() 有三个地方在调用,分别在 #2,#3和#4。
2:当工作线程数 < 核心线程数的时候,通过
addWorker(command, true)添加核心线程执行 command 任务。3:double check 的时候,如果发现线程池处于正常运行状态但是里面没有工作线程,则添加个空任务和一个普通线程,这样一个 task 为空的 worker 在线程执行的时候会去阻塞任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务。
4:队列已满的情况下,通过添加普通线程(非核心线程)去执行当前任务,如果失败了则执行拒绝策略。
**addWorker()** 方法调用的地方我们看完了,接下来我们一起来看下它里面究竟做了些什么,源码如下:
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
这个方法稍微有点长,我们分段来看下,将上面的代码我们拆分成两个部分来看,首先看第一部分:
retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);//获取线程池的状态// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 尝试通过CAS方式增加workerCountif (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl// 如果线程池状态发生变化,重新从最外层循环if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}
这部分代码有两层嵌套的 for 死循环,在第一行有个retry:代码,这个也许有些同学没怎么见过,这个是相当于是一个位置标记,retry后面跟循环,标记这个循环的位置。
我们平时写 for 循环的时候,是通过
continue;或break;来跳出当前循环,但是如果我们有多重嵌套的 for 循环,如果我们想在里层的某个循环体中当达到某个条件的时候直接跳出所有循环或跳出到某个指定的位置,则使用retry:来标记这个位置就可以了,之后跳转到这个位置时用continue retry。
代码中共有4个位置有改变循环体继续执行下去,分别是两个return false;,一个break retry;和一个continue retry;
首先我们来看下第一个 **return false;**,这个 return 在最外层的一个 for 循环,
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;
这是一个判断线程池状态和线程队列情况的代码,这个逻辑判断有点绕可以改成
rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
这样就好理解了,逻辑判断成立可以分为以下几种情况直接返回 false,表示添加工作线程失败。
rs > shutdown: 线程池状态处于STOP,TIDYING,TERMINATED时,添加工作线程失败,不接受新任务。rs >= shutdown && firstTask != null:线程池状态处于SHUTDOWN,STOP,TIDYING,TERMINATED状态且 worker 的首个任务不为空时,添加工作线程失败,不接受新任务。rs >= shutdown && workQueue.isEmpty:线程池状态处于SHUTDOWN,STOP,TIDYING,TERMINATED状态且阻塞队列为空时,添加工作线程失败,不接受新任务。线程池处于 shutdown 且 队列任务不为空时,可以执行 addWork
这样看来,最外层的 for 循环是不断的校验当前的线程池状态是否能接受新任务,如果校验通过了之后才能继续往下运行。
然后接下来看第二个**return false;**,这个 return 是在内层的第二个 for 循环中,是判断线程池中当前的工作线程数量的,不满足条件的话直接返回 false,表示添加工作线程失败。
- 工作线程数量是否超过可表示的最大容量(CAPACITY).
- 如果添加核心工作线程,是否超过最大核心线程容量(corePoolSize).
- 如果添加普通工作线程,是否超过线程池最大线程容量(maximumPoolSize).
后面的 **break retry;** ,表示如果尝试通过 CAS 方式增加工作线程数 workerCount 成功,则跳出这个双循环,往下执行后面第二部分的代码
而 continue retry; 是再次校验下线程池状态是否发生变化,如果发生了变化则重新从最外层 for 开始继续循环执行。
通过第一部分代码的解析,我们发现只有 break retry; 的时候才能执行到后面第二部分的代码,而后面第二部分代码做了些什么呢?
boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建 Worker 对象实例w = new Worker(firstTask);//获取 Worker 对象里的线程final Thread t = w.thread;if (t != null) {//开启可重入锁,独占final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.//获取线程池运行状态int rs = runStateOf(ctl.get());//满足 rs < SHUTDOWN 判断线程池是否是RUNNING,或者//rs == SHUTDOWN && firstTask == null 线程池如果是SHUTDOWN,且首个任务firstTask 为空,if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//将Worker实例加入线程池 workersworkers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//线程添加成功标志位 -> trueworkerAdded = true;}} finally {//释放锁mainLock.unlock();}//如果worker实例加入线程池成功,则启动线程,同时修改线程启动成功标志位 -> trueif (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)//添加线程失败addWorkerFailed(w);}return workerStarted;
这部分代码主要的目的其实就是启动一个线程,前面是一堆的条件判断,看是否能够启动一个工作线程。它由两个try...catch...finally 内容组成,可以将他们拆开来看,这样就很容易看懂。
我们先看里面一层的 try...catch...finally,当 Worker 实例中的 Thread 线程不为空的时候,开启一个独占锁 ReentrantLock mainLock,防止其他线程也来修改操作。
try {//获取线程池运行状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}
- 首先检查线程池的状态,当线程池处于
RUNNING状态或者线程池处于SHUTDOWN状态但是当前线程的 firstTask 为空,满足以上条件时才能将 worker 实例添加进线程池,即workers.add(w);。 - 同时修改 largestPoolSize,largestPoolSize 变量用于记录出现过的最大线程数。
- 将标志位 workerAdded 设置为 true,表示添加工作线程成功。
- 无论成功与否,在 finally 中都必须执行
mainLock.unlock()来释放锁。
外面一层的try...catch...finally主要是为了判断工作线程是否启动成功,如果内层 try...catch...finally 代码执行成功,即 worker 添加进线程池成功,workerAdded 标志位置为 true,则启动 worker 中的线程 t.start(),同时将标志位 workerStarted 置为 true,表示线程启动成功。
if (workerAdded) {t.start();workerStarted = true;}
如果失败了,即 workerStarted == false,则在 finally 里面必须执行 addWorkerFailed(w) 方法,这个方法相当于是用来回滚操作的,前面增的这里移除,前面加的这里减去。
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)//从线程池中移除worker实例workers.remove(w);//通过CAS,将工作线程数量workerCount减1decrementWorkerCount();// 转换到 TERMINATED 状态tryTerminate();} finally {mainLock.unlock();}}
tryTeminate.java
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminate// 终端空闲线程interruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated(); // 钩子函数} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}
如果(SHUTDOWN 状态,并且线程池和队列为空)或(STOP 状态并且池为空),则转换到 TERMINATED 状态。
如果有资格终止但 workerCount 非零,则中断空闲的工作程序以确保关闭信号传播。
必须在任何可能使终止成为可能的操作之后调用此方法
— 在关闭期间减少工作线程数或从队列中删除任务。该方法是非私有的,允许从 ScheduledThreadPoolExecutor 访问。
10. Worker类
上面我们分析了addWorker 方法的源码,并且看到了 Thread t = w.thread,workers.add(w)和t.start()等代码,知道了线程池的运行状态和添加工作线程的流程,那么我们还有一些疑问:
- 这里的 Worker 是什么?和 Thread 有什么区别?
- 线程启动后是如何拿任务?在哪拿任务去执行的?
- 阻塞队列满后,额外新创建的线程是去队列里拿任务的吗?如果不是那它是去哪拿的?
- 核心线程会一直存在于线程池中吗?额外创建的普通线程执行完任务后会销毁吗?
**Worker** 是 ThreadPoolExecutor 的一个内部类,主要是用来维护线程执行任务的中断控制状态,它实现了 Runnable 接口同时继承了 AQS,实现 Runnable 接口意味着 Worker 就是一个线程,继承 AQS 是为了实现独占锁这个功能。
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;//构造函数,初始化AQS的state值为-1Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}}
至于为什么没有使用可重入锁 ReentrantLock,而是使用 AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
lock方法一旦获取了独占锁,表示当前线程正在执行任务中。- 如果正在执行任务,则不应该中断线程。
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行
shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
interruptIdleWorkers.java
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}
Worker 类有一个构造方法,构造参数为给定的首个任务 firstTask,并持有一个线程 thread。thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,可以用来执行任务;
firstTask 用它来初始化时传入的第一个任务,这个任务可以有也可以为 null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务(核心线程创建时的情况);如果这个值是 null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
11. 任务运行-runWorker
上面我们一起看过线程的启动 t.start(),具体运行是在 Worker 的 run() 方法中
public void run() {runWorker(this);}
run() 方法中又调用了 runWorker() 方法,所有的实现都在这里
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
很多人看到这样的代码就感觉头痛,其实你细看,这里面我们可以看关键点,里面有三块 try...catch...finally 代码,我们将这三块分别单独拎出来看并且将抛异常的地方暂时删掉或注释掉,这样它看起来就清爽了很多
Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 循环的判断任务(firstTask或从队列中获取的task)是否为空while (task != null || (task = getTask()) != null) {// Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1w.lock();// 如果线程池运行状态是stopping, 确保线程是中断状态;// 如果不是stopping, 确保线程是非中断状态.if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();//此处省略了第二个try...catch...finally}// 走到这里说明某一次getTask()返回为null,线程正常退出completedAbruptly = false;} finally {//处理线程退出processWorkerExit(w, completedAbruptly);}
第二个try...catch...finally
try {beforeExecute(wt, task);Throwable thrown = null;//此处省略了第三个try...catch...finally} finally {task = null;w.completedTasks++;w.unlock();}
第三个try...catch...finally
try {// 运行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}
上面的代码中可以看到有 **beforeExecute**、**afterExecute** 和 **terminaerd** 三个函数,它们都是钩子函数,可以分别在子类中重写它们用来扩展 ThreadPoolExecutor,例如添加日志、计时、监视或者统计信息收集的功能。
beforeExecute():线程执行之前调用afterExecute():线程执行之后调用terminaerd():线程池退出时候调用
这样拆分完之后发现,其实主要注意两个点就行了,分别是 getTask() 和 task.run(),task.run() 就是运行任务,那我们继续来看下 getTask() 是如何获取任务的。
12. 获取任务-getTask
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);//1.线程池状态是 STOP,TIDYING,TERMINATED//2.线程池 shutdown 并且队列是空的.//满足以上两个条件之一则工作线程数wc减去1,然后直接返回nullif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);//允许核心工作线程对象销毁淘汰 or 工作线程数 > 最大核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//1.工作线程数 > 最大线程数maximumPoolSize or (timed == true && timedOut == true)//2.工作线程数 > 1 或者队列为空//同时满足以上两个条件则通过CAS把线程数减去1,同时返回null。CAS把线程数减去1失败会进入下一轮循环做重试if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {/// 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null// 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
里面有个关键字 allowCoreThreadTimeOut,它的默认值为 **false**,在 Java1.6 开始你可以通过 threadPoolExecutor.allowCoreThreadTimeOut(true) 方式来设置为 true,通过字面意思就可以明白这个字段的作用是什么了,即是否允许核心线程超时销毁。
:::info 默认的情况下核心线程数量会一直保持,即使这些线程是空闲的它也是会一直存在的,而当设置为 true 时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将销毁关闭。 :::
- execute 里根据 corePoolSize 、 workCount、maximumPoolSize 和 queue_size 判断要不要 执行 addWork()
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务- 线程池中的每一个线程被封装成一个
Worker对象,ThreadPool维护的其实就是一组Worker对象,Worker类继承了 AQS,并实现了Runnable接口
线程池如何做到复用?
**runWorker()**通过循环来取任务,通过**getTask()**方法。- 如果工作线程数大于核心线程数,则通过
poll()从队列取任务;如果工作线程数小于核心线程数,则通过take()从队列取任务;- 这 2 个方法等区别是
take()取任务时,如果队列中没有任务了会调用await()阻塞当前线程。这样的话,是不是已经搞清楚线程池中的核心线程复用的原因了。
- 这 2 个方法等区别是
- 线程的唤醒是在
execute时,当调用workQueue.offer()方法,将任务放入阻塞队列时,会调用Condition.signal()方法唤醒一个之前阻塞的线程。这部分不细讲,感兴趣的同学自行查看。
总结
- 当 Thread 的 run 方法执行完一个任务之后,会循环地从阻塞队列中取任务来执行,这样执行完一个任务之后就不会立即销毁了;
- 当工作线程数小于核心线程数,那些空闲的核心线程再去队列取任务的时候,如果队列中的 Runnable 数量为0,就会阻塞当前线程,这样线程就不会回收了
[
](https://blog.csdn.net/weixin_48509270/article/details/106795567)
