原文链接:https://www.jianshu.com/p/4de5dca3b187
涉及的主要方法
void shutdown();List<Runnable> shutdownNow();boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;线程池状态
/** The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasks* SHUTDOWN: Don't accept new tasks, but process queued tasks* STOP: Don't accept new tasks, don't process queued tasks,* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook method* TERMINATED: terminated() has completed** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:** RUNNING -> SHUTDOWN* On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow()* SHUTDOWN -> TIDYING* When both queue and pool are empty* STOP -> TIDYING* When pool is empty* TIDYING -> TERMINATED* When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.** Detecting the transition from SHUTDOWN to TIDYING is less* straightforward than you'd like because the queue may become* empty after non-empty and vice versa during SHUTDOWN state, but* we can only terminate if, after seeing that it is empty, we see* that workerCount is 0 (which sometimes entails a recheck -- see* below).*/
void shutdown()
注释:
/*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution. Use {@link #awaitTermination awaitTermination}* to do that.** @throws SecurityException if a security manager exists and* shutting down this ExecutorService may manipulate* threads that the caller is not permitted to modify* because it does not hold {@link* java.lang.RuntimePermission}{@code ("modifyThread")},* or the security manager's {@code checkAccess} method* denies access.*/void shutdown();
该方法会停止ExecutorService添加新的任务, 但是老任务还是会继续执行.
This method does not wait for previously submitted tasks to * complete execution.
这句话指的是该方法会立即返回, 但不一定代表之前提交的任务已经全部完成了. 如果需要一个阻塞的方法, 可以调用awaitTermination方法.
该方法内部实现是设置了状态, 并interrupt了所有的空闲线程, 使其不再接受新的任务.
List<Runnable> shutdownNow()
注释:
/*** Attempts to stop all actively executing tasks, halts the* processing of waiting tasks, and returns a list of the tasks* that were awaiting execution.** <p>This method does not wait for actively executing tasks to* terminate. Use {@link #awaitTermination awaitTermination} to* do that.** <p>There are no guarantees beyond best-effort attempts to stop* processing actively executing tasks. For example, typical* implementations will cancel via {@link Thread#interrupt}, so any* task that fails to respond to interrupts may never terminate.** @return list of tasks that never commenced execution* @throws SecurityException if a security manager exists and* shutting down this ExecutorService may manipulate* threads that the caller is not permitted to modify* because it does not hold {@link* java.lang.RuntimePermission}{@code ("modifyThread")},* or the security manager's {@code checkAccess} method* denies access.*/List<Runnable> shutdownNow();
该方法尝试停止所有正在执行的任务, 停止对正在等待执行的任务的处理, 并且返回正在等待执行的任务.
同shutdown(), 该方法也是立刻返回的, 不会等到所有任务终止以后才返回.
因为终止是通过interrupt实现的, 所以如果那个任务没有对interrupt做出正确响应, 那么该方法将无法终止该任务. 所以传进去的任务需要对interrup做出合适的响应.
boolean awaitTermination(long timeout, TimeUnit unit)
注释:
/*** Blocks until all tasks have completed execution after a shutdown* request, or the timeout occurs, or the current thread is* interrupted, whichever happens first.** @param timeout the maximum time to wait* @param unit the time unit of the timeout argument* @return {@code true} if this executor terminated and* {@code false} if the timeout elapsed before termination* @throws InterruptedException if interrupted while waiting*/boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;
该方法是阻塞的, 阻塞到所有任务都完成(必须在shutdown调用之后)或者超时. 如果executor在超时之前终止了, 那么返回true, 否则返回false.
注意, 如果不在awaitTermination前调用shutdown, 则即使在超时之前所有任务都已经完成, awaitTermination仍然会等待着, 而且最后一定返回false, 因为没有shutdown的调用不会使executor的状态变为terminated.
例子:
public static void testAwaitTerminationWithoutShutdown(){Runnable runnable = () -> {System.out.println("I'm a very quick task");};executorService.submit(runnable);try {executorService.awaitTermination(3000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace();}}
上面这段代码将会阻塞3000毫秒, 并且最终返回true, 即使仅有的任务一瞬间就完成了, 因为没有对shutdown的调用, 所以executorService的状态不可能会变成terminated.
实例
shutdown后再尝试添加任务:
public static void testShutdown(){Runnable runnable = () -> {try {System.out.println("going to sleep for 1s");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}};scheduledExecutorService.submit(runnable);scheduledExecutorService.shutdown();try {scheduledExecutorService.submit(runnable);} catch (RejectedExecutionException e){System.out.println("cannot add task after shutdown");}}
输出(输出顺序不一定一致):
cannot add task after shutdowngoing to sleep for 1s
从输出可以到确实有RejectedExecutionException被抛出了, 另外从这次输出也可以看出shutdown确实立马就返回了.
shutdownNow()关闭成功的例子:
public static void shutdownNowNormally() throws InterruptedException {Runnable task = () -> {try {System.out.println(String.format("now is %s, I'm going to sleep for 10s", getCurrentTime()));Thread.sleep(10000);} catch (InterruptedException e) {System.out.println(String.format("someone asked me to terminate at: %s", getCurrentTime()));}};scheduledExecutorService.submit(task);Thread.sleep(1000);scheduledExecutorService.shutdownNow();}
输出:
Now is 13:47:30, I'm going to sleep for 10ssomeone asked me to terminate at: 13:47:31
shutdownNow()不成功的例子:
因为shutdownNow()最终是通过interrupt来打断工作线程, 如果任务没有对interrupt做出反应, 那么shutdownNow()将无法正常terminate.
public static void shutdownNowNotWorking(){Runnable task = () ->{while (true){try {System.out.println("I'm gonna sleep for 1s");Thread.sleep(1000);} catch (InterruptedException e) {System.out.println(String.format("I'll ignore this InterruptedException. Now is : %s", getCurrentTime()));}}};scheduledExecutorService.submit(task);scheduledExecutorService.shutdownNow();}
输出:
I'm gonna sleep for 1sI'll ignore this InterruptedException. Now is : 13:53:12I'm gonna sleep for 1sI'm gonna sleep for 1sI'm gonna sleep for 1sI'm gonna sleep for 1s
void shutdown()源码
ThreadPoolExecutor中的实现:
设置状态并interrupt全部空闲的工作线程(即不让其再继续从任务队列中获取任务). 但是之前提交的任务还会被执行.
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown();} finally {mainLock.unlock();}tryTerminate();}
interruptIdleWorkers方法:
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}
List<Runnable> shutdownNow()源码
ThreadPoolExecutor中的源码:
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}
其中interruptWorkers方法:
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}
可以看到即使这个工作线程已经拿到任务在执行中, 也会被interrupt, 这种情况需要我们的任务对interrupt做出响应, 否则就会导致shutdownNow也无法终止executorService.
runWoker()源码
idle worker指的就是正在执行while (task != null || (task = getTask()) != null)这个while条件的worker, 即还未成功取到task的任务.
而interruptIdleWorkers()方法就是针对这个状态的woker, 如果getTask()返回值是null, 那么该woker线程就会结束了. 从getTask()源码中可以看到, 如果shutdown的时候, wokerQueue(BlockingQueue)的poll()或者take()方法能够响应interrupt(), 从而导致getTask()会继续下一次循环, 从而能够检查到shutdown状态, 从而直接返回null, 进而使woker退出. 所以shutdown不会对已经进入while body的woker线程起作用.
而shutdown仅仅调用了一次interruptIdleWorkers(), 所以那些idle的wokers被直接结束了, 但是剩下的仍然在工作的workers不会受到影响, 如果任务队列中仍然有剩余的任务, 那么这些woker仍然能够取出并且完成 (因为shutdown()方法仅仅将状态改成了SHUTDOWN).
shutdownNow()中设置状态为STOP, 并调用了interruptWorkers()方法. 所以即使worker已经执行到task.run(), 如果我们传进去的任务的run方法有对interrupt做出合适响应, 那么依然可以被停止, 否则shutdownNow()也无法终止. 另外结合getTask(), 可以知道即使已经缓存在任务队列中的任务也不会被执行了 (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())).
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
其中getTask()方法:
private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();int rs = runStateOf(c);if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
其中processWorkerExit方法:
注意processWorkerExit方法会调用tryTerminate()方法. 所以每次有一个woker结束的时候, 都会尝试termiante, 所以仅仅调用shutdown也可以使得在全部任务完成以后terminate.
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly)decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return;}addWorker(null, false);}}
tryTerminate()源码:
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) {interruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}}}
一个关键的地方在于interruptIdleWorkers(ONLY_ONE);, 下面是关于这个参数的解释:
如果这个参数是true的话, 那么一次最多interrupt一个空闲的worker. 因为每一个worker在退出的时候都会调用processWorkerExit方法, 而且processWorkerExit方法中也会继续调用tryTerminate()方法, 所以注释里面的propagate就能解释得通了. in case all threads are currently waiting, 这里还不是很理解, 这里是说避免所有线程都在那时刻等待的情况, 但是这样做的目的还是不很清楚.
总结
要让ExecutorService能够被正常关闭, 需要任务本身对interrupted这个状态做出反应, 否则可能无法正常关闭ExecutorService.
