
用法
public class ScheduledThreadPoolExecutorExample {public static void main(String[] args) {ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);Task task = new Task("任务");System.out.println("Created : " + task.getName());//延迟多长时间之后只执行一次// executor.schedule(task, 2, TimeUnit.SECONDS);//延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;// executor.scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS); //任务+延迟//延迟指定时间后执行一次,之后按照固定的时长周期执行;executor.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);//任延迟取最大值 稳定定时器}}
底层调用的;方法
private void delayedExecute(RunnableScheduledFuture<?> task) {//如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉if (isShutdown())reject(task);else {//与ThreadPoolExecutor不同,这里直接把任务加入延迟队列super.getQueue().add(task);//如果当前状态无法执行任务,则取消if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else//和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker;//增加Worker,确保提交的任务能够被执行ensurePrestart();}}
/*
Specialized delay queue. To mesh with TPE declarations, this
class must be declared as a BlockingQueue even though
it can only hold RunnableScheduledFutures.
*/
DelayedWorkQueue
public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)// 容量扩增50%。grow();size = i + 1;// 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。if (i == 0) {queue[0] = e;setIndex(e, 0);} else {// 插入堆尾。siftUp(i, e);}// 如果新加入的元素成为了堆顶,则原先的leader就无效了。if (queue[0] == e) {leader = null;// 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。available.signal();}} finally {lock.unlock();}return true;}

RunnableScheduledFuture
siftup方法:
private void siftUp(int k, RunnableScheduledFuture<?> key) {// 找到父节点的索引while (k > 0) {// 获取父节点int parent = (k - 1) >>> 1;RunnableScheduledFuture<?> e = queue[parent];// 如果key节点的执行时间大于父节点的执行时间,不需要再排序了if (key.compareTo(e) >= 0)break;// 如果key.compareTo(e) < 0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面queue[k] = e;setIndex(e, k);// 设置索引为kk = parent;}// key设置为排序后的位置中queue[k] = key;setIndex(key, k);}}
任务执行:
public void run() {// 是否周期性,就是判断period是否为0。boolean periodic = isPeriodic();// 检查任务是否可以被执行。if (!canRunInCurrentRunState(periodic))cancel(false);// 如果非周期性任务直接调用run运行即可。else if (!periodic)ScheduledFutureTask.super.run();// 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();// 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。reExecutePeriodic(outerTask);}}
fied-rate模式和fixed-delay模式区别
private void setNextRunTime() {long p = period;/** fixed-rate模式,时间设置为上一次时间+p。* 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。* 这次任务还没执行完是 不会执行下一次的。*/if (p > 0)time += p;/*** fixed-delay模式,计算下一次任务可以被执行的时间。* 就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。*/elsetime = triggerTime(-p);}long triggerTime(long delay) {/** 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。** 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。*/return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}/*** 主要就是有这么一种情况:* 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。* 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。** 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。** 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。* 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。* 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。*/private long overflowFree(long delay) {Delayed head = (Delayed) super.getQueue().peek();if (head != null) {long headDelay = head.getDelay(NANOSECONDS);if (headDelay < 0 && (delay - headDelay < 0))delay = Long.MAX_VALUE + headDelay;}return delay;}

DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序(time小的排在前面),若time相同则根据sequenceNumber排序( sequenceNumber小的排在前面);
