并发编程07-Executor线程池原理与源码分析

线程池

是什么

线程池:“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控

​ 在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题: 如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁 线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

​ 那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?这就是线程池了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上
​ 总的来说:任务提交给线程池后,它会给任务分配工作线程(Worker)来执行任务,任务完成后,工作线程回到ThreadPoolExecutor,被回收或者等待后续任务

线程池的优势

什么时候使用线程池?

  • 单个任务处理时间比较短
  • 需要处理的任务数量很大

线程池优势

1、减少开销,提升响应速度,可用性:重复使用已创建的线程,减少线程创建、销毁的性能开销,提高性能、无需等待线程创建即可立即执行。
2、便于管理:线程是稀缺资源,如果无限制创建,会消耗系统资源,提高线程切换时间,进而减低系统稳定性。使用线程池可以对线程资源统一调度,调优和监控

线程的实现方式

  1. new Thread(Runnable runnable)
  2. 继承Tread,重写run方法
  3. Callable,借助,new Thread(new FutureTask(new Callable<返回类型>() ))
1
2
3
4
5
6
7
8
9
10
// 实现Runnable接口的类将被Thread执行,表示一个基本的任务 
public interface Runnable {
// run方法就是它所有的内容,就是实际执行的任务
public abstract void run();
}
//Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带 有返回内容
public interface Callable<V> {
// 相对于run方法的带有返回值的call方法
V call() throws Exception;
}

Executor框架

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方 法。

下图为它的继承与实现

image-20211114234452760

ExecutorService

​ Executor下的一个重要子接口ExecutorService,其中定义了线程池的具体行为

1、excute(Runnable command):执行Runnable任务

2、submit(Task):提交Callable或者Runnable任务,并返回任务对应的FutureTask对象

3、shutdown():在完成已提交的任务后封闭办事,不在接受新任务

4、shutdownNow():停止所有正在履行的任务,不在接受新任务。

5、isTerminated():测试是否所有任务都履行完毕

6、isShutdown:测试是否该ExecutorService已被关闭

线程池的具体实现

1、ThreadPoolExecutor 默认线程池

2、ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor

线程池重点属性

ctl

​ ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分的信息:

线程池的运行状态(runState)、线程池内有效线程的数量(workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

1
2
3
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

ctl相关方法

1
2
3
4
5
6
7
8
9
10
11
12
13
//获取运行状态; 
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取活动线程数;
private static int workerCountOf(int c) { return c & CAPACITY; }
//获取运行状态和活动线程数的值。
private static int ctlOf(int rs, int wc) { return rs | wc; }

//线程池存在5种状态
RUNNING = -1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011

1、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行 处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处 于RUNNING状态,并且线程池中的任务数为0!

2、 SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中 断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在 ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也 为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的 任务为空时,就会由STOP -> TIDYING。

5、 TERMINATED
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING - > TERMINATED。 进入TERMINATED的条件如下: 线程池不是RUNNING状态; 线程池状态不是TIDYING状态或TERMINATED状态; 如果线程池状态是SHUTDOWN并且workerQueue为空; workerCount为0; 设置TIDYING状态成功。

任务提交API

1
2
3
4
5
6
7
8
9
10
11
//线程池的创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
//任务提交
1、public void execute() //提交任务无返回值
2、public Future<?> submit() //任务执行完成后有返回值

参数解释

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会 提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTime

线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime;

unit

keepAliveTime的单位;

workQueue

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;

4、priorityBlockingQuene:具有优先级的无界阻塞队列;

threadFactory

它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。

handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

1、AbortPolicy:直接抛出异常,默认策略;

2、CallerRunsPolicy:用调用者所在的线程来执行任务;

3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

4、DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

使用例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class PolicySample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3,
5,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
Future<String> future = null;
List<Future<String>> list = new ArrayList<Future<String>>();
for (int i=0;i<20;i++){
list.add(pool.submit(new CallTask(i+1)));
Thread.sleep(50);
}
}
}
public class CallTask implements Callable<String> {
private int i;
public CallTask(int i) {
this.i = i;
}
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"提交顺序"+i);
return "callTask 方法输出";
}
}
//4、5这2个任务进入阻塞队列后,6被提交发现阻塞队列也满了,就判断 核心线程数 < 当前线程 < 最大线程数,新开线程接受6、7
pool-1-thread-1提交顺序1
pool-1-thread-2提交顺序2
pool-1-thread-3提交顺序3
pool-1-thread-4提交顺序6
pool-1-thread-5提交顺序7
pool-1-thread-1提交顺序4
pool-1-thread-2提交顺序5

线程池监控

1
2
3
4
public long getTaskCount() //线程池已执行与未执行的任务总数 
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量

线程池原理

image-20211118233809035

源码分析

execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//正在执行的线程数小于核心线程数,则启动一个新线程,且把该任务当做第一个任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//入队成功,正在执行的线程数大于核心线程数且小于最大线程数,启动一个新线程执行新提交的任务
addWorker(null, false);
}
//入队失败,队列已满且正在执行的线程数小于于最大线程数,则启动线程执行该任务,否则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

image-20211121174123772

addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//整个循环做一些前置判断,判断线程池状态并且cas新增ctl的工作线程数成功,则真正进入创建worker过程
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//上锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Worker类

​ 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,请参见JDK源码。

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?

​ 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:

1
2
3
4
5
6
7
8
protected boolean tryAcquire(int unused) { 
//cas修改state,不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

​ tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是 为了禁止在执行任务前对线程进行中断。 正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为 0。

runWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

这里说明一下第一个if判断,目的是:

​ 如果线程池正在停止,那么要保证当前线程是中断状态;

​ 如果不是的话,则要保证当前线程不是中断状态;

这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:

​ 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。

​ 在线程池处于RUNNING或SHUTDOWN状态时,调用shutdownNow()方法会使线程池进入到该状态。

STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。

总结一下runWorker方法的执行过程:

​ 1.while循环不断地通过getTask()方法获取任务;

​ 2.getTask()方法从阻塞队列中取任务;

​ 3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;

​ 4.调用task.run()执行任务;

​ 5.如果task为null则跳出循环,执行processWorkerExit()方法;

​ 6.runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。

getTask方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

/** 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 如果以上条件满足,则将workerCount减1并返回null。
* 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加 任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超 时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
/** 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控 制,如果在keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

​ 这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

​ 什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。

​ getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。

processWorkerExit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
/**
* 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接 addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个 worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

image-20211121174031239

work实现AQS不继承ReentrantLock的原因

可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的

Worker线程具有以下特性:

1、lock方法一旦获取了独占锁,表示当前线程正在执行任务中;

2、如果正在执行任务,则不应该中断线程;

3、如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;

4、线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程是否拥有锁来判断线程是否是空闲状态;

5、之所以使用不可重入锁,是为了避免正在执行的任务调用像setCorePoolSize这样的线程池控制方法时重新获取锁从而中断正在运行的线程。

1
2
3
4
5
6
7
protected boolean tryAcquire(int unused) {
//cas修改state,不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false; }

自定义线程池

MyExecutor.java

1
2
3
4
5
public interface MyExecutor {
void execute(Runnable command);
void shutdown();
Runnable getTask();
}

MyThreadPoolExecutor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
public class MyThreadPoolExecutor implements MyExecutor {
/**
* 默认队列大小
*/
private static final int defaultQueueSize = 5;

/**
* 默认池的大小
*/
private static final int defaultPoolSize = 5;

private static final long defaultAliveTime = 60l;

/**
* 线程池最大的大小
*/
private static final int maxPoolSize = 50;

/**
* 线程池大小
*/
private volatile int poolsize;

/**
* 任务容量
*/
private long completedTaskCount;

/**
* 拒绝策略
*/
private volatile RejectPolicy handler;

/**
* 是否已经中断
*/
private volatile boolean isShutDown = false;

/**
* active当前激活线程数
*/
private AtomicInteger ctl = new AtomicInteger();

public BlockingQueue<Runnable> getWorkQueue() {
return workQueue;
}

/**
* 队列
*/
private final BlockingQueue<Runnable> workQueue;

/**
* Lock
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* worker集合
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

/**
* 是否允许超时
*/
private volatile boolean allowThreadTimeOut;

private volatile long keepAliveTime;

public MyThreadPoolExecutor(){
this(defaultPoolSize,defaultQueueSize,defaultAliveTime,new DefaultRejectPolicy());
}

public MyThreadPoolExecutor(int poolsize){
this(poolsize,defaultQueueSize,defaultAliveTime,new DefaultRejectPolicy());
}

public MyThreadPoolExecutor(int poolsize, int queueSize, long keepAliveTime, RejectPolicy handler){
if(poolsize <= 0 || poolsize > maxPoolSize )
throw new IllegalArgumentException("线程池大小不能<=0");
this.poolsize = poolsize;
this.handler = handler;
this.keepAliveTime = keepAliveTime;
if(keepAliveTime > 0)
allowThreadTimeOut = true;
this.workQueue = new ArrayBlockingQueue<Runnable>(queueSize);
}

/**
* 执行任务
* @param task
*/
@Override
public void execute(Runnable task) {
if(task == null)
throw new NullPointerException("任务不能为空");
if(isShutDown)
throw new IllegalStateException("线程池已销毁,禁止提交任务...");

int c = ctl.get();
//任务数小于
if(c < poolsize){
if(addWorker(task,true))
return;
}else if(workQueue.offer(task)){

}else{
handler.rejected(task,this);//任务拒绝策略
}
}
@Override
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
isShutDown = true;
for (Worker w : workers){
Thread t = w.thread;
if(!t.isInterrupted() && w.tryLock()){
try {
t.interrupt();
} catch (Exception e) {
//e.printStackTrace();
} finally {
w.unlock();
}
}
}
} finally {
mainLock.unlock();
}

}
/**
* 取出任务
* @return
*/
@Override
public Runnable getTask() {
try {
return allowThreadTimeOut ? workQueue.poll(keepAliveTime, TimeUnit.SECONDS) : workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}

public boolean isShutdown() {
return isShutDown;
}

private void runWorker(Worker worker){
Thread wt = Thread.currentThread();
Runnable task = worker.firstTask;
worker.firstTask = null;
boolean completedAbruptly = true;
try {
while (task != null || (task=getTask())!=null){
worker.lock();
if (isShutDown && !wt.isInterrupted()){
wt.interrupt();
}
try {
task.run();
} finally {
task = null;
worker.completedTask++; //当前线程完成的任务数
worker.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(worker,completedAbruptly);
}
}

private void processWorkerExit(Worker worker, boolean completedAbruptly) {
if(completedAbruptly)
ctl.decrementAndGet();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += worker.completedTask;
workers.remove(worker);
} finally {
mainLock.unlock();
}
if(completedAbruptly && !workQueue.isEmpty()){
addWorker(null,false);
}
}

/**
* 是否启动线程执行任务 or 放入
* @param r
* @param startNew
*/
private boolean addWorker(Runnable r,boolean startNew){
if(startNew){
ctl.incrementAndGet();
}
boolean workerAdded = false;
boolean workerStarted = false;

Worker w = new Worker(r);
Thread t = w.thread;
if(t != null){
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if(!isShutDown){ // 线程池未关闭
if (t.isAlive()) // 检查线程是否已经处于运行状态,start方法不能重复调用执行
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded){
t.start();
workerStarted = true;
}
}
return workerStarted;
}

static AtomicInteger atomic = new AtomicInteger();

class Worker extends ReentrantLock implements Runnable{

volatile long completedTask;
final Thread thread;
Runnable firstTask;

public Worker(Runnable r){
this.firstTask = r;
this.thread = new Thread(this,"thread-name-"+atomic.incrementAndGet());
}

public void run() {
runWorker(this);
}
}

}

RejectPolicy 线程池满拒绝策略相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public interface RejectPolicy {
/**
* 拒绝策略
* @param task
* @param executor
*/
void rejected(Runnable task, MyThreadPoolExecutor executor);
}
public class DiscardOldestRejectPolicy implements RejectPolicy {

public DiscardOldestRejectPolicy(){}
@Override
public void rejected(Runnable task, MyThreadPoolExecutor executor) {
if(!executor.isShutdown()){
executor.getWorkQueue().poll();
executor.execute(task);
}
}
}
public class PolicyException extends RuntimeException {

public PolicyException() {
super();
}

public PolicyException(String message) {
super(message);
}

}
public class DefaultRejectPolicy implements RejectPolicy {

public DefaultRejectPolicy(){}

@Override
public void rejected(Runnable task, MyThreadPoolExecutor executor) {
System.out.println("任务已经满了");
throw new PolicyException("任务已经满了");
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyThreadTest {
public static void main(String[] args) {
MyThreadPoolExecutor pool = new MyThreadPoolExecutor(3,3,60,new DiscardOldestRejectPolicy());
for (int i=0;i<10;i++){
pool.execute(new MyTask(i));
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
}

image-20211121222630602

拒绝策略

1
2
3
4
AbortPolicy //拒绝任务并报错
DiscardPolicy //静默抛弃任务
DiscardOldestPolicy //它丢弃最旧的未处理请求
CallerRunsPolicy //任务被拒绝时,直接在execute方法的调用线程中运行被拒绝的任务——适用于一些必须执行的场景

Executors工具类

1
2
3
4
*  ExecutorService newFixedThreadPool(线程大小 n);创建大小固定的线程池
* ExecutorService newCachedThreadPool();缓存线程池,可根据需要,更改线程池大小
* ExecutorService newSingleThreadExecutor();创建单个线程池,只有一个线程
* ScheduledExecutorService newScheduledThreadPool(线程大小 n);线程大小固定,可以延迟或定时执行任务

线程池配置

tomcat默认线程池配置

1
2
3
4
5
6
#tomcat 线程池默认配置 
server.tomcat.threads.max=200
server.tomcat.threads.min-spare=10
如果端口号是8080
那么接收到请求的线程名称格式:http-nio-8080-exec-1
参考文档:https://blog.csdn.net/m0_52789121/article/details/126080963

一般的应用线程池配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Configuration
@Slf4j
public class ExecutorPoolConfig {

//默认线程池
public static final String DEFAULT_EXECUTOR_POOL = "DefaultExecutorPool";
//永不抛弃线程池
public static final String CALLER_RUNS_EXECUTOR_POOL = "CallerRunsExecutorPool";
@Value("${thread.pool.default.corePoolSize}")
private int corePoolSize;
@Value("${thread.pool.default.maxPoolSize}")
private int maxPoolSize;
@Value("${thread.pool.default.queueSize}")
private int queueSize;
@Value("${thread.pool.default.keepAlive}")
private int keepAlive;

@Value("${thread.pool.callRuns.corePoolSize}")
private int corePoolSizeCallerRuns;
@Value("${thread.pool.callRuns.maxPoolSize}")
private int maxPoolSizeCallerRuns;
@Value("${thread.pool.callRuns.queueSize}")
private int queueSizeCallerRuns;
@Value("${thread.pool.callRuns.keepAlive}")
private int keepAliveCallerRuns;

/**
* 默认线程池
* DiscardPolicy,线程池满了后,任务可能会被丢弃
* 但是有一个优势,就是不会阻塞主线程,进而不会阻塞TCP队列
*
* @return
*/
@Bean(DEFAULT_EXECUTOR_POOL)
public ThreadPoolTaskExecutor defaultExecutorPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueSize);
executor.setKeepAliveSeconds(keepAlive);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setThreadNamePrefix("@DefaultThreadPool-");
// 关闭inheritableThreadLocal
executor.setThreadFactory(TtlExecutors.getDisableInheritableThreadFactory(executor));
executor.initialize();
return executor;
}

/**
* 定时器适用线程池
* CallerRunsPolicy,不会丢弃任何的任务
*
* @return
*/
@Bean(name = CALLER_RUNS_EXECUTOR_POOL)
public ThreadPoolTaskExecutor callerRunsExecutorPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSizeCallerRuns);
executor.setMaxPoolSize(maxPoolSizeCallerRuns);
executor.setQueueCapacity(queueSizeCallerRuns);
executor.setKeepAliveSeconds(keepAliveCallerRuns);
/**
* CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("CallerRunsThreadPool-");
// 关闭inheritableThreadLocal
executor.setThreadFactory(TtlExecutors.getDisableInheritableThreadFactory(executor));
executor.initialize();
return executor;
}
}

TtlExecutors

参考

ThreadLocal

​ 线程变量不能传递给子线程

InheritableThreadLocal

​ 线程变量可以传递给子线程,但是遇到线程池使用场景时,只有线程创建的那一次实现了传递。

  • 继承性:InheritableThreadLocal是ThreadLocal的一个子类,它扩展了ThreadLocal的功能,使得子线程可以继承父线程的ThreadLocal变量的值。当父线程创建一个新的子线程时,子线程会接收到父线程InheritableThreadLocal变量的一个副本。
  • 使用场景:在某些情况下,当父线程需要向子线程传递一些数据时,InheritableThreadLocal非常有用。但是,它并不能解决在线程池或异步任务调用链中传递ThreadLocal值的问题,因为在这些场景中,线程是复用的,而不是新创建的

TransmittableThreadLocal

​ 线程变量可以传递给子线程,在TtlExecutors提供的线程池中或者通过new Thread(TtlRunnable runnable)新启的线程,可以实现变量实时在父子线程中传递。

  • 跨线程传递:TransmittableThreadLocal(TTL)是阿里巴巴开源的一个框架,它解决了标准ThreadLocal无法在线程池或异步任务中正确传递值的问题。TTL允许在线程切换时(例如,在线程池中使用线程)保留原始线程的变量值,并在切换后恢复这些值。
  • 增强的功能:TTL不仅提供了InheritableThreadLocal的继承性,还增强了它,使其能够在复杂的线程环境中(如线程池、异步任务等)正确传递ThreadLocal值。这使得TTL成为处理跨线程上下文或状态传递的强大工具。
  • 使用场景:在需要跨线程传递上下文或状态的场景中,TTL非常有用。例如,在分布式系统中,你可能需要在多个线程或任务之间传递全链路id、用户信息等。TTL可以方便地帮助你实现这一点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class TransmittableThreadLocalTest {

private static ThreadLocal<String> threadLocal = new ThreadLocal<String> ();
private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<String> ();
private static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<String> ();

public static void main(String[] args) {
threadLocal.set("threadLocal");
inheritableThreadLocal.set("inheritableThreadLocal");
transmittableThreadLocal.set("transmittableThreadLocal");

new Thread(() -> {
System.out.println(threadLocal.get());
System.out.println(inheritableThreadLocal.get());
System.out.println(transmittableThreadLocal.get());

}).start();
ExecutorService executorService1 = Executors.newCachedThreadPool();
//ttlExecutor execute或者submit方法自动包裹TtlRunable
Executor ttlExecutor = TtlExecutors.getTtlExecutor(executorService1);
Runnable run = new Runnable() {
@Override
public void run() {
System.out.println(threadLocal.get());
System.out.println(inheritableThreadLocal.get());
System.out.println(transmittableThreadLocal.get());
}
};
ttlExecutor.execute(run);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadLocal.set("threadLocal-1");
inheritableThreadLocal.set("inheritableThreadLocal-1");
transmittableThreadLocal.set("transmittableThreadLocal-1");
ttlExecutor.execute(run);
}
}

null
inheritableThreadLocal 普通新建线程,inheritableThreadLocal默认生效
transmittableThreadLocal 普通新建线程,transmittableThreadLocal默认生效
null
inheritableThreadLocal Ttl线程池新建线程 inheritableThreadLocal默认生效
transmittableThreadLocal Ttl线程池新建线程 transmittableThreadLocal默认生效
null
inheritableThreadLocal Ttl线程池复用线程 inheritableThreadLocal 不生效
transmittableThreadLocal-1 Ttl线程池复用线程 inheritableThreadLocal 不生效

提问

1、ThreadPoolExecutor自身有哪些状态,如何维护这些状态?

1
2
3
4
5
6
//线程池存在5种状态 
RUNNING = -1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011

image-20221112231057056

2、ThreadPoolExecutor如何维护内部的工作线程?

维护HashSet workers,记录线程池里正在工作异步线程。

正常情况下,整个工作线程的生命周期:从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:

image-20211121174031239

3、ThreadPoolExecutor处理任务的整体逻辑是什么样的?

提交任务给线程池后:

1、当前线程数 < 核心线程数,则添加工作线程并执行

2、否则,阻塞队列未满?,则添加至阻塞队列排队

3、否则,当前线程数 < 最大线程数,则添加工作线程并执行

4、否则,执行拒绝策略,任务提交失败