并发编程09-定时任务&定时线程池(非重点)

ScheduledThreadPoolExecutor

定时线程池类的类结构图

image-20211128162622219

内部原理

​ 它用来处理延时任务或定时任务。

image-20211128162931788

它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三 种提交任务的方式:

  1. schedule
  2. scheduledAtFixedRate (固定频率,无论任务有没完成,都生成新的任务待执行)
  3. scheduledWithFixedDelay(固定延迟时间,任务完成后才都生成新的延迟任务)

它采用DelayQueue存储等待的任务

1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序; 
 2. DelayQueue也是一个无界队列;

SchduledFutureTask

image-20211128163755450

工作线程的执行过程:

​ 工作线程会从DelayQueue取已经到期的任务去执行; 执行结束后重新设置任务的到期时间,再次放回DelayQueue
​ ScheduledThreadPoolExecutor会把待执行的任务放到工作队列 DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对 队列中的ScheduledFutureTask进行排序

SchduledFutureTask之run方法实现

1
2
3
4
5
6
7
8
9
10
11
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
  1. 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行 步骤2;
  2. 如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后 直接返回,否则执行步骤3;
  3. 如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行 结果,然后直接返回,否则执行步骤4和步骤5;
  4. 计算下次执行该任务的具体时间;
  5. 重复执行任务。

线程池任务的提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Test {
public static void main(String[] args) {
ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(10);
scheduler.schedule(()->{
System.out.println("schedule只执行一次的任务");
}, 0, TimeUnit.SECONDS );

scheduler.scheduleAtFixedRate(()->{
System.out.println("scheduleAtFixedRate固定频率执行");
},1,5, TimeUnit.SECONDS);

scheduler.scheduleWithFixedDelay(()->{
System.out.println("scheduleAtFixedRate固定延迟执行");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
},1,5, TimeUnit.SECONDS);
}
}
schedule只执行一次的任务
scheduleAtFixedRate固定频率执行
scheduleAtFixedRate固定延迟执行
scheduleAtFixedRate固定频率执行
scheduleAtFixedRate固定延迟执行
scheduleAtFixedRate固定频率执行
scheduleAtFixedRate固定延迟执行
scheduleAtFixedRate固定频率执行
scheduleAtFixedRate固定延迟执行
scheduleAtFixedRate固定频率执行
scheduleAtFixedRate固定延迟执行
scheduleAtFixedRate固定频率执行 - 固定频率
scheduleAtFixedRate固定频率执行 - 固定频率
scheduleAtFixedRate固定延迟执行
scheduleAtFixedRate固定频率执行
......

DelayedWorkQueue

​ ScheduledThreadPoolExecutor之所以要自己实现阻塞的工作队列,是因为 ScheduledThreadPoolExecutor要求的工作队列有些特殊。 DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和 PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以 DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近 的任务在队列的前面(注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次 执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的,下文中会说明)。

堆结构如下图:

image-20211128172423526

可见,DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可以使用数 组表示,可以转换成如下的数组:

image-20211128172520637

在这种结构中,可以发现有如下特性: 假设,索引值从0开始,子节点的索引值为k,父节点的索引值为p,则: 1 一个节点的左子节点的索引为:k = p * 2 + 1;

​ 一个节点的右子节点的索引为:k = (p + 1) * 2;

​ 一个节点的父节点的索引为:p = (k - 1) / 2。

如何保证每次取出的任务是优先级最大的

​ 答:在任务提交进入队列时,保持DelayedWorkQueue的第一个总是最小的,通过递归对比即将插入下标结点的父节点,将最小结点层层上移,最终保证队列的第一个元素始终是最小的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//即将插入下标结点的父节点下标
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}