图解线程池的工作原理

线程池的重要类
Executor接口
代表线程池的接口,有一个execute()方法,入参是Runnable类型对象。
通过分配一个线程进行处理执行。
ExecutorService接口
Executor的子接口,相当于是一个线程池的接口,有销毁线程池等方法。
Executors辅助类
线程池的辅助工具类,辅助入口类,可以通过Executors快速创建需要的线程池,比如Executors._newSingleThreadExecutor()_、Executors.newFixedThreadPool()、Executors.newCachedThreadPool()、Executors.newScheduledThreadPool()
ThreadPoolExecutor类
ExecutorService接口的实现类,真正代表一个线程池的类,一般在Executors里创建一个线程池时,内部都是直接创建一个ThreadPoolExecutor的实例对象进行返回。
corePoolSize:线程池中的核心线程数。maximumPoolSize:线程池里允许创建的最大线程数keepAliveTime:如果线程数量大于corePoolSize时,多出来的线程在空闲时间内会等待指定的keepAliveTime时间后自动释放掉。unit:keepAliveTime的时间单位。workQueue:通过ThreadPoolExecutor.execute()方法扔进来的Runnable工作任务,如果已经到达了corePoolSize个数量都在处理任务时,多余的工作任务就会进入到workQueue进行排队。threadFactory:如果需要创建新的线程放入线程池时,就通过这个线程工厂来创建线程。handler:如果workQueue具有固定的大小,往队列里扔的任务数量超过队列大小,且已经有maximumPoolSize个线程负责处理任务,则新的任务就会使用handler的拒绝策略进行拒绝处理。public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
线程池内的任务如何被提交
任务对应的就是Runnable对象,通过执行ThreadPoolExecutor.execute()方法就可以处理工作任务。 ```java /*
- 分3步执行
- 如果工作线程的数量小于核心线程的数量,那新的工作任务请求过来时,
- 就会去创建一个新的线程去处理任务,直到创建的线程数量已经达到了核心线程数量。
- 具体的创建形式就是 addWorker *
- 在一个任务成功进入工作队列进行排队,需要检查是否能够添加一个线程来处理任务
否则,该任务就需要在工作队列中进行等待有空闲的线程来处理。
- 如果新的任务无法进入工作队列进行排队,此时就会创建新的线程处理新任务,
- 创建的线程数量不会大于最大线程数量maximumPoolSize。
如果创建线程失败,就会通过handler策略拒绝任务。 */
```java// addWorker: 添加新的线程去处理任务// workQueue.offer(command): 将任务放入到工作队列当中// isRunning(): 线程池处理运行状态,新的任务就只能放入到工作队列当中,等待线程空闲后进行处理public void execute(Runnable command) {// 如果任务为空,直接抛出异常if (command == null)throw new NullPointerException();// 原子变量ctl共同存储 线程状态+线程数量。int类型存储,高3位表示线程状态,后29位表示线程数量int c = ctl.get();// 判断当前的工作线程数量 是否小于核心线程数量,如果小于,则添加新的线程去执行任务if (workerCountOf(c) < corePoolSize) {// true: 代表的是可以创建最大数量为corePoolSize内的线程if (addWorker(command, true))return;c = ctl.get();}// 如果线程池处于RUNNING状态,就将任务放入到阻塞队列中if (isRunning(c) && workQueue.offer(command)) {// 重新获取ctl的值,因为把任务添加到队列时,线程的状态可能已经改变,所以需要重新获取int recheck = ctl.get();// 线程状态不是RUNNING状态,则从队列中删除if (! isRunning(recheck) && remove(command))reject(command);// 如果当前线程池一个空闲线程都没有,且队列已满,则添加一个非核心线程(对应maximumPoolSize)else if (workerCountOf(recheck) == 0)// false: 代表的是可以创建最大数量为maximumPoolSize内的线程addWorker(null, false);}// 如果队列满了,则添加非核心线程,如果添加失败,则执行拒绝策略else if (!addWorker(command, false))reject(command);}
任务是如何被线程池消费的
基于Worker类,对应的就是工作线程,在这个类中可以知道工作线程是如何消费任务的。
Worker类实现了Runnable接口,当线程启动时,就会去执行run()方法。通过run()方法去消费一个任务;runWorker()方法,内部会task.run()去消费任务;通过getTask()方法去任务队列当中获取任务,workQueue.take()Runnable task = w.firstTask, 工作线程负责处理的第一个任务task。 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 获取任务队列中的任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
}
```
