线程池 是什么 线程池 :“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控
在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理 。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题: 如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁 线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?这就是线程池了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上 。 总的来说:任务提交给线程池后,它会给任务分配工作线程(Worker)来执行任务,任务完成后,工作线程回到ThreadPoolExecutor,被回收或者等待后续任务
线程池的优势 什么时候使用线程池?
线程池优势
1、减少开销,提升响应速度,可用性:重复使用已创建的线程,减少线程创建、销毁的性能开销,提高性能、无需等待线程创建即可立即执行。 2、便于管理:线程是稀缺资源,如果无限制创建,会消耗系统资源,提高线程切换时间,进而减低系统稳定性。使用线程池可以对线程资源统一调度,调优和监控 。
线程的实现方式
new Thread(Runnable runnable)
继承Tread ,重写run方法
Callable ,借助,new Thread(new FutureTask(new Callable<返回类型>() ))
1 2 3 4 5 6 7 8 9 10 public interface Runnable { public abstract void run () ; } public interface Callable<V> { V call () throws Exception; }
Executor框架 Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方 法。
下图为它的继承与实现
ExecutorService Executor下的一个重要子接口ExecutorService,其中定义了线程池的具体行为
1、excute(Runnable command):执行Runnable任务
2、submit(Task):提交Callable或者Runnable任务,并返回任务对应的FutureTask对象
3、shutdown():在完成已提交的任务后封闭办事,不在接受新任务
4、shutdownNow():停止所有正在履行的任务,不在接受新任务。
5、isTerminated():测试是否所有任务都履行完毕
6、isShutdown:测试是否该ExecutorService已被关闭
线程池的具体实现 1、ThreadPoolExecutor 默认线程池
2、ScheduledThreadPoolExecutor 定时线程池
ThreadPoolExecutor 线程池重点属性 ctl ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分的信息:
线程池的运行状态 (runState)、线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount 。COUNT_BITS就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
1 2 3 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;
ctl相关方法 1 2 3 4 5 6 7 8 9 10 11 12 13 private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; } private static int ctlOf (int rs, int wc) { return rs | wc; } RUNNING = -1 << COUNT_BITS; SHUTDOWN = 0 << COUNT_BITS; STOP = 1 << COUNT_BITS; TIDYING = 2 << COUNT_BITS; TERMINATED = 3 << COUNT_BITS;
1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行 处理。 (02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处 于RUNNING状态,并且线程池中的任务数为0!
2、 SHUTDOWN (1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。 (2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP (1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中 断正在处理的任务。 (2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING (1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在 ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。 (2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也 为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的 任务为空时,就会由STOP -> TIDYING。
5、 TERMINATED (1) 状态说明:线程池彻底终止,就变成TERMINATED状态。 (2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING - > TERMINATED。 进入TERMINATED的条件如下: 线程池不是RUNNING状态; 线程池状态不是TIDYING状态或TERMINATED状态; 如果线程池状态是SHUTDOWN并且workerQueue为空; workerCount为0; 设置TIDYING状态成功。
任务提交API 1 2 3 4 5 6 7 8 9 10 11 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 1、public void execute () 2、public Future<?> submit ()
参数解释
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会 提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime;
unit
keepAliveTime的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
使用例子 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class PolicySample { public static void main (String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor pool = new ThreadPoolExecutor( 3 , 5 , 3 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy()); Future<String> future = null ; List<Future<String>> list = new ArrayList<Future<String>>(); for (int i=0 ;i<20 ;i++){ list.add(pool.submit(new CallTask(i+1 ))); Thread.sleep(50 ); } } }public class CallTask implements Callable <String > { private int i; public CallTask (int i) { this .i = i; } @Override public String call () throws Exception { TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName()+"提交顺序" +i); return "callTask 方法输出" ; } } pool-1 -thread-1 提交顺序1 pool-1 -thread-2 提交顺序2 pool-1 -thread-3 提交顺序3 pool-1 -thread-4 提交顺序6 pool-1 -thread-5 提交顺序7 pool-1 -thread-1 提交顺序4 pool-1 -thread-2 提交顺序5
线程池监控 1 2 3 4 public long getTaskCount () public long getCompletedTaskCount () public int getPoolSize () public int getActiveCount ()
线程池原理
源码分析 execute方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
addWorker方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker类 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,请参见JDK源码。
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
如果正在执行任务,则不应该中断线程;
如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?
是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
1 2 3 4 5 6 7 8 protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; }
tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是 为了禁止在执行任务前对线程进行中断。 正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为 0。
runWorker方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
这里说明一下第一个if判断,目的是:
如果线程池正在停止,那么要保证当前线程是中断状态;
如果不是的话,则要保证当前线程不是中断状态;
这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:
不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
在线程池处于RUNNING或SHUTDOWN状态时,调用shutdownNow()方法会使线程池进入到该状态。
STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。
总结一下runWorker方法的执行过程:
1.while循环不断地通过getTask()方法获取任务;
2.getTask()方法从阻塞队列中取任务;
3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
4.调用task.run()执行任务;
5.如果task为null则跳出循环,执行processWorkerExit()方法;
6.runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
getTask方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法。
processWorkerExit方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
work实现AQS不继承ReentrantLock的原因 可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的
Worker线程具有以下特性:
1、lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
2、如果正在执行任务,则不应该中断线程;
3、如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
4、线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程是否拥有锁来判断线程是否是空闲状态;
5、之所以使用不可重入锁,是为了避免正在执行的任务调用像setCorePoolSize这样的线程池控制方法时重新获取锁从而中断正在运行的线程。
1 2 3 4 5 6 7 protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; }
自定义线程池 MyExecutor.java
1 2 3 4 5 public interface MyExecutor { void execute (Runnable command) ; void shutdown () ; Runnable getTask () ; }
MyThreadPoolExecutor.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 public class MyThreadPoolExecutor implements MyExecutor { private static final int defaultQueueSize = 5 ; private static final int defaultPoolSize = 5 ; private static final long defaultAliveTime = 60l ; private static final int maxPoolSize = 50 ; private volatile int poolsize; private long completedTaskCount; private volatile RejectPolicy handler; private volatile boolean isShutDown = false ; private AtomicInteger ctl = new AtomicInteger(); public BlockingQueue<Runnable> getWorkQueue () { return workQueue; } private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private volatile boolean allowThreadTimeOut; private volatile long keepAliveTime; public MyThreadPoolExecutor () { this (defaultPoolSize,defaultQueueSize,defaultAliveTime,new DefaultRejectPolicy()); } public MyThreadPoolExecutor (int poolsize) { this (poolsize,defaultQueueSize,defaultAliveTime,new DefaultRejectPolicy()); } public MyThreadPoolExecutor (int poolsize, int queueSize, long keepAliveTime, RejectPolicy handler) { if (poolsize <= 0 || poolsize > maxPoolSize ) throw new IllegalArgumentException("线程池大小不能<=0" ); this .poolsize = poolsize; this .handler = handler; this .keepAliveTime = keepAliveTime; if (keepAliveTime > 0 ) allowThreadTimeOut = true ; this .workQueue = new ArrayBlockingQueue<Runnable>(queueSize); } @Override public void execute (Runnable task) { if (task == null ) throw new NullPointerException("任务不能为空" ); if (isShutDown) throw new IllegalStateException("线程池已销毁,禁止提交任务..." ); int c = ctl.get(); if (c < poolsize){ if (addWorker(task,true )) return ; }else if (workQueue.offer(task)){ }else { handler.rejected(task,this ); } } @Override public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { isShutDown = true ; for (Worker w : workers){ Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()){ try { t.interrupt(); } catch (Exception e) { } finally { w.unlock(); } } } } finally { mainLock.unlock(); } } @Override public Runnable getTask () { try { return allowThreadTimeOut ? workQueue.poll(keepAliveTime, TimeUnit.SECONDS) : workQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return null ; } public boolean isShutdown () { return isShutDown; } private void runWorker (Worker worker) { Thread wt = Thread.currentThread(); Runnable task = worker.firstTask; worker.firstTask = null ; boolean completedAbruptly = true ; try { while (task != null || (task=getTask())!=null ){ worker.lock(); if (isShutDown && !wt.isInterrupted()){ wt.interrupt(); } try { task.run(); } finally { task = null ; worker.completedTask++; worker.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(worker,completedAbruptly); } } private void processWorkerExit (Worker worker, boolean completedAbruptly) { if (completedAbruptly) ctl.decrementAndGet(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += worker.completedTask; workers.remove(worker); } finally { mainLock.unlock(); } if (completedAbruptly && !workQueue.isEmpty()){ addWorker(null ,false ); } } private boolean addWorker (Runnable r,boolean startNew) { if (startNew){ ctl.incrementAndGet(); } boolean workerAdded = false ; boolean workerStarted = false ; Worker w = new Worker(r); Thread t = w.thread; if (t != null ){ ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (!isShutDown){ if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded){ t.start(); workerStarted = true ; } } return workerStarted; } static AtomicInteger atomic = new AtomicInteger(); class Worker extends ReentrantLock implements Runnable { volatile long completedTask; final Thread thread; Runnable firstTask; public Worker (Runnable r) { this .firstTask = r; this .thread = new Thread(this ,"thread-name-" +atomic.incrementAndGet()); } public void run () { runWorker(this ); } } }
RejectPolicy 线程池满拒绝策略相关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public interface RejectPolicy { void rejected (Runnable task, MyThreadPoolExecutor executor) ; }public class DiscardOldestRejectPolicy implements RejectPolicy { public DiscardOldestRejectPolicy () {} @Override public void rejected (Runnable task, MyThreadPoolExecutor executor) { if (!executor.isShutdown()){ executor.getWorkQueue().poll(); executor.execute(task); } } }public class PolicyException extends RuntimeException { public PolicyException () { super (); } public PolicyException (String message) { super (message); } }public class DefaultRejectPolicy implements RejectPolicy { public DefaultRejectPolicy () {} @Override public void rejected (Runnable task, MyThreadPoolExecutor executor) { System.out.println("任务已经满了" ); throw new PolicyException("任务已经满了" ); } }
测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class MyThreadTest { public static void main (String[] args) { MyThreadPoolExecutor pool = new MyThreadPoolExecutor(3 ,3 ,60 ,new DiscardOldestRejectPolicy()); for (int i=0 ;i<10 ;i++){ pool.execute(new MyTask(i)); } try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); } }
拒绝策略 1 2 3 4 AbortPolicy DiscardPolicy DiscardOldestPolicy CallerRunsPolicy
Executors工具类 1 2 3 4 * ExecutorService newFixedThreadPool (线程大小 n) ;创建大小固定的线程池 * ExecutorService newCachedThreadPool () ;缓存线程池,可根据需要,更改线程池大小 * ExecutorService newSingleThreadExecutor () ;创建单个线程池,只有一个线程 * ScheduledExecutorService newScheduledThreadPool (线程大小 n) ;线程大小固定,可以延迟或定时执行任务
线程池配置 tomcat默认线程池配置 1 2 3 4 5 6 #tomcat 线程池默认配置 server.tomcat.threads.max=200 server.tomcat.threads.min-spare=10 如果端口号是8080 那么接收到请求的线程名称格式:http-nio-8080-exec-1 参考文档:https://blog.csdn.net/m0_52789121/article/details/126080963
一般的应用线程池配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 @Configuration @Slf4j public class ExecutorPoolConfig { public static final String DEFAULT_EXECUTOR_POOL = "DefaultExecutorPool" ; public static final String CALLER_RUNS_EXECUTOR_POOL = "CallerRunsExecutorPool" ; @Value("${thread.pool.default.corePoolSize}") private int corePoolSize; @Value("${thread.pool.default.maxPoolSize}") private int maxPoolSize; @Value("${thread.pool.default.queueSize}") private int queueSize; @Value("${thread.pool.default.keepAlive}") private int keepAlive; @Value("${thread.pool.callRuns.corePoolSize}") private int corePoolSizeCallerRuns; @Value("${thread.pool.callRuns.maxPoolSize}") private int maxPoolSizeCallerRuns; @Value("${thread.pool.callRuns.queueSize}") private int queueSizeCallerRuns; @Value("${thread.pool.callRuns.keepAlive}") private int keepAliveCallerRuns; @Bean(DEFAULT_EXECUTOR_POOL) public ThreadPoolTaskExecutor defaultExecutorPool () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueSize); executor.setKeepAliveSeconds(keepAlive); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setThreadNamePrefix("@DefaultThreadPool-" ); executor.setThreadFactory(TtlExecutors.getDisableInheritableThreadFactory(executor)); executor.initialize(); return executor; } @Bean(name = CALLER_RUNS_EXECUTOR_POOL) public ThreadPoolTaskExecutor callerRunsExecutorPool () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSizeCallerRuns); executor.setMaxPoolSize(maxPoolSizeCallerRuns); executor.setQueueCapacity(queueSizeCallerRuns); executor.setKeepAliveSeconds(keepAliveCallerRuns); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("CallerRunsThreadPool-" ); executor.setThreadFactory(TtlExecutors.getDisableInheritableThreadFactory(executor)); executor.initialize(); return executor; } }
TtlExecutors 参考
ThreadLocal 线程变量不能传递给子线程
InheritableThreadLocal 线程变量可以传递给子线程,但是遇到线程池使用场景时,只有线程创建的那一次实现了传递。
继承性 :InheritableThreadLocal是ThreadLocal的一个子类,它扩展了ThreadLocal的功能,使得子线程可以继承父线程的ThreadLocal变量的值。当父线程创建一个新的子线程时,子线程会接收到父线程InheritableThreadLocal变量的一个副本。
使用场景 :在某些情况下,当父线程需要向子线程传递一些数据时,InheritableThreadLocal非常有用。但是,它并不能解决在线程池或异步任务调用链中传递ThreadLocal值的问题,因为在这些场景中,线程是复用的,而不是新创建的
TransmittableThreadLocal 线程变量可以传递给子线程,在TtlExecutors提供的线程池中或者通过new Thread(TtlRunnable runnable)新启的线程,可以实现变量实时在父子线程中传递。
跨线程传递 :TransmittableThreadLocal(TTL)是阿里巴巴开源的一个框架,它解决了标准ThreadLocal无法在线程池或异步任务中正确传递值的问题。TTL允许在线程切换时(例如,在线程池中使用线程)保留原始线程的变量值,并在切换后恢复这些值。
增强的功能 :TTL不仅提供了InheritableThreadLocal的继承性,还增强了它,使其能够在复杂的线程环境中(如线程池、异步任务等)正确传递ThreadLocal值。这使得TTL成为处理跨线程上下文或状态传递的强大工具。
使用场景 :在需要跨线程传递上下文或状态的场景中,TTL非常有用。例如,在分布式系统中,你可能需要在多个线程或任务之间传递全链路id、用户信息等。TTL可以方便地帮助你实现这一点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class TransmittableThreadLocalTest { private static ThreadLocal<String> threadLocal = new ThreadLocal<String> (); private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<String> (); private static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<String> (); public static void main (String[] args) { threadLocal.set("threadLocal" ); inheritableThreadLocal.set("inheritableThreadLocal" ); transmittableThreadLocal.set("transmittableThreadLocal" ); new Thread(() -> { System.out.println(threadLocal.get()); System.out.println(inheritableThreadLocal.get()); System.out.println(transmittableThreadLocal.get()); }).start(); ExecutorService executorService1 = Executors.newCachedThreadPool(); Executor ttlExecutor = TtlExecutors.getTtlExecutor(executorService1); Runnable run = new Runnable() { @Override public void run () { System.out.println(threadLocal.get()); System.out.println(inheritableThreadLocal.get()); System.out.println(transmittableThreadLocal.get()); } }; ttlExecutor.execute(run); try { Thread.sleep(1000L ); } catch (InterruptedException e) { e.printStackTrace(); } threadLocal.set("threadLocal-1" ); inheritableThreadLocal.set("inheritableThreadLocal-1" ); transmittableThreadLocal.set("transmittableThreadLocal-1" ); ttlExecutor.execute(run); } }null inheritableThreadLocal 普通新建线程,inheritableThreadLocal默认生效 transmittableThreadLocal 普通新建线程,transmittableThreadLocal默认生效null inheritableThreadLocal Ttl线程池新建线程 inheritableThreadLocal默认生效 transmittableThreadLocal Ttl线程池新建线程 transmittableThreadLocal默认生效null inheritableThreadLocal Ttl线程池复用线程 inheritableThreadLocal 不生效 transmittableThreadLocal-1 Ttl线程池复用线程 inheritableThreadLocal 不生效
提问 1、ThreadPoolExecutor自身有哪些状态,如何维护这些状态?
1 2 3 4 5 6 //线程池存在5种状态 RUNNING = -1 << COUNT_BITS; //高3位为111 SHUTDOWN = 0 << COUNT_BITS; //高3位为000 STOP = 1 << COUNT_BITS; //高3位为001 TIDYING = 2 << COUNT_BITS; //高3位为010 TERMINATED = 3 << COUNT_BITS; //高3位为011
2、ThreadPoolExecutor如何维护内部的工作线程?
维护HashSet workers,记录线程池里正在工作异步线程。
正常情况下,整个工作线程的生命周期:从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
3、ThreadPoolExecutor处理任务的整体逻辑是什么样的?
提交任务给线程池后:
1、当前线程数 < 核心线程数,则添加工作线程并执行
2、否则,阻塞队列未满?,则添加至阻塞队列排队
3、否则,当前线程数 < 最大线程数,则添加工作线程并执行
4、否则,执行拒绝策略,任务提交失败