ScheduledExecutorService 继承图
ScheduledExecutorService的使用
public static void main(String[] args) {ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);// 单次执行executor.schedule(()-> {logger.info("delay 1000");}, 2, TimeUnit.SECONDS);// 循环执行executor.scheduleAtFixedRate(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}logger.info("scheduleAtFixedRate");}, 1, 2, TimeUnit.SECONDS);// 循环执行executor.scheduleWithFixedDelay(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}logger.info("scheduleWithFixedDelay");}, 1, 2, TimeUnit.SECONDS);}
scheduleAtFixedRate 与 scheduleWithFixedDelay 的区别
如上所示,假设一个task的执行周期过长(例3秒),scheduleAtFixedRate 会在(period)之后,将Task塞到等待队列中,而 scheduleWithFixedDelay 则是在执行完成之后,等待(delay)秒之后,在将Task塞到等待队列中
// scheduleAtFixedRate 的输出,从打印时间可以看出,// 周期为3秒(即任务的执行task,因为period小于执行周期,已经塞到了队列中等待执行)八月 31, 2020 2:20:38 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleAtFixedRate八月 31, 2020 2:20:41 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleAtFixedRate八月 31, 2020 2:20:44 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleAtFixedRate八月 31, 2020 2:20:47 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleAtFixedRate八月 31, 2020 2:20:50 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleAtFixedRate// scheduleWithFixedDelay的输出,从打印时间可以看出// 周期为5秒(任务的执行周期3s+delay2s),即等任务执行完成之后,经过delay之后,再执行下一个task八月 31, 2020 2:22:05 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleWithFixedDelay八月 31, 2020 2:22:10 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleWithFixedDelay八月 31, 2020 2:22:15 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleWithFixedDelay八月 31, 2020 2:22:20 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleWithFixedDelay八月 31, 2020 2:22:25 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0信息: scheduleWithFixedDelay
ScheduledExecutorService 源码分析
先看看 ScheduledFutureTask的组成
ScheduledExecutorService内部将Runnable封装成 ScheduledFutureTask
重要属性
// 任务的序列号private final long sequenceNumber;// 延时的时间private long time;// 任务的周期private final long period;// 执行的任务RunnableScheduledFuture<V> outerTask = this;// 堆顶的下标int heapIndex;
ScheduledFutureTask 的排序规则
public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;// 先比较任务的时间if (diff < 0)return -1;else if (diff > 0)return 1;// 任务的时间比较完成之后,再比较任务的序列号else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}
从构造函数开始
调用父类的构造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
因为 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor 即调用 ThreadPoolExecutor 的构造方法
schedule() 方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {// 校验参数if (command == null || unit == null)throw new NullPointerException();//1. 将 command(Runnable)封装成ScheduledFutureTask//2. decorateTask() 直接返回第二个参数,没啥用RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));//3. 延时执行delayedExecute(t);return t;}
delayedExecute()
private void delayedExecute(RunnableScheduledFuture<?> task) {// 判断当前的线程池状态是否是shutdown的,如果是,采取拒绝策略,直接拒绝if (isShutdown())reject(task);else {// 否则,线程池状态良好,获取workQueue,将task加入到workQueue中super.getQueue().add(task);// 因为有上下文的切换,所以,这里再判断一次线程池的状态// 如果线程池已经shutdown,或者,在当前的runState下不能执行入任务的话,就cancel取消人物if (isShutdown() &&// 因为是 schedule方法,period =0,所以 task.isPeriodic() = false// 执行 isRunningOrShutdown(true),// rs == RUNNING || (rs == SHUTDOWN && shutdownOK)// 所以,在shutdown状态的话,schedule可以执行,此判断不成立,执行ensurePrestart!canRunInCurrentRunState(task.isPeriodic()) &&// 否则,其他状态下,将任务移除remove(task))// 并取消futuretask.cancel(false);elseensurePrestart();}}
ensurePrestart()
void ensurePrestart() {// 获取当前的workerCountint wc = workerCountOf(ctl.get());// 如果小于核心线程数,addWroker创建线程执行任务if (wc < corePoolSize)addWorker(null, true);// 如果workerCount == 0,说明是第一次else if (wc == 0)addWorker(null, false);}
添加完worker之后,会执行runWorker方法,循环从workQueue中取出task,执行这个task
scheduleAtFixedRate()
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {// 校验参数if (command == null || unit == null)throw new NullPointerException();// 校验参数if (period <= 0)throw new IllegalArgumentException();// 将runnable封装成 ScheduledFutureTaskScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 直接返回 sft,啥都没干RunnableScheduledFuture<Void> t = decorateTask(command, sft);// 将sft的 outerTask 设置为 t(自己设置自己)sft.outerTask = t;// 延时执行delayedExecute(t);return t;}
delayedExecute()
private void delayedExecute(RunnableScheduledFuture<?> task) {// 判断当前的线程池状态是否是shutdown的,如果是,采取拒绝策略,直接拒绝if (isShutdown())reject(task);else {// 否则,线程池状态良好,获取workQueue,将task加入到workQueue中super.getQueue().add(task);// 因为有上下文的切换,所以,这里再判断一次线程池的状态// 如果线程池已经shutdown,或者,在当前的runState下不能执行入任务的话,就cancel取消人物if (isShutdown() &&// 因为是 scheduleAtFixedRate,此时的period>0,所以 task.isPeriodic() == true// 此时执行的就是isRunningOrShutdown(false)// rs == RUNNING || (rs == SHUTDOWN && shutdownOK) 这个判断就再也不成立了// 即返回false,因为在shutdown状态了,已经不能再提交新的task,这是个周期任务,已经// 无法执行了,所以在scheduleAtFixedRate中,线程池shutdown下,将会remove这个任务// 并cancel掉!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 否则,线程池状态良好,执行 ensurePrestart,创建worker,执行taskensurePrestart();}}
scheduleWithFixedDelay()
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {// 校验参数if (command == null || unit == null)throw new NullPointerException();// 校验参数if (delay <= 0)throw new IllegalArgumentException();// 封装 ScheduledFutureTaskScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));// 返回sftRunnableScheduledFuture<Void> t = decorateTask(command, sft);// 将outerTask指向自己sft.outerTask = t;// 延时执行delayedExecute(t);return t;}
delayedExecute() 和 scheduleAtFixedRate() 中的逻辑一样,只不过,period 变成了负数
添加完Worker之后,执行Task,执行的是 ScheduledFutureTask 这个Task中的 run() 方法
ScheduledFutureTask#run
public void run() {// 获取当前的Task是否是循环的Task// 在schedule中为false,其他均为trueboolean periodic = isPeriodic();// 如果是 scheduleAtFixedRate/scheduleWithFixedDelay// 且线程池 shutdown 的情况下,这个if成立,取消这个Futureif (!canRunInCurrentRunState(periodic))cancel(false);// 否则的话,就不是周期性任务,调用父类的run方法,执行任务(schedule的逻辑)else if (!periodic)ScheduledFutureTask.super.run();// 否则的话,就是周期性任务,调用父类的 runAndReset方法,执行任务// (scheduleAtFixedRate/scheduleWithFixedDelay) 的逻辑else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}
cancel(false)
public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled = super.cancel(mayInterruptIfRunning);if (cancelled && removeOnCancel && heapIndex >= 0)remove(this);return cancelled;}
setNextRunTime()
private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}
reExecutePeriodic(outerTask)
void reExecutePeriodic(RunnableScheduledFuture<?> task) {// 判断当前的线程池状态是否已经跑// 因为跑到了这个函数,必然是一个周期性的任务,不存在不是周期性的任务// canRunInCurrentRunState(true),在线程池shutdown的时候就是false,再也不能执行// 正常情况下(running状态),则会将task重新加入workQueue中等下下一次的执行if (canRunInCurrentRunState(true)) {super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}
