ScheduledThreadPoolExecutor
定时线程池类的类结构图

内部原理
它用来处理延时任务或定时任务。

它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三 种提交任务的方式:
- schedule
- scheduledAtFixedRate (固定频率,无论任务有没完成,都生成新的任务待执行)
- scheduledWithFixedDelay(固定延迟时间,任务完成后才都生成新的延迟任务)
它采用DelayQueue存储等待的任务
1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
2. DelayQueue也是一个无界队列;
SchduledFutureTask

工作线程的执行过程:
工作线程会从DelayQueue取已经到期的任务去执行; 执行结束后重新设置任务的到期时间,再次放回DelayQueue
ScheduledThreadPoolExecutor会把待执行的任务放到工作队列 DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对 队列中的ScheduledFutureTask进行排序
SchduledFutureTask之run方法实现
1 2 3 4 5 6 7 8 9 10 11
| public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
|
- 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行 步骤2;
- 如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后 直接返回,否则执行步骤3;
- 如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行 结果,然后直接返回,否则执行步骤4和步骤5;
- 计算下次执行该任务的具体时间;
- 重复执行任务。
线程池任务的提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class Test { public static void main(String[] args) { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); scheduler.schedule(()->{ System.out.println("schedule只执行一次的任务"); }, 0, TimeUnit.SECONDS );
scheduler.scheduleAtFixedRate(()->{ System.out.println("scheduleAtFixedRate固定频率执行"); },1,5, TimeUnit.SECONDS);
scheduler.scheduleWithFixedDelay(()->{ System.out.println("scheduleAtFixedRate固定延迟执行"); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } },1,5, TimeUnit.SECONDS); } } schedule只执行一次的任务 scheduleAtFixedRate固定频率执行 scheduleAtFixedRate固定延迟执行 scheduleAtFixedRate固定频率执行 scheduleAtFixedRate固定延迟执行 scheduleAtFixedRate固定频率执行 scheduleAtFixedRate固定延迟执行 scheduleAtFixedRate固定频率执行 scheduleAtFixedRate固定延迟执行 scheduleAtFixedRate固定频率执行 scheduleAtFixedRate固定延迟执行 scheduleAtFixedRate固定频率执行 - 固定频率 scheduleAtFixedRate固定频率执行 - 固定频率 scheduleAtFixedRate固定延迟执行 scheduleAtFixedRate固定频率执行 ......
|
DelayedWorkQueue
ScheduledThreadPoolExecutor之所以要自己实现阻塞的工作队列,是因为 ScheduledThreadPoolExecutor要求的工作队列有些特殊。 DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和 PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以 DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近 的任务在队列的前面(注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次 执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的,下文中会说明)。
堆结构如下图:

可见,DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可以使用数 组表示,可以转换成如下的数组:

在这种结构中,可以发现有如下特性: 假设,索引值从0开始,子节点的索引值为k,父节点的索引值为p,则: 1 一个节点的左子节点的索引为:k = p * 2 + 1;
一个节点的右子节点的索引为:k = (p + 1) * 2;
一个节点的父节点的索引为:p = (k - 1) / 2。
如何保证每次取出的任务是优先级最大的
答:在任务提交进入队列时,保持DelayedWorkQueue的第一个总是最小的,通过递归对比即将插入下标结点的父节点,将最小结点层层上移,最终保证队列的第一个元素始终是最小的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
|