JDK线程池
Worker
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer // 继承AQS目的是为了实现不可重入的特性去反应线程现在的执行状态
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 官翻:工作器的工作线程。线程工厂创建失败时为空
final Thread thread;
/** Initial task to run. Possibly null. */
// 官翻:工作器的首个任务,可能为空
Runnable firstTask;
/** Per-thread task counter */
// 工作器累计执行的任务数量
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker 官翻:直到调用runWorker,否则不可以中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods 官翻:以下为锁的实现方法
//
// The value 0 represents the unlocked state. 官翻:state为0没有被锁住
// The value 1 represents the locked state. state为1被锁住
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 锁住
public void lock() { acquire(1); }
// 尝试锁住
public boolean tryLock() { return tryAcquire(1); }
// 释放锁
public void unlock() { release(1); }
// 是否被锁住
public boolean isLocked() { return isHeldExclusively(); }
// 如果工作线程就绪则打断它
// 这个方法是给interruptWorkers方法调用的,目的是为了防止打断还没有就绪的工作线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 官翻:如果当前运行中的工作线程数量小于最大核心线程数,尝试以提交的任务作为它的初始任务开启一个新的工作线程。
* 对addWorker的调用原子地检查线程池状态和worker数量,因此通过返回false来防止报错,
* 这些报错会在不应该添加线程的情况下添加线程。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 官翻:如果任务能够成功入队,那么我们仍然需要再次检查我们是否应该添加一个线程
* (因为上次检查已经失效)或线程池在进入此方法后是否关闭。所以我们重新检查状态,
* 如果已停止将入队操作回滚,或者启动一个新线程(如果没有)。
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 官翻:如果任务无法入队,我们就尝试添加一个(非核心的)新线程。如果添加失败,就说明线程池已经关机或者Worker数达到最大上限遂拒绝任务
*/
// 1.当前工作线程数小于核心线程数,添加工作线程执行任务
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 添加核心线程
if (addWorker(command, true))
return;
// 添加成功返回,否则重读ctl
c = ctl.get();
}
// 2.如果线程池处于RUNNING状态,尝试将任务入队
if (isRunning(c) && workQueue.offer(command)) {
// 入队后重读ctl
int recheck = ctl.get();
// 再次检查线程池状态,如果不是RUNNING将任务移除队列并将其拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池Worker数为0(说明这个线程池的最大核心线程为0或者允许核心线程超时),
// 我们需要添加第一个非核心线程来执行工作队列中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果入队失败尝试添加非核心线程执行任务
else if (!addWorker(command, false))
// 如果无法添加非核心线程,拒绝任务
reject(command);
}
addWorker
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// 双重自旋为了CAS失败重试
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 官翻:仅在必要的情况下检查队列空否
if (rs >= SHUTDOWN && // 在状态为非RUNNING的前提下:
! (rs == SHUTDOWN && // 如果状态为STOP、TIDYING或TERMINATED,返回添加失败。解释:该状态下不再创建新工作器
firstTask == null && // 如果状态为SHUTDOWN,同时firstTask不为空,返回添加失败。解释:该状态下不可再提交任务
! workQueue.isEmpty())) // 如果状态为SHUTDOWN,同时firstTask为空,同时队列为空,返回失败。解释:提交任务和队列都为空那还创个锤子
return false;
// 自旋为Worker数加一
for (;;) {
// 容量判断
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS操作 Worker数加一
if (compareAndIncrementWorkerCount(c))
// Worker数加一成功跳出循环
break retry;
// Worker数加一失败,重读ctl
c = ctl.get(); // Re-read ctl
// 线程池状态改变需要全部重新校验(跳到外循环继续)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// 官翻:否则,只是由于Worder数改变导致的CAS失败只需要重新内循环即可
}
}
// Worker的工作线程是否就绪(被调用了start方法)
boolean workerStarted = false;
// Worker是否成功添加到Worker集合
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
// t是被添加的Worker的工作线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 开始将Worker添加到Worker集合
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 当线程池处于RUNNING状态,或者处于SHUTDOWN状态且提交的任务为空时才可以添加Worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable 官翻:预检线程是否处于新建状态
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果Worker被成功添加到集合,调用线程对象的start方法,让其处于就绪状态(开始争抢CPU执行任务)
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果没有调用start被视为添加Worker失败
if (! workerStarted)
// 执行回调函数
// 该方法中的逻辑:将传入的工作器从工作器集合移除,并且为Worker数做CAS减一操作。
// 然后尝试终止线程池,因为添加Worker失败有可能是线程池已经被调用shutdown或者shutdownNow方法
addWorkerFailed(w);
}
// 只有Worder的工作线程被调用了start方法才算添加成功
return workerStarted;
}
getTask
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
// timedOut用于标识调用工作队列的poll方法超时
boolean timedOut = false; // Did the last poll() time out? 官翻:上次调用poll超时了吗?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary. 官翻:仅在必要时检查队列是否为空(人话:把队列判空放在最后)
// 当线程池状态非RUNNING时,如果状态为STOP或者队列为空,方法返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 循环CAS将Worker数减一(因为工作线程获取不到任务会退出然后死亡)
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling? 官翻:工作器会超时吗?
// 当允许核心线程超时时或者当前Worker数已经大于最大核心线程数时,需要超时获取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) // 如果Worker数超出最大线程数,或者需要超时的情况下已经超时
&& (wc > 1 || workQueue.isEmpty())) { // 就需要在队列为空的时候让其退出
if (compareAndDecrementWorkerCount(c)) // wc > 1 是为了符合上边的“人话”,仅在必要时检查队列是否为空。
return null; // 当仅剩一个Worker时再判断队列中是否有任务,
continue; // CAS失败重试 只要队列有任务就算最后一个线程超时了,它也得干活!
}
try {
// 允许超时时调用poll方法获取任务,否则调用take获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // take在获取不到任务时会一直阻塞,这就是核心线程会一直留在线程池的原因
if (r != null)
return r;
// poll超时返回null
timedOut = true;
} catch (InterruptedException retry) {
// 当take或poll抛出打断异常,说明Worker被打断(有人调了工作线程的打断方法),重新检查状态来退出本方法
timedOut = false;
}
}
}
runWoker
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 初始化AQS的state,将其置为0,就可以被中断了
w.unlock(); // allow interrupts 官翻:允许中断
// 突然完成标识:当Worker正常退出时为false,发生异常突然结束就为true
boolean completedAbruptly = true;
try {
// 工作线程一旦启动,Worker就永远被困在这个循环中。除非getTask抛出中断异常,或者提交的任务发生异常,线程会异常退出。如果getTask返回空会正常退出
while (task != null || (task = getTask()) != null) { // 核心线程会阻塞在getTask方法。非核心线程会超时等待
// worker设置独占锁
// shutdown 时会判断当前worker的状态,根据独占锁的状态来判断worker是否在处理任务是否工作
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 官翻:如果线程池停止,确保线程被打断。如果没有,确保前程不被打断。
// 这需要在第二种情况下重新检查,以处理关闭清除中断时的无竞争。
if ((runStateAtLeast(ctl.get(), STOP) || // 1.线程池至少处于STOP状态,且没有被打断
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) // 2.线程池小于STOP状态,线程池处于中断状态刷新标志位后大于等于STOP
&& !wt.isInterrupted()) // 再次判断标志位如果没有被中断
wt.interrupt(); // 以上两种情况总结:如果线程池大于等于STOP,设置中断。否则删除中断标志
try {
// 钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
// 需要注意的是:除非提交的任务中有响应中断的逻辑,
// 否则,即使工作线程被设置中断标志位也不会抛出中断异常
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数
afterExecute(task, thrown);
}
} finally {
// 任务置空防止重复执行
task = null;
// Worker完成数统计
w.completedTasks++;
// 释放独占锁
w.unlock();
}
}
// 获取不到任务,退出循环,Worker正常下工,不是突然结束
completedAbruptly = false;
} finally {
// Worker退出流程,completedAbruptly为true异常退出,completedAbruptly为false正常退出
processWorkerExit(w, completedAbruptly);
}
}
processWorkerExit
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 这里只为异常退出做减一操作,因为正常退出已经在getTask方法中减过了不需要再减
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 官翻:如果是异常退出,说明还没调整工作器数量
decrementWorkerCount(); // 工作器数量CAS减1,失败重试直到成功为止
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将工作器完成任务数累计到线程池中
completedTaskCount += w.completedTasks;
// 将工作器删除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池(这里并不一定会终止,方法中会有层层状态判断)
// 之所以会尝试终止,是因为本工作器的退出有可能是调用关机或者立即关机导致的
tryTerminate();
// 以下代码是为了防止在线程池停止之前,阻塞队列里有任务,但是没有工作线程处理的尴尬状况
int c = ctl.get();
// 当线程池处于RUNNING或者SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
// 如果不是异常结束尝试向池子中补充工作线程
if (!completedAbruptly) {
// 允许核心线程超时时池子中最小线程数就是0,否则就是核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
// 当阻塞队列中有任务,至少需要一个线程来处理任务
min = 1;
// 如果池子中的工作线程够用就结束方法
if (workerCountOf(c) >= min)
return; // replacement not needed 官翻:不需要替换
}
// 向池子中补充工作器
addWorker(null, false);
}
}
shutdown
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
/*
* 方法总结:
* 1.将线程池状态设置成SHUTDOWN
* 2.打断所有空闲工作线程
* 3.调用尝试终止线程池方法
*/
// 获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限校验
checkShutdownAccess();
// 自旋将线程池状态设置成SHUTDOWN,失败一直重试直到成功
advanceRunState(SHUTDOWN);
// 打断所有闲置工作线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor 官翻:为执行类型线程池设置的钩子函数
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
shutdownNow
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
/*
* 方法总结:
* 1.将线程池状态设置成SHUTDOWN
* 2.打断所有空闲工作线程
* 3.删除阻塞队列中的任务并把任务返回
* 4.调用尝试终止线程池方法
*/
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 自旋将线程池状态设置成STOP,失败一直重试知道成功
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 将阻塞队列中的任务全部删除并返回赋值给tasks
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
return tasks;
}
interruptIdleWorkers
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历所有工作线程
for (Worker w : workers) {
Thread t = w.thread;
// 获取不可重入独占锁之后再打断它(能够获取到锁的工作线程都是闲置线程)
if (!t.isInterrupted() && w.tryLock()) {
try {
// 这里没有像interruptWorkers方法一样调用worker的interruptIfStarted方法是因为
// 能获取到锁肯定都是非新建状态的工作线程,换句话说,他们肯定已经执行过runWorker方法中的unlock(使AQS的state变为0)
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// onlyOne为true的情况仅来自于tryTerminate方法
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptWorkers
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 这里不能调thread的interrupt方法,因为不确保线程是否已经调用过start方法
// 需要通过Worder中的方法来判断AQS的state是否大于等于0,才能打断线程
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
tryTerminate
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*
* 本方法的调用时机:关机时、 立即关机时、 添加工作器失败时、工作器退出时、 任务移除队列时、工作队列清空时
* 对应的方法: shutdown、shutdownNow、addWorder、 processWorkerExit、remove、 purge
*
* 方法总结:首先校验各种条件和状态是否符合终止条件,如符合将状态设置为TIDYING再调用钩子函数,执行钩子结束之后再将状态流转为TERMINATED,至此池子完全关闭
* 终止条件:1.如果是SHUTDOWN状态,需要阻塞队列中没有任务。2.是STOP状态。两者符合其一即可
*/
final void tryTerminate() {
// 自旋
for (;;) {
int c = ctl.get();
// 判断线程池状态
if (isRunning(c) || // 不生效的条件:1.线程池是RUNNING状态
runStateAtLeast(c, TIDYING) || // 2.线程池是TIDYING或TERMINATED状态
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 3.线程池是SHUTDOWN状态,但是阻塞队列不为空
return; // 总结:只有SHUTDOWN状态同时阻塞队列为空,或者为STOP状态才能继续
// 判断线程数量:如果工作线程数量不是零,本方法就无法继续进行。换句话说只有池子中没有线程了,才能关闭池子
if (workerCountOf(c) != 0) { // Eligible to terminate 官翻:能够终止
// 只打断一个空闲线程,然后返回结束
// 这里只打断一个工作线程是为了让其在退出的时候通过调用processWorkerExit方法再次进入tryTerminate方法,
// 让关闭信号在每个线程之间传播
interruptIdleWorkers(ONLY_ONE);
return;
}
// 下边是线程池的状态流转 SHUTDOWN或STOP --> TIDYING --> TERMINATED
// 如果CAS失败会自旋重试
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// CAS操作将线程池置为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 钩子函数,需要自行实现,用于关闭线程池后整理
terminated();
} finally {
// 不CAS,直接置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒所有调用awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS 官翻:如果CAS失败自旋重试
}
}
以下为,动态线程池,方法
setCorePoolSize
/**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
* they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
// 输入的核心线程数不可以小于0
if (corePoolSize < 0)
throw new IllegalArgumentException();
// 计算差额 δ = 新核心线程数 - 旧核心线程数
// δ > 0 需要扩容, δ < 0 需要缩容
int delta = corePoolSize - this.corePoolSize;
// 生效新核心线程数
this.corePoolSize = corePoolSize;
// 如果当前工作线程数大于新核心线程数,打断所有闲置线程
// 注意这里可能打算核心线程,因为核心线程本来就是计算出来的,没有一个标签来只能某个工作线程是否是核心线程
// 在这里进行判断时有两种情况,1.扩容:即使δ大于0需要扩容,也有可能出现工作线程数量大于核心线程,非核心线程也有可能在活跃中
// 这时候打断所有闲置线程会有可能让活跃的工作线程数量缩小到核心线程数以下,但是为了动态缩容这样做也是有必要的
// 2.缩容:当使δ小于0时需要缩容,为了即使把常驻线程缩到核心线程数以下,需要立刻打断所有闲置线程。
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
// 如果当前总工作线程数小于等于新核心线程数,并且需要扩容时,我们就需要根据队列中的任务数量来预热池子
else if (delta > 0) {
// We don't really know how many new threads are "needed". 官翻:我们并不知道到底有多少线程真正被需要。
// As a heuristic, prestart enough new workers (up to new 作为驱动性的方法,需要预热足够多的工作线程(预热到核心线程数个)
// core size) to handle the current number of tasks in 来处理当前工作队列中的任务,直到队列为空时就停止。
// queue, but stop if queue becomes empty while doing so.
// 上边这堆屁话的意思就是需要根据队列中的任务来预热线程,最大预热核心线程数个线程
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
// 队列空了就停
if (workQueue.isEmpty())
break;
}
}
}
setMaximumPoolSize
/**
* Sets the maximum allowed number of threads. This overrides any
* value set in the constructor. If the new value is smaller than
* the current value, excess existing threads will be
* terminated when they next become idle.
*
* @param maximumPoolSize the new maximum
* @throws IllegalArgumentException if the new maximum is
* less than or equal to zero, or
* less than the {@linkplain #getCorePoolSize core pool size}
* @see #getMaximumPoolSize
*/
public void setMaximumPoolSize(int maximumPoolSize) {
// 最大线程数需要大于零,并且需要大于等于核心线程数
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
// 直接赋值生效
this.maximumPoolSize = maximumPoolSize;
// 这里需要判断一下如果当前总工作线程数大于最大线程数,需要打断所有闲置工作线程
// 但是有一个问题,如果当前没有闲置线程怎么办?没关系,因为在getTask方法中会判断当前当前工作线程数是否大于最大线程数
// 如果大于,会直接返回null让工作线程正常退出
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
allowCoreThreadTimeOut (Method)
/**
* Sets the policy governing whether core threads may time out and
* terminate if no tasks arrive within the keep-alive time, being
* replaced if needed when new tasks arrive. When false, core
* threads are never terminated due to lack of incoming
* tasks. When true, the same keep-alive policy applying to
* non-core threads applies also to core threads. To avoid
* continual thread replacement, the keep-alive time must be
* greater than zero when setting {@code true}. This method
* should in general be called before the pool is actively used.
*
* @param value {@code true} if should time out, else {@code false}
* @throws IllegalArgumentException if value is {@code true}
* and the current keep-alive time is not greater than zero
*
* @since 1.6
*/
public void allowCoreThreadTimeOut(boolean value) {
// 防御性变成,存活时间必须大于0才能让核心线程超时
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
// 传入的新值和当前值不同时生效
if (value != allowCoreThreadTimeOut) {
// 生效新值
allowCoreThreadTimeOut = value;
// 如果新值允许核心线程超时,就打断所有闲置线程
// 这里为什么要打断所有闲置线程呢?因为如果工作队列中一直没有任务,那核心线程就会一直阻塞在阻塞队列的take方法中
if (value)
interruptIdleWorkers();
}
}
setKeepAliveTime
/**
* Sets the time limit for which threads may remain idle before
* being terminated. If there are more than the core number of
* threads currently in the pool, after waiting this amount of
* time without processing a task, excess threads will be
* terminated. This overrides any value set in the constructor.
*
* @param time the time to wait. A time value of zero will cause
* excess threads to terminate immediately after executing tasks.
* @param unit the time unit of the {@code time} argument
* @throws IllegalArgumentException if {@code time} less than zero or
* if {@code time} is zero and {@code allowsCoreThreadTimeOut}
* @see #getKeepAliveTime(TimeUnit)
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
// 防御性编程,时间必须大于等于0
if (time < 0)
throw new IllegalArgumentException();
// 如果允许核心线程超时,超时时间设置为0,如果工作队列中没有任务,核心线程就永远无法超时退出
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
// 将时间转换成纳秒
long keepAliveTime = unit.toNanos(time);
// 计算出和旧值得差值δ
long delta = keepAliveTime - this.keepAliveTime;
// 新值覆盖老值
this.keepAliveTime = keepAliveTime;
// 当超时时间缩短时,需要打断所有闲置线程。因为如果不打断,阻塞在poll方法中的工作线程,它的超时时间仍然是按照旧值计算的
if (delta < 0)
interruptIdleWorkers();
}
setThreadFactory
/**
* Sets the thread factory used to create new threads.
*
* @param threadFactory the new thread factory
* @throws NullPointerException if threadFactory is null
* @see #getThreadFactory
*/
public void setThreadFactory(ThreadFactory threadFactory) {
// 没什么好说的
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
setRejectedExecutionHandler
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
// 也没什么好说的
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
示例代码
/**
* 状态流转
* @see java.util.concurrent.ThreadPoolExecutor.RUNNING 运行
* ↓
* @see java.util.concurrent.ThreadPoolExecutor.SHUTDOWN 关机(调用shutdown)
* @see java.util.concurrent.ThreadPoolExecutor.STOP 停止(调用shutdownNow)
* ↓
* @see java.util.concurrent.ThreadPoolExecutor.TIDYING 整理
* ↓
* @see java.util.concurrent.ThreadPoolExecutor.TERMINATED 终止
*
* 关键成员
* @see java.util.concurrent.ThreadPoolExecutor.Worker 工作器:线程池中负责执行任务的包装内部类
* @see java.util.concurrent.ThreadPoolExecutor#execute 提交任务
* @see java.util.concurrent.ThreadPoolExecutor#addWorker 添加工作器
* @see java.util.concurrent.ThreadPoolExecutor#getTask 从队列中获取任务
* @see java.util.concurrent.ThreadPoolExecutor#runWorker 工作器执行任务体的代理方法
* @see java.util.concurrent.ThreadPoolExecutor#shutdown 关机:状态变为SHUTDOWN
* @see java.util.concurrent.ThreadPoolExecutor#shutdownNow 立即关机:状态变更为STOP
* @see java.util.concurrent.ThreadPoolExecutor#tryTerminate 调用时机:关机时 立即关机时 添加工作器失败时 工作器退出时 任务移除队列时 工作队列清空时
*/
testShutdownNow:
private static void testShutdownNow() throws InterruptedException {
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
for (int i = 0; i < 3; i++) {
pool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 开始运行");
for (;;) {
if (Thread.interrupted()) {
throw new InterruptedException("被shutdownNow方法打断了");
}
}
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + " 发生异常:" + e.getMessage());
}
System.out.println(Thread.currentThread().getName() + " 结束运行");
});
}
Thread.sleep(5000);
System.out.println("5s过去了");
pool.shutdownNow();
Thread.sleep(5000);
System.out.println("10s过去了");
}
Tomcat线程池
tomcat线程池和jdk有细微差别
execute(对外)
/**
* 对外暴露的提交方法
*/
@Override
public void execute(Runnable command) {
execute(command, 0, TimeUnit.MILLISECONDS);
}
execute(废弃)
// .....
持续更新中…