- 总结与使用
- 1.extends 和 implement
- 2.构造方法
- 3.属性
- 4.方法
- native方法
- Java方法
- runStateOf(int c): 计算线程运行状态 c 是ctl , 做高位运算,得到状态值
- workerCountOf(int c):计算有效线程数量 ,c是ctl ,做低位运算,得到workcount
- ctlof(int rs,int wc): 计算 ctl值 rs 表示 runstate ,wc 表示 workcount
- runStateLessThan(int c,int s): 运行状态比较
- runStateAtLeast(int c, int s):
- isRunning(int c): 是否运行中
- compareAndIncrementWorkerCount(int expect): 比较并且增加workcount
- compareAndDecrementWorkerCount(int expect):比较并且减少workcount
- decrementWorkerCount(): 递减workcount ,线程终止时调用,do while循环,直到更新成功为止
- advanceRunState(int targetState):
- tryTerminate(): 尝试停止线程池,如果非运行中状态,会尝试停止线程池
- checkShutdownAccess(): 检查是否有关闭的权限
- interruptWorkers(): 中断所有的线程
- interruptIdleWorkers(Boolean onlyOne):中断未使用工作线程
- reject(Runnable command):执行拒绝策略
- onShutdown(): 调用shutdown 的进一步清理方法,参照ScheduledThreadPoolExecutor的重写
- isRunningOrShutdown: 是否运行状态 或者SHUTDOWN状态
- drainQueue: 将队列元素移动List中,并且删除队列元素
- addWorker: 添加线程
- addWorkerFailed: 添加线程失败,回滚相关数据
- processWorkerExit: 清理已经不使用worker,completedAbruptly 参数表示突然完成的,如执行的线程异常等
- getTask: 获取线程
- runWorker:
- execute():执行线程
- shutdown:关闭线程池,不再接受新的任务
- shutdownNow:立即关闭 状态直接变更到STOP,返回队列中的线程列表
- isShutdown: 是否shutdown 状态, 非RUNNING状态
- isTerminating():是否停止中状态 ,即 SHUTDOWN ,STOP , TINDYING
- isTerminated: 是否TERMINATED状态
- awaitTermination: 等待timeout时间,检查是否TERMINATED
- setCorePoolSize:设置核心线程数(修改)
- prestartCoreThread:预启动一个核心线程
- ensurePrestart:预启动一个核心线程,另外当核心线程数为0时也启动一个线程
- prestartAllCoreThreads:预启动所有核心线程
- allowCoreThreadTimeOut: 设置核心线程超时
- setMaximumPoolSize: 设置最大线程数
- setKeepAliveTime: 设置线程停止运行后的存活时间
- purge:删除队列中所有的future任务
- getActiveCount: 统计运行的线程数
- getTaskCount:统计运行中和运行完成的线程总数
- getCompletedTaskCount: 统计运行完成的线程总数
- 5.内部类
总结与使用
线程池的参数的使用
新进入的线程直接以核心线程去执行,如果核心线程没有空闲,那么加入到阻塞队列当中,如果阻塞队列满了,那么启用最大线程数,开始开辟新的线程执行任务,如果最大线程数也开辟满了,那么执行拒绝策略。如果线程池空闲下来了,那么按照设置的线程存活时间来销毁线程,如果设置了核心线程会超时,那么也会清理核心线程。
几个问题:
如果采取 拒绝策略 选择 回到主线程中执行 ,那么线程异常时,是否会引起主线程中断?
worker 和task 的关系是什么样的呢?
学习的前提: 了解位移运算相关的知识
1.extends 和 implement
extends AbstractExecutorService
抽象类 实现了ExecutorService接口,
public abstract class AbstractExecutorService implements ExecutorService {//返回给定的 运行线程和默认值的RunnableFutureprotected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}//返回 给定callable 的RunnableFutureprotected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}//提交任务public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}//提交任务,带有默认返回值public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}//提交callable 任务public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}//执行线程集合private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {if (tasks == null) throw new NullPointerException();int ntasks = tasks.size();if (ntasks == 0) throw new IllegalArgumentException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);try {ExecutionException ee = null;final long deadline = timed ? System.nanoTime() + nanos : 0L;Iterator<? extends Callable<T>> it = tasks.iterator();futures.add(ecs.submit(it.next()));--ntasks;int active = 1;for (;;) {Future<T> f = ecs.poll();if (f == null) {if (ntasks > 0) {--ntasks;futures.add(ecs.submit(it.next()));++active;}else if (active == 0)break;else if (timed) {f = ecs.poll(nanos, TimeUnit.NANOSECONDS);if (f == null)throw new TimeoutException();nanos = deadline - System.nanoTime();} else f = ecs.take();}if (f != null) {--active;try {return f.get();} catch (ExecutionException eex) {ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex);}}}if (ee == null)ee = new ExecutionException();throw ee;} finally {for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {try {return doInvokeAny(tasks, false, 0);} catch (TimeoutException cannotHappen) {assert false;return null;}}public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {return doInvokeAny(tasks, true, unit.toNanos(timeout));}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {if (tasks == null)throw new NullPointerException();long nanos = unit.toNanos(timeout);ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks)futures.add(newTaskFor(t));final long deadline = System.nanoTime() + nanos;final int size = futures.size();for (int i = 0; i < size; i++) {execute((Runnable)futures.get(i));nanos = deadline - System.nanoTime();if (nanos <= 0L)return futures;}for (int i = 0; i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {if (nanos <= 0L)return futures;try {f.get(nanos, TimeUnit.NANOSECONDS);} catch (CancellationException ignore) {} catch (ExecutionException ignore) {} catch (TimeoutException toe) {return futures;}nanos = deadline - System.nanoTime();}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);}}}
public interface ExecutorService extends Executor {//有序关闭线程池,关闭中 执行之前提交的任务,但不接受新的任务。void shutdown();//尝试停止所有的任务,并返回在等待中的任务列表List<Runnable> shutdownNow();//是否已经关闭boolean isShutdown();//关闭后,所有任务都已经完成,返回trueboolean isTerminated();//阻塞, 所有任务执行完成,或者超时,或者中断(interrupt) 结束boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;//提交任务<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);//执行任务集,并返回结果<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;//执行任务集,所有任务执行完成,或者超时 的时候 返回执行结果<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;//执行任务,返回指定的结果<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}
public interface Executor {//执行线程void execute(Runnable command);}
2.构造方法
参数:
int corePoolSize :核心线程数
int maximumPoolSize: 最大线程数
long keepAliveTime:线程未使用后的存活时间
TimeUnit unit: 时间单位
BlockingQueue
ThreadFactory threadFactory: 线程工厂
RejectedExecutionHandler handler: 线程池满的拒绝策略
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0) throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
3.属性
静态常量:
// 32-3=29 位数控制private static final int COUNT_BITS = Integer.SIZE - 3;// 1 左移29位 -1 ,是 28位2进制1 ,即 2^29 -1 也就是线程池的最大容量private static final int CAPACITY = (1 << COUNT_BITS) - 1;//线程状态//-1左移29位,10000000000000000000000000000001 -> 补码 01111111111111111111111111111110 -> 左移 11100000000000000000000000000000private static final int RUNNING = -1 << COUNT_BITS;// 0 左移,不改变 00000000000000000000000000000000private static final int SHUTDOWN = 0 << COUNT_BITS;//2^29 00100000000000000000000000000000private static final int STOP = 1 << COUNT_BITS;//2^30 01000000000000000000000000000000private static final int TIDYING = 2 << COUNT_BITS;//2^30 + 2^29 01100000000000000000000000000000private static final int TERMINATED = 3 << COUNT_BITS;//默认的拒绝策略private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();//调用shutdown和shutdownNow 需要的权限private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");//表示只中断一个线程private static final boolean ONLY_ONE = true;
常量
//运行状态控制 存放 workcount 工作线程数量 和 runState 线程池状态信息//为什么能存放两个值,ctl 是runstate | workcount , runstate 值分布在高位, 即第30,31位, workcount 分布在1-29位 ,所以两者做|运算,两个值都能保存private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//阻塞队列,final 确保队列大小不可改变private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();//工作线程集合,mainLock访问private final HashSet<Worker> workers = new HashSet<Worker>();//等待条件private final Condition termination = mainLock.newCondition();
属性
//池中最大线程数private int largestPoolSize;//完成的任务数private long completedTaskCount;//线程工厂private volatile ThreadFactory threadFactory;//拒绝策略private volatile RejectedExecutionHandler handler;//线程空闲后的存活时间private volatile long keepAliveTime;//是否允许核心线程超时,true表示允许,核心线程会在keepAliveTime时间后超时。false表示空闲状态,核心线程一直存活。private volatile boolean allowCoreThreadTimeOut;//核心线程数private volatile int corePoolSize;//最大线程数 不能大于CAPACITYprivate volatile int maximumPoolSize;
4.方法
native方法
Java方法
runStateOf(int c): 计算线程运行状态 c 是ctl , 做高位运算,得到状态值
// c & ~CAPACITY CAPACITY = 00001111111111111111111111111111 ~CAPACITY = 111100000000000000000000000private static int runStateOf(int c) { return c & ~CAPACITY; }
workerCountOf(int c):计算有效线程数量 ,c是ctl ,做低位运算,得到workcount
private static int workerCountOf(int c) { return c & CAPACITY; }
ctlof(int rs,int wc): 计算 ctl值 rs 表示 runstate ,wc 表示 workcount
private static int ctlOf(int rs, int wc) { return rs | wc; }
runStateLessThan(int c,int s): 运行状态比较
private static boolean runStateLessThan(int c, int s) { return c < s;}
runStateAtLeast(int c, int s):
private static boolean runStateAtLeast(int c, int s) { return c >= s;}
isRunning(int c): 是否运行中
private static boolean isRunning(int c) {return c < SHUTDOWN;}
compareAndIncrementWorkerCount(int expect): 比较并且增加workcount
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}
compareAndDecrementWorkerCount(int expect):比较并且减少workcount
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}
decrementWorkerCount(): 递减workcount ,线程终止时调用,do while循环,直到更新成功为止
private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}
advanceRunState(int targetState):
private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;}}
tryTerminate(): 尝试停止线程池,如果非运行中状态,会尝试停止线程池
final void tryTerminate() {for (;;) {int c = ctl.get();//状态为 STOP 或者状态为SHUTDOWN 但是队列为空的时候 不执行return操作if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//如果还有空闲的线程,那么中断一个线程if (workerCountOf(c) != 0) {interruptIdleWorkers(ONLY_ONE);return;}//获取锁,进行停止final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//状态更新为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));//唤醒所有等待的线程termination.signalAll();}return;}} finally {mainLock.unlock();}}}
checkShutdownAccess(): 检查是否有关闭的权限
private void checkShutdownAccess() {SecurityManager security = System.getSecurityManager();if (security != null) {security.checkPermission(shutdownPerm);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)security.checkAccess(w.thread);} finally {mainLock.unlock();}}}
interruptWorkers(): 中断所有的线程
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}
interruptIdleWorkers(Boolean onlyOne):中断未使用工作线程
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;//如果worker的线程在运行中,那么w.tryLock() 将会失败,否则可以中断workerif (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}
reject(Runnable command):执行拒绝策略
final void reject(Runnable command) {handler.rejectedExecution(command, this);}
onShutdown(): 调用shutdown 的进一步清理方法,参照ScheduledThreadPoolExecutor的重写
void onShutdown() {};
isRunningOrShutdown: 是否运行状态 或者SHUTDOWN状态
final boolean isRunningOrShutdown(boolean shutdownOK) {int rs = runStateOf(ctl.get());// shutdownOk 为false,那么返回是否运行状态。 为true,返回是否为RUNNING和SHUTDOWN 状态return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);}
drainQueue: 将队列元素移动List中,并且删除队列元素
private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();//复制并删除队列元素q.drainTo(taskList);if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;}
addWorker: 添加线程
private boolean addWorker(Runnable firstTask, boolean core) {//循环标志retry:for (;;) {//获取线程池ctl值int c = ctl.get();//计算线程池状态int rs = runStateOf(c);//如果线程池状态为SHUTDOWN,STOP,TIDYING,TERMINATED状态,// 并且如果是SHUTDOWN状态还需要firstTask不为空,阻塞队列为空,线程才会添加失败if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//计算线程池运行中的线程总数int wc = workerCountOf(c);//如果线程总数大于等于设置的容量上限,//或者 如果线程需要加入核心线程池,核心线程数的容量小于wc,如果不加入,那线程总数要小于wcif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//自增线程数,如果失败,退出最外层for循环if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();//如果运行状态变更,那么继续最外层for循环if (runStateOf(c) != rs) continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//根据传入的线程参数,新建一个workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//加锁后再次检查线程池的状态int rs = runStateOf(ctl.get());//线程池是RUNNING状态,或者 线程池是SHUTDOWN状态而且加入的线程不为空if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//先检查线程是否是alive,激活的,如果已经激活,那么线程已经在执行,不能加入线程池if (t.isAlive()) throw new IllegalThreadStateException();//添加到工作组中workers.add(w);int s = workers.size();if (s > largestPoolSize) largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//如果添加成功,那么直接启动线程t.start();workerStarted = true;}}} finally {if (! workerStarted)//如果添加失败,那么减少计数,并且在workers中移除计入的workeraddWorkerFailed(w);}return workerStarted;}
addWorkerFailed: 添加线程失败,回滚相关数据
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);decrementWorkerCount();tryTerminate();} finally {mainLock.unlock();}}
processWorkerExit: 清理已经不使用worker,completedAbruptly 参数表示突然完成的,如执行的线程异常等
private void processWorkerExit(Worker w, boolean completedAbruptly) {//递减工作线程数if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//增加线程池完成任务数completedTaskCount += w.completedTasks;//移除workerworkers.remove(w);} finally {mainLock.unlock();}//检查并尝试停止线程池tryTerminate();int c = ctl.get();//如果线程池是SHUTDOWN 或者 Running状态,if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {//如果没有设置核心线程数超时,那么如果worker的总数比核心线程数小,也需要添加workerint min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return;}//如果不是异常关闭,那么需要给线程池在添加一个workeraddWorker(null, false);}}
getTask: 获取线程
private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();int rs = runStateOf(c);//如果线程池状态为SHUTDOWN 或者 线程池状态为 STOP TIDYING TERMINATED并且阻塞队列为空// 此时递减worker的计数,并且返回空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);//是否可以超时 允许核心线程超时终止 或者 线程总数已经大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//线程总数大于池的最大线程数 这时候需要递减worker count 并且返回空Task//如果超时了, 阻塞队列为空,那么也递减workcount ,返回空Task//即线程池现无待执行的线程,或者线程池满无法执行if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c)) return null;continue;}try {//如果是核心线程超时Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
runWorker:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();//获取worker中的线程,并且解锁worker对象Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//当worker中task等于空时,试图从队列中获取一个任务线程while (task != null || (task = getTask()) != null) {//先锁定workerw.lock();//如果线程池处于停止中状态,确保线程中断, 如果不是确保线程不中断if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {//任务执行前的操作beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//任务执行完之后的操作afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//如果执行失败,需要回滚数据processWorkerExit(w, completedAbruptly);}}
execute():执行线程
public void execute(Runnable command) {if (command == null) throw new NullPointerException();int c = ctl.get();//如果工作的线程总数小于核心线程数,那么直接创建一个线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果线程总数大于核心线程数,那么添加进队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))//如果线程池不是运行状态,并且移除队列中线程成功,那么执行阻塞策略reject(command);else if (workerCountOf(recheck) == 0)//如果工作线程的总数等于0,那么创建一个worker,传入的线程为null,会总动从阻塞队列中拿取线程addWorker(null, false);} else if (!addWorker(command, false))//如果阻塞队列无法添加,那么创建新的线程,数量取决于最大线程数reject(command);}
shutdown:关闭线程池,不再接受新的任务
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查权限checkShutdownAccess();//变更状态advanceRunState(SHUTDOWN);//中断正在等待的线程,空值默认调用 onlyOne =falseinterruptIdleWorkers();//ScheduledThreadPoolExecutor 需要执行,清理队列onShutdown();} finally {mainLock.unlock();}//尝试停止线程池tryTerminate();}
shutdownNow:立即关闭 状态直接变更到STOP,返回队列中的线程列表
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}
isShutdown: 是否shutdown 状态, 非RUNNING状态
public boolean isShutdown() {return ! isRunning(ctl.get());}
isTerminating():是否停止中状态 ,即 SHUTDOWN ,STOP , TINDYING
public boolean isTerminating() {int c = ctl.get();return ! isRunning(c) && runStateLessThan(c, TERMINATED);}
isTerminated: 是否TERMINATED状态
public boolean isTerminated() {return runStateAtLeast(ctl.get(), TERMINATED);}
awaitTermination: 等待timeout时间,检查是否TERMINATED
public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {//如果已经停止,返回tureif (runStateAtLeast(ctl.get(), TERMINATED))return true;//如果已经超时,并且没有终止,返回falseif (nanos <= 0)return false;//等待timeout,纳秒单位,返回的值为timeout剩余时间 继续循环nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}}
setCorePoolSize:设置核心线程数(修改)
public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0) throw new IllegalArgumentException();//差值int delta = corePoolSize - this.corePoolSize;this.corePoolSize = corePoolSize;//如果线程总数大于核心线程数 收缩workerCountif (workerCountOf(ctl.get()) > corePoolSize)//中断Worker,worker在运行中时获取不到中断需要的锁,所以中断的是未使用的workerinterruptIdleWorkers();else if (delta > 0) {//如果设置的核心线程数比之前的的大 并且 线程总数 小于核心线程数//添加worker ,delta如果大于阻塞队列长度,那么添加worker数位阻塞队列长度数量,因为addWorker(null,true)方法创建的worker取值阻塞队列中的线程//当阻塞队列为0时也就不会创建workerint k = Math.min(delta, workQueue.size());//循环创建workerwhile (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())break;}}}
prestartCoreThread:预启动一个核心线程
public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);}
ensurePrestart:预启动一个核心线程,另外当核心线程数为0时也启动一个线程
void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)//线程总数等于0 并且线程总数不小于核心线程数,说明核心线程数为0addWorker(null, false);}
prestartAllCoreThreads:预启动所有核心线程
public int prestartAllCoreThreads() {int n = 0;while (addWorker(null, true))++n;return n;}
allowCoreThreadTimeOut: 设置核心线程超时
public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)throw new IllegalArgumentException("Core threads must have nonzero keep alive times");if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value)//中断未使用的workerinterruptIdleWorkers();}}
setMaximumPoolSize: 设置最大线程数
public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();this.maximumPoolSize = maximumPoolSize;//如果运行的线程总数大于最大线程数,那么需要中断未在运行的workerif (workerCountOf(ctl.get()) > maximumPoolSize)interruptIdleWorkers();}
setKeepAliveTime: 设置线程停止运行后的存活时间
public void setKeepAliveTime(long time, TimeUnit unit) {if (time < 0)throw new IllegalArgumentException();if (time == 0 && allowsCoreThreadTimeOut())throw new IllegalArgumentException("Core threads must have nonzero keep alive times");long keepAliveTime = unit.toNanos(time);long delta = keepAliveTime - this.keepAliveTime;this.keepAliveTime = keepAliveTime;//如果时间是减小,中断未在运行的workerif (delta < 0)interruptIdleWorkers();}
purge:删除队列中所有的future任务
public void purge() {final BlockingQueue<Runnable> q = workQueue;try {Iterator<Runnable> it = q.iterator();while (it.hasNext()) {Runnable r = it.next();if (r instanceof Future<?> && ((Future<?>)r).isCancelled())it.remove();}} catch (ConcurrentModificationException fallThrough) {for (Object r : q.toArray())if (r instanceof Future<?> && ((Future<?>)r).isCancelled())q.remove(r);}tryTerminate();}
getActiveCount: 统计运行的线程数
public int getActiveCount() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int n = 0;for (Worker w : workers)if (w.isLocked())++n;return n;} finally {mainLock.unlock();}}
getTaskCount:统计运行中和运行完成的线程总数
public long getTaskCount() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {long n = completedTaskCount;for (Worker w : workers) {n += w.completedTasks;if (w.isLocked())++n;}return n + workQueue.size();} finally {mainLock.unlock();}}
getCompletedTaskCount: 统计运行完成的线程总数
public long getCompletedTaskCount() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {long n = completedTaskCount;for (Worker w : workers)n += w.completedTasks;return n;} finally {mainLock.unlock();}}
5.内部类
静态内部类
阻塞策略: CallerRunsPolicy 被拒绝的任务回到主线程中执行
public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
阻塞策略: AbortPolicy 被拒绝的任务 异常抛出
public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
阻塞策略: DiscardPolicy 被拒绝的任务 直接丢弃,不做任何处理
public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
阻塞策略:DiscardOldestPolicy 被拒绝的任务 移除阻塞队列中的头部任务,然后执行 被拒绝的任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
内部类
Worker:
线程工作类,实现了Runable类,继承了抽象类 AbstractQueuedSynchronizer(FIFO队列,锁同步),work运行后,同时只有一个线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{private static final long serialVersionUID = 6138294804551838833L;//运行的线程final Thread thread;//初始任务Runnable firstTask;//线程任务计数器volatile long completedTasks;//Work 构造,根据新任务构建Worker(Runnable firstTask) {setState(-1); //禁止中断,直到worker开始运行this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//运行的主循环委托给外部public void run() {runWorker(this);}// state 0 代表无锁, 1代表已锁定protected boolean isHeldExclusively() {return getState() != 0;}//尝试获取锁protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//尝试释放锁protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
